Add consumer socket object and relayd commands
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26
27 #include <common/common.h>
28 #include <common/defaults.h>
29 #include <common/uri.h>
30
31 #include "consumer.h"
32
33 /*
34 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
35 * be acquired before calling this function and across use of the
36 * returned consumer_socket.
37 */
38 struct consumer_socket *consumer_find_socket(int key,
39 struct consumer_output *consumer)
40 {
41 struct lttng_ht_iter iter;
42 struct lttng_ht_node_ulong *node;
43 struct consumer_socket *socket = NULL;
44
45 /* Negative keys are lookup failures */
46 if (key < 0) {
47 return NULL;
48 }
49
50 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
51 &iter);
52 node = lttng_ht_iter_get_node_ulong(&iter);
53 if (node != NULL) {
54 socket = caa_container_of(node, struct consumer_socket, node);
55 }
56
57 return socket;
58 }
59
60 /*
61 * Allocate a new consumer_socket and return the pointer.
62 */
63 struct consumer_socket *consumer_allocate_socket(int fd)
64 {
65 struct consumer_socket *socket = NULL;
66
67 socket = zmalloc(sizeof(struct consumer_socket));
68 if (socket == NULL) {
69 PERROR("zmalloc consumer socket");
70 goto error;
71 }
72
73 socket->fd = fd;
74 lttng_ht_node_init_ulong(&socket->node, fd);
75
76 error:
77 return socket;
78 }
79
80 /*
81 * Add consumer socket to consumer output object. Read side lock must be
82 * acquired before calling this function.
83 */
84 void consumer_add_socket(struct consumer_socket *sock,
85 struct consumer_output *consumer)
86 {
87 assert(sock);
88 assert(consumer);
89
90 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
91 }
92
93 /*
94 * Delte consumer socket to consumer output object. Read side lock must be
95 * acquired before calling this function.
96 */
97 void consumer_del_socket(struct consumer_socket *sock,
98 struct consumer_output *consumer)
99 {
100 int ret;
101 struct lttng_ht_iter iter;
102
103 assert(sock);
104 assert(consumer);
105
106 iter.iter.node = &sock->node.node;
107 ret = lttng_ht_del(consumer->socks, &iter);
108 assert(!ret);
109 }
110
111 /*
112 * RCU destroy call function.
113 */
114 static void destroy_socket_rcu(struct rcu_head *head)
115 {
116 struct lttng_ht_node_ulong *node =
117 caa_container_of(head, struct lttng_ht_node_ulong, head);
118 struct consumer_socket *socket =
119 caa_container_of(node, struct consumer_socket, node);
120
121 free(socket);
122 }
123
124 /*
125 * Destroy and free socket pointer in a call RCU. Read side lock must be
126 * acquired before calling this function.
127 */
128 void consumer_destroy_socket(struct consumer_socket *sock)
129 {
130 assert(sock);
131
132 /*
133 * We DO NOT close the file descriptor here since it is global to the
134 * session daemon and is closed only if the consumer dies.
135 */
136
137 call_rcu(&sock->node.head, destroy_socket_rcu);
138 }
139
140 /*
141 * Allocate and assign data to a consumer_output object.
142 *
143 * Return pointer to structure.
144 */
145 struct consumer_output *consumer_create_output(enum consumer_dst_type type)
146 {
147 struct consumer_output *output = NULL;
148
149 output = zmalloc(sizeof(struct consumer_output));
150 if (output == NULL) {
151 PERROR("zmalloc consumer_output");
152 goto error;
153 }
154
155 /* By default, consumer output is enabled */
156 output->enabled = 1;
157 output->type = type;
158 output->net_seq_index = -1;
159
160 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
161
162 error:
163 return output;
164 }
165
166 /*
167 * Delete the consumer_output object from the list and free the ptr.
168 */
169 void consumer_destroy_output(struct consumer_output *obj)
170 {
171 if (obj == NULL) {
172 return;
173 }
174
175 if (obj->socks) {
176 struct lttng_ht_iter iter;
177 struct consumer_socket *socket;
178
179 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
180 consumer_destroy_socket(socket);
181 }
182 }
183
184 free(obj);
185 }
186
187 /*
188 * Copy consumer output and returned the newly allocated copy.
189 */
190 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
191 {
192 struct lttng_ht_iter iter;
193 struct consumer_socket *socket, *copy_sock;
194 struct consumer_output *output;
195
196 assert(obj);
197
198 output = consumer_create_output(obj->type);
199 if (output == NULL) {
200 goto error;
201 }
202
203 memcpy(output, obj, sizeof(struct consumer_output));
204
205 /* Copy sockets */
206 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
207
208 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
209 /* Create new socket object. */
210 copy_sock = consumer_allocate_socket(socket->fd);
211 if (copy_sock == NULL) {
212 goto malloc_error;
213 }
214
215 copy_sock->lock = socket->lock;
216 consumer_add_socket(copy_sock, output);
217 }
218
219 error:
220 return output;
221
222 malloc_error:
223 consumer_destroy_output(output);
224 return NULL;
225 }
226
227 /*
228 * Set network URI to the consumer output object.
229 *
230 * Return 0 on success. Negative value on error.
231 */
232 int consumer_set_network_uri(struct consumer_output *obj,
233 struct lttng_uri *uri)
234 {
235 int ret;
236 char tmp_path[PATH_MAX];
237 char hostname[HOST_NAME_MAX];
238 struct lttng_uri *dst_uri = NULL;
239
240 /* Code flow error safety net. */
241 assert(obj);
242 assert(uri);
243
244 switch (uri->stype) {
245 case LTTNG_STREAM_CONTROL:
246 dst_uri = &obj->dst.net.control;
247 obj->dst.net.control_isset = 1;
248 if (uri->port == 0) {
249 /* Assign default port. */
250 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
251 }
252 break;
253 case LTTNG_STREAM_DATA:
254 dst_uri = &obj->dst.net.data;
255 obj->dst.net.data_isset = 1;
256 if (uri->port == 0) {
257 /* Assign default port. */
258 uri->port = DEFAULT_NETWORK_DATA_PORT;
259 }
260 break;
261 default:
262 ERR("Set network uri type unknown %d", uri->stype);
263 goto error;
264 }
265
266 ret = uri_compare(dst_uri, uri);
267 if (!ret) {
268 /* Same URI, don't touch it and return success. */
269 DBG3("URI network compare are the same");
270 goto end;
271 }
272
273 /* URIs were not equal, replacing it. */
274 memset(dst_uri, 0, sizeof(struct lttng_uri));
275 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
276 obj->type = CONSUMER_DST_NET;
277
278 /* Handle subdir and add hostname in front. */
279 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
280 /* Get hostname to append it in the pathname */
281 ret = gethostname(hostname, sizeof(hostname));
282 if (ret < 0) {
283 PERROR("gethostname. Fallback on default localhost");
284 strncpy(hostname, "localhost", sizeof(hostname));
285 }
286 hostname[sizeof(hostname) - 1] = '\0';
287
288 /* Setup consumer subdir if none present in the control URI */
289 if (strlen(dst_uri->subdir) == 0) {
290 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
291 hostname, obj->subdir);
292 } else {
293 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
294 hostname, dst_uri->subdir);
295 }
296 if (ret < 0) {
297 PERROR("snprintf set consumer uri subdir");
298 goto error;
299 }
300
301 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
302 DBG3("Consumer set network uri subdir path %s", tmp_path);
303 }
304
305 end:
306 return 0;
307
308 error:
309 return -1;
310 }
311
/*
 * Send the `nb_fd` file descriptors in `fds` to the consumer over the unix
 * socket `sock`.
 *
 * `fds` must be non-NULL and `nb_fd` greater than zero (enforced by assert).
 *
 * Return the value of lttcomm_send_fds_unix_sock(); a negative value is an
 * error and is reported with PERROR.
 */
int consumer_send_fds(int sock, int *fds, size_t nb_fd)
{
	int send_ret;

	assert(fds);
	assert(nb_fd > 0);

	send_ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd);
	if (send_ret < 0) {
		PERROR("send consumer fds");
	}

	return send_ret;
}
331
332 /*
333 * Consumer send channel communication message structure to consumer.
334 */
335 int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg)
336 {
337 int ret;
338
339 assert(msg);
340 assert(sock >= 0);
341
342 ret = lttcomm_send_unix_sock(sock, msg,
343 sizeof(struct lttcomm_consumer_msg));
344 if (ret < 0) {
345 PERROR("send consumer channel");
346 goto error;
347 }
348
349 error:
350 return ret;
351 }
352
353 /*
354 * Init channel communication message structure.
355 */
356 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
357 enum lttng_consumer_command cmd,
358 int channel_key,
359 uint64_t max_sb_size,
360 uint64_t mmap_len,
361 const char *name)
362 {
363 assert(msg);
364
365 /* TODO: Args validation */
366
367 /* Zeroed structure */
368 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
369
370 /* Send channel */
371 msg->cmd_type = cmd;
372 msg->u.channel.channel_key = channel_key;
373 msg->u.channel.max_sb_size = max_sb_size;
374 msg->u.channel.mmap_len = mmap_len;
375 }
376
377 /*
378 * Init stream communication message structure.
379 */
380 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
381 enum lttng_consumer_command cmd,
382 int channel_key,
383 int stream_key,
384 uint32_t state,
385 enum lttng_event_output output,
386 uint64_t mmap_len,
387 uid_t uid,
388 gid_t gid,
389 int net_index,
390 unsigned int metadata_flag,
391 const char *name,
392 const char *pathname)
393 {
394 assert(msg);
395
396 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
397
398 /* TODO: Args validation */
399
400 msg->cmd_type = cmd;
401 msg->u.stream.channel_key = channel_key;
402 msg->u.stream.stream_key = stream_key;
403 msg->u.stream.state = state;
404 msg->u.stream.output = output;
405 msg->u.stream.mmap_len = mmap_len;
406 msg->u.stream.uid = uid;
407 msg->u.stream.gid = gid;
408 msg->u.stream.net_index = net_index;
409 msg->u.stream.metadata_flag = metadata_flag;
410 strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
411 msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
412 strncpy(msg->u.stream.path_name, pathname,
413 sizeof(msg->u.stream.path_name));
414 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
415 }
416
417 /*
418 * Send stream communication structure to the consumer.
419 */
420 int consumer_send_stream(int sock, struct consumer_output *dst,
421 struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd)
422 {
423 int ret;
424
425 assert(msg);
426 assert(dst);
427
428 switch (dst->type) {
429 case CONSUMER_DST_NET:
430 /* Consumer should send the stream on the network. */
431 msg->u.stream.net_index = dst->net_seq_index;
432 break;
433 case CONSUMER_DST_LOCAL:
434 /* Add stream file name to stream path */
435 strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
436 strncat(msg->u.stream.path_name, msg->u.stream.name,
437 sizeof(msg->u.stream.path_name));
438 msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
439 /* Indicate that the stream is NOT network */
440 msg->u.stream.net_index = -1;
441 break;
442 default:
443 ERR("Consumer unknown output type (%d)", dst->type);
444 ret = -1;
445 goto error;
446 }
447
448 /* Send on socket */
449 ret = lttcomm_send_unix_sock(sock, msg,
450 sizeof(struct lttcomm_consumer_msg));
451 if (ret < 0) {
452 PERROR("send consumer stream");
453 goto error;
454 }
455
456 ret = consumer_send_fds(sock, fds, nb_fd);
457 if (ret < 0) {
458 goto error;
459 }
460
461 error:
462 return ret;
463 }
464
465 /*
466 * Send relayd socket to consumer associated with a session name.
467 *
468 * On success return positive value. On error, negative value.
469 */
470 int consumer_send_relayd_socket(int consumer_sock,
471 struct lttcomm_sock *sock, struct consumer_output *consumer,
472 enum lttng_stream_type type)
473 {
474 int ret;
475 struct lttcomm_consumer_msg msg;
476
477 /* Code flow error. Safety net. */
478 assert(sock);
479 assert(consumer);
480
481 /* Bail out if consumer is disabled */
482 if (!consumer->enabled) {
483 ret = LTTCOMM_OK;
484 goto error;
485 }
486
487 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
488 /*
489 * Assign network consumer output index using the temporary consumer since
490 * this call should only be made from within a set_consumer_uri() function
491 * call in the session daemon.
492 */
493 msg.u.relayd_sock.net_index = consumer->net_seq_index;
494 msg.u.relayd_sock.type = type;
495 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
496
497 DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
498 ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
499 if (ret < 0) {
500 PERROR("send consumer relayd socket info");
501 goto error;
502 }
503
504 DBG3("Sending relayd socket file descriptor to consumer");
505 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
506 if (ret < 0) {
507 goto error;
508 }
509
510 DBG2("Consumer relayd socket sent");
511
512 error:
513 return ret;
514 }
515
516 /*
517 * Send destroy relayd command to consumer.
518 *
519 * On success return positive value. On error, negative value.
520 */
521 int consumer_send_destroy_relayd(struct consumer_socket *sock,
522 struct consumer_output *consumer)
523 {
524 int ret;
525 struct lttcomm_consumer_msg msg;
526
527 assert(consumer);
528 assert(sock);
529
530 DBG2("Sending destroy relayd command to consumer...");
531
532 /* Bail out if consumer is disabled */
533 if (!consumer->enabled) {
534 ret = LTTCOMM_OK;
535 DBG3("Consumer is disabled");
536 goto error;
537 }
538
539 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
540 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
541
542 pthread_mutex_lock(sock->lock);
543 ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
544 pthread_mutex_unlock(sock->lock);
545 if (ret < 0) {
546 PERROR("send consumer destroy relayd command");
547 goto error;
548 }
549
550 DBG2("Consumer send destroy relayd command done");
551
552 error:
553 return ret;
554 }
This page took 0.039855 seconds and 4 git commands to generate.