Use consumer fd reference in consumer socket obj
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
24#include <sys/types.h>
25#include <unistd.h>
d88aee68 26#include <inttypes.h>
00e2e675
DG
27
28#include <common/common.h>
29#include <common/defaults.h>
30#include <common/uri.h>
31
32#include "consumer.h"
7972aab2
DG
33#include "health.h"
34#include "ust-app.h"
0b2dc8df 35#include "utils.h"
00e2e675 36
f50f23d9
DG
37/*
38 * Receive a reply command status message from the consumer. Consumer socket
39 * lock MUST be acquired before calling this function.
40 *
41 * Return 0 on success, -1 on recv error or a negative lttng error code which
42 * was possibly returned by the consumer.
43 */
44int consumer_recv_status_reply(struct consumer_socket *sock)
45{
46 int ret;
47 struct lttcomm_consumer_status_msg reply;
48
49 assert(sock);
50
4ce514c4 51 ret = lttcomm_recv_unix_sock(*sock->fd, &reply, sizeof(reply));
a6cd2b97
DG
52 if (ret <= 0) {
53 if (ret == 0) {
54 /* Orderly shutdown. Don't return 0 which means success. */
55 ret = -1;
56 }
3448e266 57 /* The above call will print a PERROR on error. */
4ce514c4 58 DBG("Fail to receive status reply on sock %d", *sock->fd);
f50f23d9
DG
59 goto end;
60 }
61
62 if (reply.ret_code == LTTNG_OK) {
63 /* All good. */
64 ret = 0;
65 } else {
66 ret = -reply.ret_code;
ffe60014 67 DBG("Consumer ret code %d", ret);
f50f23d9
DG
68 }
69
70end:
71 return ret;
72}
73
ffe60014
DG
74/*
75 * Once the ASK_CHANNEL command is sent to the consumer, the channel
76 * information are sent back. This call receives that data and populates key
77 * and stream_count.
78 *
79 * On success return 0 and both key and stream_count are set. On error, a
80 * negative value is sent back and both parameters are untouched.
81 */
82int consumer_recv_status_channel(struct consumer_socket *sock,
d88aee68 83 uint64_t *key, unsigned int *stream_count)
ffe60014
DG
84{
85 int ret;
86 struct lttcomm_consumer_status_channel reply;
87
88 assert(sock);
89 assert(stream_count);
90 assert(key);
91
4ce514c4 92 ret = lttcomm_recv_unix_sock(*sock->fd, &reply, sizeof(reply));
ffe60014
DG
93 if (ret <= 0) {
94 if (ret == 0) {
95 /* Orderly shutdown. Don't return 0 which means success. */
96 ret = -1;
97 }
98 /* The above call will print a PERROR on error. */
4ce514c4 99 DBG("Fail to receive status reply on sock %d", *sock->fd);
ffe60014
DG
100 goto end;
101 }
102
103 /* An error is possible so don't touch the key and stream_count. */
104 if (reply.ret_code != LTTNG_OK) {
105 ret = -1;
106 goto end;
107 }
108
109 *key = reply.key;
110 *stream_count = reply.stream_count;
111
112end:
113 return ret;
114}
115
2f77fc4b
DG
116/*
117 * Send destroy relayd command to consumer.
118 *
119 * On success return positive value. On error, negative value.
120 */
121int consumer_send_destroy_relayd(struct consumer_socket *sock,
122 struct consumer_output *consumer)
123{
124 int ret;
125 struct lttcomm_consumer_msg msg;
126
127 assert(consumer);
128 assert(sock);
129
4ce514c4 130 DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd);
2f77fc4b
DG
131
132 /* Bail out if consumer is disabled */
133 if (!consumer->enabled) {
f73fabfd 134 ret = LTTNG_OK;
2f77fc4b
DG
135 DBG3("Consumer is disabled");
136 goto error;
137 }
138
139 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
140 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
141
142 pthread_mutex_lock(sock->lock);
4ce514c4 143 ret = lttcomm_send_unix_sock(*sock->fd, &msg, sizeof(msg));
2f77fc4b 144 if (ret < 0) {
c5c45efa
DG
145 /* Indicate that the consumer is probably closing at this point. */
146 DBG("send consumer destroy relayd command");
f50f23d9 147 goto error_send;
2f77fc4b
DG
148 }
149
f50f23d9
DG
150 /* Don't check the return value. The caller will do it. */
151 ret = consumer_recv_status_reply(sock);
152
2f77fc4b
DG
153 DBG2("Consumer send destroy relayd command done");
154
f50f23d9
DG
155error_send:
156 pthread_mutex_unlock(sock->lock);
2f77fc4b
DG
157error:
158 return ret;
159}
160
161/*
162 * For each consumer socket in the consumer output object, send a destroy
163 * relayd command.
164 */
165void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
166{
2f77fc4b
DG
167 struct lttng_ht_iter iter;
168 struct consumer_socket *socket;
169
170 assert(consumer);
171
172 /* Destroy any relayd connection */
6dc3064a 173 if (consumer->type == CONSUMER_DST_NET) {
2f77fc4b
DG
174 rcu_read_lock();
175 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
176 node.node) {
c617c0c6
MD
177 int ret;
178
2f77fc4b
DG
179 /* Send destroy relayd command */
180 ret = consumer_send_destroy_relayd(socket, consumer);
181 if (ret < 0) {
c5c45efa 182 DBG("Unable to send destroy relayd command to consumer");
2f77fc4b
DG
183 /* Continue since we MUST delete everything at this point. */
184 }
185 }
186 rcu_read_unlock();
187 }
188}
189
a4b92340
DG
190/*
191 * From a consumer_data structure, allocate and add a consumer socket to the
192 * consumer output.
193 *
194 * Return 0 on success, else negative value on error
195 */
196int consumer_create_socket(struct consumer_data *data,
197 struct consumer_output *output)
198{
199 int ret = 0;
200 struct consumer_socket *socket;
201
202 assert(data);
203
204 if (output == NULL || data->cmd_sock < 0) {
205 /*
206 * Not an error. Possible there is simply not spawned consumer or it's
207 * disabled for the tracing session asking the socket.
208 */
209 goto error;
210 }
211
212 rcu_read_lock();
213 socket = consumer_find_socket(data->cmd_sock, output);
214 rcu_read_unlock();
215 if (socket == NULL) {
4ce514c4 216 socket = consumer_allocate_socket(&data->cmd_sock);
a4b92340
DG
217 if (socket == NULL) {
218 ret = -1;
219 goto error;
220 }
221
2f77fc4b 222 socket->registered = 0;
a4b92340
DG
223 socket->lock = &data->lock;
224 rcu_read_lock();
225 consumer_add_socket(socket, output);
226 rcu_read_unlock();
227 }
228
6dc3064a
DG
229 socket->type = data->type;
230
a4b92340
DG
231 DBG3("Consumer socket created (fd: %d) and added to output",
232 data->cmd_sock);
233
234error:
235 return ret;
236}
237
7972aab2
DG
238/*
239 * Return the consumer socket from the given consumer output with the right
240 * bitness. On error, returns NULL.
241 *
242 * The caller MUST acquire a rcu read side lock and keep it until the socket
243 * object reference is not needed anymore.
244 */
245struct consumer_socket *consumer_find_socket_by_bitness(int bits,
246 struct consumer_output *consumer)
247{
248 int consumer_fd;
249 struct consumer_socket *socket = NULL;
250
251 switch (bits) {
252 case 64:
253 consumer_fd = uatomic_read(&ust_consumerd64_fd);
254 break;
255 case 32:
256 consumer_fd = uatomic_read(&ust_consumerd32_fd);
257 break;
258 default:
259 assert(0);
260 goto end;
261 }
262
263 socket = consumer_find_socket(consumer_fd, consumer);
264 if (!socket) {
265 ERR("Consumer socket fd %d not found in consumer obj %p",
266 consumer_fd, consumer);
267 }
268
269end:
270 return socket;
271}
272
173af62f
DG
273/*
274 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
275 * be acquired before calling this function and across use of the
276 * returned consumer_socket.
277 */
278struct consumer_socket *consumer_find_socket(int key,
279 struct consumer_output *consumer)
280{
281 struct lttng_ht_iter iter;
282 struct lttng_ht_node_ulong *node;
283 struct consumer_socket *socket = NULL;
284
285 /* Negative keys are lookup failures */
a4b92340 286 if (key < 0 || consumer == NULL) {
173af62f
DG
287 return NULL;
288 }
289
290 lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
291 &iter);
292 node = lttng_ht_iter_get_node_ulong(&iter);
293 if (node != NULL) {
294 socket = caa_container_of(node, struct consumer_socket, node);
295 }
296
297 return socket;
298}
299
300/*
301 * Allocate a new consumer_socket and return the pointer.
302 */
4ce514c4 303struct consumer_socket *consumer_allocate_socket(int *fd)
173af62f
DG
304{
305 struct consumer_socket *socket = NULL;
306
4ce514c4
DG
307 assert(fd);
308
173af62f
DG
309 socket = zmalloc(sizeof(struct consumer_socket));
310 if (socket == NULL) {
311 PERROR("zmalloc consumer socket");
312 goto error;
313 }
314
315 socket->fd = fd;
4ce514c4 316 lttng_ht_node_init_ulong(&socket->node, *fd);
173af62f
DG
317
318error:
319 return socket;
320}
321
322/*
323 * Add consumer socket to consumer output object. Read side lock must be
324 * acquired before calling this function.
325 */
326void consumer_add_socket(struct consumer_socket *sock,
327 struct consumer_output *consumer)
328{
329 assert(sock);
330 assert(consumer);
331
332 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
333}
334
335/*
336 * Delte consumer socket to consumer output object. Read side lock must be
337 * acquired before calling this function.
338 */
339void consumer_del_socket(struct consumer_socket *sock,
340 struct consumer_output *consumer)
341{
342 int ret;
343 struct lttng_ht_iter iter;
344
345 assert(sock);
346 assert(consumer);
347
348 iter.iter.node = &sock->node.node;
349 ret = lttng_ht_del(consumer->socks, &iter);
350 assert(!ret);
351}
352
353/*
354 * RCU destroy call function.
355 */
356static void destroy_socket_rcu(struct rcu_head *head)
357{
358 struct lttng_ht_node_ulong *node =
359 caa_container_of(head, struct lttng_ht_node_ulong, head);
360 struct consumer_socket *socket =
361 caa_container_of(node, struct consumer_socket, node);
362
363 free(socket);
364}
365
366/*
367 * Destroy and free socket pointer in a call RCU. Read side lock must be
368 * acquired before calling this function.
369 */
370void consumer_destroy_socket(struct consumer_socket *sock)
371{
372 assert(sock);
373
374 /*
375 * We DO NOT close the file descriptor here since it is global to the
2f77fc4b
DG
376 * session daemon and is closed only if the consumer dies or a custom
377 * consumer was registered,
173af62f 378 */
2f77fc4b 379 if (sock->registered) {
4ce514c4
DG
380 DBG3("Consumer socket was registered. Closing fd %d", *sock->fd);
381 lttcomm_close_unix_sock(*sock->fd);
2f77fc4b 382 }
173af62f
DG
383
384 call_rcu(&sock->node.head, destroy_socket_rcu);
385}
386
00e2e675
DG
387/*
388 * Allocate and assign data to a consumer_output object.
389 *
390 * Return pointer to structure.
391 */
392struct consumer_output *consumer_create_output(enum consumer_dst_type type)
393{
394 struct consumer_output *output = NULL;
395
396 output = zmalloc(sizeof(struct consumer_output));
397 if (output == NULL) {
398 PERROR("zmalloc consumer_output");
399 goto error;
400 }
401
402 /* By default, consumer output is enabled */
403 output->enabled = 1;
404 output->type = type;
d88aee68 405 output->net_seq_index = (uint64_t) -1ULL;
173af62f
DG
406
407 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
408
409error:
410 return output;
411}
412
af706bb7
DG
413/*
414 * Iterate over the consumer output socket hash table and destroy them. The
415 * socket file descriptor are only closed if the consumer output was
416 * registered meaning it's an external consumer.
417 */
418void consumer_destroy_output_sockets(struct consumer_output *obj)
419{
420 struct lttng_ht_iter iter;
421 struct consumer_socket *socket;
422
423 if (!obj->socks) {
424 return;
425 }
426
427 rcu_read_lock();
428 cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
429 consumer_del_socket(socket, obj);
430 consumer_destroy_socket(socket);
431 }
432 rcu_read_unlock();
433}
434
00e2e675
DG
435/*
436 * Delete the consumer_output object from the list and free the ptr.
36b588ed
MD
437 *
438 * Should *NOT* be called with RCU read-side lock held.
00e2e675
DG
439 */
440void consumer_destroy_output(struct consumer_output *obj)
441{
442 if (obj == NULL) {
443 return;
444 }
445
af706bb7 446 consumer_destroy_output_sockets(obj);
2f77fc4b 447
af706bb7 448 if (obj->socks) {
2f77fc4b 449 /* Finally destroy HT */
0b2dc8df 450 ht_cleanup_push(obj->socks);
00e2e675 451 }
173af62f 452
00e2e675
DG
453 free(obj);
454}
455
456/*
457 * Copy consumer output and returned the newly allocated copy.
36b588ed
MD
458 *
459 * Should *NOT* be called with RCU read-side lock held.
00e2e675
DG
460 */
461struct consumer_output *consumer_copy_output(struct consumer_output *obj)
462{
6dc3064a 463 int ret;
09a90bcd 464 struct lttng_ht *tmp_ht_ptr;
00e2e675
DG
465 struct consumer_output *output;
466
467 assert(obj);
468
469 output = consumer_create_output(obj->type);
470 if (output == NULL) {
471 goto error;
472 }
09a90bcd
DG
473 /* Avoid losing the HT reference after the memcpy() */
474 tmp_ht_ptr = output->socks;
00e2e675
DG
475
476 memcpy(output, obj, sizeof(struct consumer_output));
477
09a90bcd
DG
478 /* Putting back the HT pointer and start copying socket(s). */
479 output->socks = tmp_ht_ptr;
173af62f 480
6dc3064a
DG
481 ret = consumer_copy_sockets(output, obj);
482 if (ret < 0) {
483 goto malloc_error;
484 }
485
486error:
487 return output;
488
489malloc_error:
490 consumer_destroy_output(output);
491 return NULL;
492}
493
494/*
495 * Copy consumer sockets from src to dst.
496 *
497 * Return 0 on success or else a negative value.
498 */
499int consumer_copy_sockets(struct consumer_output *dst,
500 struct consumer_output *src)
501{
502 int ret = 0;
503 struct lttng_ht_iter iter;
504 struct consumer_socket *socket, *copy_sock;
505
506 assert(dst);
507 assert(src);
508
b82c5c4d 509 rcu_read_lock();
6dc3064a
DG
510 cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) {
511 /* Ignore socket that are already there. */
4ce514c4 512 copy_sock = consumer_find_socket(*socket->fd, dst);
6dc3064a
DG
513 if (copy_sock) {
514 continue;
515 }
516
173af62f
DG
517 /* Create new socket object. */
518 copy_sock = consumer_allocate_socket(socket->fd);
519 if (copy_sock == NULL) {
b82c5c4d 520 rcu_read_unlock();
6dc3064a
DG
521 ret = -ENOMEM;
522 goto error;
173af62f
DG
523 }
524
09a90bcd 525 copy_sock->registered = socket->registered;
6dc3064a
DG
526 /*
527 * This is valid because this lock is shared accross all consumer
528 * object being the global lock of the consumer data structure of the
529 * session daemon.
530 */
173af62f 531 copy_sock->lock = socket->lock;
6dc3064a 532 consumer_add_socket(copy_sock, dst);
173af62f 533 }
b82c5c4d 534 rcu_read_unlock();
173af62f 535
00e2e675 536error:
6dc3064a 537 return ret;
00e2e675
DG
538}
539
540/*
541 * Set network URI to the consumer output object.
542 *
ad20f474
DG
543 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
544 * error.
00e2e675
DG
545 */
546int consumer_set_network_uri(struct consumer_output *obj,
547 struct lttng_uri *uri)
548{
549 int ret;
550 char tmp_path[PATH_MAX];
551 char hostname[HOST_NAME_MAX];
552 struct lttng_uri *dst_uri = NULL;
553
554 /* Code flow error safety net. */
555 assert(obj);
556 assert(uri);
557
558 switch (uri->stype) {
559 case LTTNG_STREAM_CONTROL:
560 dst_uri = &obj->dst.net.control;
561 obj->dst.net.control_isset = 1;
562 if (uri->port == 0) {
563 /* Assign default port. */
564 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
a74934ba
DG
565 } else {
566 if (obj->dst.net.data_isset && uri->port ==
567 obj->dst.net.data.port) {
568 ret = -LTTNG_ERR_INVALID;
569 goto error;
570 }
00e2e675 571 }
ad20f474 572 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
573 break;
574 case LTTNG_STREAM_DATA:
575 dst_uri = &obj->dst.net.data;
576 obj->dst.net.data_isset = 1;
577 if (uri->port == 0) {
578 /* Assign default port. */
579 uri->port = DEFAULT_NETWORK_DATA_PORT;
a74934ba
DG
580 } else {
581 if (obj->dst.net.control_isset && uri->port ==
582 obj->dst.net.control.port) {
583 ret = -LTTNG_ERR_INVALID;
584 goto error;
585 }
00e2e675 586 }
ad20f474 587 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
588 break;
589 default:
590 ERR("Set network uri type unknown %d", uri->stype);
a74934ba 591 ret = -LTTNG_ERR_INVALID;
00e2e675
DG
592 goto error;
593 }
594
595 ret = uri_compare(dst_uri, uri);
596 if (!ret) {
597 /* Same URI, don't touch it and return success. */
598 DBG3("URI network compare are the same");
ad20f474 599 goto equal;
00e2e675
DG
600 }
601
602 /* URIs were not equal, replacing it. */
603 memset(dst_uri, 0, sizeof(struct lttng_uri));
604 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
605 obj->type = CONSUMER_DST_NET;
606
607 /* Handle subdir and add hostname in front. */
608 if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
609 /* Get hostname to append it in the pathname */
610 ret = gethostname(hostname, sizeof(hostname));
611 if (ret < 0) {
612 PERROR("gethostname. Fallback on default localhost");
613 strncpy(hostname, "localhost", sizeof(hostname));
614 }
615 hostname[sizeof(hostname) - 1] = '\0';
616
617 /* Setup consumer subdir if none present in the control URI */
618 if (strlen(dst_uri->subdir) == 0) {
619 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
620 hostname, obj->subdir);
621 } else {
622 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
623 hostname, dst_uri->subdir);
624 }
625 if (ret < 0) {
626 PERROR("snprintf set consumer uri subdir");
a74934ba 627 ret = -LTTNG_ERR_NOMEM;
00e2e675
DG
628 goto error;
629 }
630
631 strncpy(obj->subdir, tmp_path, sizeof(obj->subdir));
632 DBG3("Consumer set network uri subdir path %s", tmp_path);
633 }
634
00e2e675 635 return 0;
ad20f474
DG
636equal:
637 return 1;
00e2e675 638error:
a74934ba 639 return ret;
00e2e675
DG
640}
641
642/*
643 * Send file descriptor to consumer via sock.
644 */
f50f23d9 645int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd)
00e2e675
DG
646{
647 int ret;
648
649 assert(fds);
f50f23d9 650 assert(sock);
4ce514c4 651 assert(sock->fd);
00e2e675
DG
652 assert(nb_fd > 0);
653
4ce514c4 654 ret = lttcomm_send_fds_unix_sock(*sock->fd, fds, nb_fd);
00e2e675 655 if (ret < 0) {
3448e266 656 /* The above call will print a PERROR on error. */
4ce514c4 657 DBG("Error when sending consumer fds on sock %d", *sock->fd);
00e2e675
DG
658 goto error;
659 }
660
f50f23d9
DG
661 ret = consumer_recv_status_reply(sock);
662
00e2e675
DG
663error:
664 return ret;
665}
666
ffe60014
DG
667/*
668 * Consumer send communication message structure to consumer.
669 */
670int consumer_send_msg(struct consumer_socket *sock,
671 struct lttcomm_consumer_msg *msg)
672{
673 int ret;
674
675 assert(msg);
676 assert(sock);
4ce514c4 677 assert(sock->fd);
ffe60014 678
4ce514c4 679 ret = lttcomm_send_unix_sock(*sock->fd, msg,
ffe60014
DG
680 sizeof(struct lttcomm_consumer_msg));
681 if (ret < 0) {
682 /* The above call will print a PERROR on error. */
4ce514c4 683 DBG("Error when sending consumer channel on sock %d", *sock->fd);
ffe60014
DG
684 goto error;
685 }
686
687 ret = consumer_recv_status_reply(sock);
688
689error:
690 return ret;
691}
692
00e2e675
DG
693/*
694 * Consumer send channel communication message structure to consumer.
695 */
f50f23d9
DG
696int consumer_send_channel(struct consumer_socket *sock,
697 struct lttcomm_consumer_msg *msg)
00e2e675
DG
698{
699 int ret;
700
701 assert(msg);
f50f23d9 702 assert(sock);
4ce514c4 703 assert(sock->fd);
00e2e675 704
4ce514c4 705 ret = lttcomm_send_unix_sock(*sock->fd, msg,
00e2e675
DG
706 sizeof(struct lttcomm_consumer_msg));
707 if (ret < 0) {
3448e266 708 /* The above call will print a PERROR on error. */
4ce514c4 709 DBG("Error when sending consumer channel on sock %d", *sock->fd);
00e2e675
DG
710 goto error;
711 }
712
f50f23d9
DG
713 ret = consumer_recv_status_reply(sock);
714
00e2e675
DG
715error:
716 return ret;
717}
718
ffe60014
DG
719/*
720 * Populate the given consumer msg structure with the ask_channel command
721 * information.
722 */
723void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
724 uint64_t subbuf_size,
725 uint64_t num_subbuf,
726 int overwrite,
727 unsigned int switch_timer_interval,
728 unsigned int read_timer_interval,
729 int output,
730 int type,
731 uint64_t session_id,
732 const char *pathname,
733 const char *name,
734 uid_t uid,
735 gid_t gid,
d88aee68
DG
736 uint64_t relayd_id,
737 uint64_t key,
7972aab2 738 unsigned char *uuid,
1624d5b7
JD
739 uint32_t chan_id,
740 uint64_t tracefile_size,
2bba9e53 741 uint64_t tracefile_count,
1950109e 742 uint64_t session_id_per_pid,
567eb353
DG
743 unsigned int monitor,
744 uint32_t ust_app_uid)
ffe60014
DG
745{
746 assert(msg);
747
748 /* Zeroed structure */
749 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
750
751 msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION;
752 msg->u.ask_channel.subbuf_size = subbuf_size;
753 msg->u.ask_channel.num_subbuf = num_subbuf ;
754 msg->u.ask_channel.overwrite = overwrite;
755 msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
756 msg->u.ask_channel.read_timer_interval = read_timer_interval;
757 msg->u.ask_channel.output = output;
758 msg->u.ask_channel.type = type;
759 msg->u.ask_channel.session_id = session_id;
1950109e 760 msg->u.ask_channel.session_id_per_pid = session_id_per_pid;
ffe60014
DG
761 msg->u.ask_channel.uid = uid;
762 msg->u.ask_channel.gid = gid;
763 msg->u.ask_channel.relayd_id = relayd_id;
764 msg->u.ask_channel.key = key;
7972aab2 765 msg->u.ask_channel.chan_id = chan_id;
1624d5b7
JD
766 msg->u.ask_channel.tracefile_size = tracefile_size;
767 msg->u.ask_channel.tracefile_count = tracefile_count;
2bba9e53 768 msg->u.ask_channel.monitor = monitor;
567eb353 769 msg->u.ask_channel.ust_app_uid = ust_app_uid;
ffe60014
DG
770
771 memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
772
10a50311
JD
773 if (pathname) {
774 strncpy(msg->u.ask_channel.pathname, pathname,
775 sizeof(msg->u.ask_channel.pathname));
776 msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0';
777 }
ffe60014
DG
778
779 strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
780 msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
781}
782
00e2e675
DG
783/*
784 * Init channel communication message structure.
785 */
786void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
787 enum lttng_consumer_command cmd,
d88aee68 788 uint64_t channel_key,
ffe60014
DG
789 uint64_t session_id,
790 const char *pathname,
791 uid_t uid,
792 gid_t gid,
d88aee68 793 uint64_t relayd_id,
c30aaa51 794 const char *name,
ffe60014
DG
795 unsigned int nb_init_streams,
796 enum lttng_event_output output,
1624d5b7
JD
797 int type,
798 uint64_t tracefile_size,
2bba9e53
DG
799 uint64_t tracefile_count,
800 unsigned int monitor)
00e2e675
DG
801{
802 assert(msg);
803
00e2e675
DG
804 /* Zeroed structure */
805 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
806
807 /* Send channel */
808 msg->cmd_type = cmd;
809 msg->u.channel.channel_key = channel_key;
ffe60014
DG
810 msg->u.channel.session_id = session_id;
811 msg->u.channel.uid = uid;
812 msg->u.channel.gid = gid;
813 msg->u.channel.relayd_id = relayd_id;
c30aaa51 814 msg->u.channel.nb_init_streams = nb_init_streams;
ffe60014
DG
815 msg->u.channel.output = output;
816 msg->u.channel.type = type;
1624d5b7
JD
817 msg->u.channel.tracefile_size = tracefile_size;
818 msg->u.channel.tracefile_count = tracefile_count;
2bba9e53 819 msg->u.channel.monitor = monitor;
ffe60014
DG
820
821 strncpy(msg->u.channel.pathname, pathname,
822 sizeof(msg->u.channel.pathname));
823 msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0';
824
825 strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name));
826 msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0';
00e2e675
DG
827}
828
829/*
830 * Init stream communication message structure.
831 */
832void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
833 enum lttng_consumer_command cmd,
d88aee68
DG
834 uint64_t channel_key,
835 uint64_t stream_key,
ffe60014 836 int cpu)
00e2e675
DG
837{
838 assert(msg);
839
840 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
841
00e2e675
DG
842 msg->cmd_type = cmd;
843 msg->u.stream.channel_key = channel_key;
844 msg->u.stream.stream_key = stream_key;
ffe60014 845 msg->u.stream.cpu = cpu;
00e2e675
DG
846}
847
848/*
849 * Send stream communication structure to the consumer.
850 */
f50f23d9
DG
851int consumer_send_stream(struct consumer_socket *sock,
852 struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
853 int *fds, size_t nb_fd)
00e2e675
DG
854{
855 int ret;
856
857 assert(msg);
858 assert(dst);
f50f23d9 859 assert(sock);
4ce514c4 860 assert(sock->fd);
ffe60014 861 assert(fds);
00e2e675
DG
862
863 /* Send on socket */
4ce514c4 864 ret = lttcomm_send_unix_sock(*sock->fd, msg,
00e2e675
DG
865 sizeof(struct lttcomm_consumer_msg));
866 if (ret < 0) {
3448e266 867 /* The above call will print a PERROR on error. */
4ce514c4 868 DBG("Error when sending consumer stream on sock %d", *sock->fd);
00e2e675
DG
869 goto error;
870 }
871
f50f23d9
DG
872 ret = consumer_recv_status_reply(sock);
873 if (ret < 0) {
874 goto error;
875 }
876
00e2e675
DG
877 ret = consumer_send_fds(sock, fds, nb_fd);
878 if (ret < 0) {
879 goto error;
880 }
881
882error:
883 return ret;
884}
37278a1e
DG
885
886/*
887 * Send relayd socket to consumer associated with a session name.
888 *
889 * On success return positive value. On error, negative value.
890 */
f50f23d9 891int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
6151a90f 892 struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
d88aee68 893 enum lttng_stream_type type, uint64_t session_id)
37278a1e
DG
894{
895 int ret;
896 struct lttcomm_consumer_msg msg;
897
898 /* Code flow error. Safety net. */
6151a90f 899 assert(rsock);
37278a1e 900 assert(consumer);
f50f23d9 901 assert(consumer_sock);
4ce514c4 902 assert(consumer_sock->fd);
37278a1e
DG
903
904 /* Bail out if consumer is disabled */
905 if (!consumer->enabled) {
f73fabfd 906 ret = LTTNG_OK;
37278a1e
DG
907 goto error;
908 }
909
910 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
911 /*
912 * Assign network consumer output index using the temporary consumer since
913 * this call should only be made from within a set_consumer_uri() function
914 * call in the session daemon.
915 */
916 msg.u.relayd_sock.net_index = consumer->net_seq_index;
917 msg.u.relayd_sock.type = type;
46e6455f 918 msg.u.relayd_sock.session_id = session_id;
6151a90f 919 memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
37278a1e 920
4ce514c4
DG
921 DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd);
922 ret = lttcomm_send_unix_sock(*consumer_sock->fd, &msg, sizeof(msg));
37278a1e 923 if (ret < 0) {
3448e266 924 /* The above call will print a PERROR on error. */
6151a90f 925 DBG("Error when sending relayd sockets on sock %d", rsock->sock.fd);
37278a1e
DG
926 goto error;
927 }
928
f50f23d9
DG
929 ret = consumer_recv_status_reply(consumer_sock);
930 if (ret < 0) {
931 goto error;
932 }
933
37278a1e 934 DBG3("Sending relayd socket file descriptor to consumer");
6151a90f 935 ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1);
37278a1e
DG
936 if (ret < 0) {
937 goto error;
938 }
939
940 DBG2("Consumer relayd socket sent");
941
942error:
943 return ret;
944}
173af62f
DG
945
946/*
2f77fc4b
DG
947 * Set consumer subdirectory using the session name and a generated datetime if
948 * needed. This is appended to the current subdirectory.
173af62f 949 */
2f77fc4b
DG
950int consumer_set_subdir(struct consumer_output *consumer,
951 const char *session_name)
173af62f 952{
2f77fc4b
DG
953 int ret = 0;
954 unsigned int have_default_name = 0;
955 char datetime[16], tmp_path[PATH_MAX];
956 time_t rawtime;
957 struct tm *timeinfo;
173af62f
DG
958
959 assert(consumer);
2f77fc4b
DG
960 assert(session_name);
961
962 memset(tmp_path, 0, sizeof(tmp_path));
963
964 /* Flag if we have a default session. */
965 if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
966 strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
967 have_default_name = 1;
968 } else {
969 /* Get date and time for session path */
970 time(&rawtime);
971 timeinfo = localtime(&rawtime);
972 strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
173af62f
DG
973 }
974
2f77fc4b
DG
975 if (have_default_name) {
976 ret = snprintf(tmp_path, sizeof(tmp_path),
977 "%s/%s", consumer->subdir, session_name);
978 } else {
979 ret = snprintf(tmp_path, sizeof(tmp_path),
980 "%s/%s-%s/", consumer->subdir, session_name, datetime);
981 }
173af62f 982 if (ret < 0) {
2f77fc4b 983 PERROR("snprintf session name date");
173af62f
DG
984 goto error;
985 }
986
2f77fc4b
DG
987 strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
988 DBG2("Consumer subdir set to %s", consumer->subdir);
173af62f
DG
989
990error:
991 return ret;
992}
806e2684
DG
993
994/*
6d805429 995 * Ask the consumer if the data is ready to read (NOT pending) for the specific
806e2684
DG
996 * session id.
997 *
998 * This function has a different behavior with the consumer i.e. that it waits
6d805429 999 * for a reply from the consumer if yes or no the data is pending.
806e2684 1000 */
d88aee68 1001int consumer_is_data_pending(uint64_t session_id,
806e2684
DG
1002 struct consumer_output *consumer)
1003{
1004 int ret;
6d805429 1005 int32_t ret_code = 0; /* Default is that the data is NOT pending */
806e2684
DG
1006 struct consumer_socket *socket;
1007 struct lttng_ht_iter iter;
1008 struct lttcomm_consumer_msg msg;
1009
1010 assert(consumer);
1011
6d805429 1012 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
806e2684 1013
d88aee68 1014 msg.u.data_pending.session_id = session_id;
806e2684 1015
d88aee68 1016 DBG3("Consumer data pending for id %" PRIu64, session_id);
806e2684 1017
c8f59ee5 1018 /* Send command for each consumer */
b82c5c4d 1019 rcu_read_lock();
806e2684
DG
1020 cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
1021 node.node) {
1022 /* Code flow error */
4ce514c4 1023 assert(socket->fd);
806e2684
DG
1024
1025 pthread_mutex_lock(socket->lock);
1026
4ce514c4 1027 ret = lttcomm_send_unix_sock(*socket->fd, &msg, sizeof(msg));
806e2684 1028 if (ret < 0) {
3448e266 1029 /* The above call will print a PERROR on error. */
4ce514c4 1030 DBG("Error on consumer is data pending on sock %d", *socket->fd);
806e2684 1031 pthread_mutex_unlock(socket->lock);
b82c5c4d 1032 goto error_unlock;
806e2684
DG
1033 }
1034
f50f23d9
DG
1035 /*
1036 * No need for a recv reply status because the answer to the command is
1037 * the reply status message.
1038 */
1039
4ce514c4 1040 ret = lttcomm_recv_unix_sock(*socket->fd, &ret_code, sizeof(ret_code));
a6cd2b97
DG
1041 if (ret <= 0) {
1042 if (ret == 0) {
1043 /* Orderly shutdown. Don't return 0 which means success. */
1044 ret = -1;
1045 }
3448e266 1046 /* The above call will print a PERROR on error. */
4ce514c4 1047 DBG("Error on recv consumer is data pending on sock %d", *socket->fd);
806e2684 1048 pthread_mutex_unlock(socket->lock);
b82c5c4d 1049 goto error_unlock;
806e2684
DG
1050 }
1051
1052 pthread_mutex_unlock(socket->lock);
1053
6d805429 1054 if (ret_code == 1) {
806e2684
DG
1055 break;
1056 }
1057 }
b82c5c4d 1058 rcu_read_unlock();
806e2684 1059
d88aee68
DG
1060 DBG("Consumer data is %s pending for session id %" PRIu64,
1061 ret_code == 1 ? "" : "NOT", session_id);
806e2684
DG
1062 return ret_code;
1063
b82c5c4d
DG
1064error_unlock:
1065 rcu_read_unlock();
806e2684
DG
1066 return -1;
1067}
7972aab2
DG
1068
1069/*
1070 * Send a flush command to consumer using the given channel key.
1071 *
1072 * Return 0 on success else a negative value.
1073 */
1074int consumer_flush_channel(struct consumer_socket *socket, uint64_t key)
1075{
1076 int ret;
1077 struct lttcomm_consumer_msg msg;
1078
1079 assert(socket);
4ce514c4 1080 assert(socket->fd);
7972aab2
DG
1081
1082 DBG2("Consumer flush channel key %" PRIu64, key);
1083
1084 msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
1085 msg.u.flush_channel.key = key;
1086
1087 pthread_mutex_lock(socket->lock);
1088 health_code_update();
1089
1090 ret = consumer_send_msg(socket, &msg);
1091 if (ret < 0) {
1092 goto end;
1093 }
1094
1095end:
1096 health_code_update();
1097 pthread_mutex_unlock(socket->lock);
1098 return ret;
1099}
1100
1101/*
1102 * Send a close metdata command to consumer using the given channel key.
1103 *
1104 * Return 0 on success else a negative value.
1105 */
1106int consumer_close_metadata(struct consumer_socket *socket,
1107 uint64_t metadata_key)
1108{
1109 int ret;
1110 struct lttcomm_consumer_msg msg;
1111
1112 assert(socket);
4ce514c4 1113 assert(socket->fd);
7972aab2
DG
1114
1115 DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
1116
1117 msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
1118 msg.u.close_metadata.key = metadata_key;
1119
1120 pthread_mutex_lock(socket->lock);
1121 health_code_update();
1122
1123 ret = consumer_send_msg(socket, &msg);
1124 if (ret < 0) {
1125 goto end;
1126 }
1127
1128end:
1129 health_code_update();
1130 pthread_mutex_unlock(socket->lock);
1131 return ret;
1132}
1133
1134/*
1135 * Send a setup metdata command to consumer using the given channel key.
1136 *
1137 * Return 0 on success else a negative value.
1138 */
1139int consumer_setup_metadata(struct consumer_socket *socket,
1140 uint64_t metadata_key)
1141{
1142 int ret;
1143 struct lttcomm_consumer_msg msg;
1144
1145 assert(socket);
4ce514c4 1146 assert(socket->fd);
7972aab2
DG
1147
1148 DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
1149
1150 msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
1151 msg.u.setup_metadata.key = metadata_key;
1152
1153 pthread_mutex_lock(socket->lock);
1154 health_code_update();
1155
1156 ret = consumer_send_msg(socket, &msg);
1157 if (ret < 0) {
1158 goto end;
1159 }
1160
1161end:
1162 health_code_update();
1163 pthread_mutex_unlock(socket->lock);
1164 return ret;
1165}
1166
1167/*
331744e3 1168 * Send metadata string to consumer. Socket lock MUST be acquired.
7972aab2
DG
1169 *
1170 * Return 0 on success else a negative value.
1171 */
1172int consumer_push_metadata(struct consumer_socket *socket,
1173 uint64_t metadata_key, char *metadata_str, size_t len,
1174 size_t target_offset)
1175{
1176 int ret;
1177 struct lttcomm_consumer_msg msg;
1178
1179 assert(socket);
4ce514c4 1180 assert(socket->fd);
7972aab2 1181
4ce514c4 1182 DBG2("Consumer push metadata to consumer socket %d", *socket->fd);
7972aab2
DG
1183
1184 msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
1185 msg.u.push_metadata.key = metadata_key;
1186 msg.u.push_metadata.target_offset = target_offset;
1187 msg.u.push_metadata.len = len;
1188
7972aab2
DG
1189 health_code_update();
1190 ret = consumer_send_msg(socket, &msg);
331744e3 1191 if (ret < 0 || len == 0) {
7972aab2
DG
1192 goto end;
1193 }
1194
4ce514c4 1195 DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd, len);
7972aab2 1196
4ce514c4 1197 ret = lttcomm_send_unix_sock(*socket->fd, metadata_str, len);
7972aab2
DG
1198 if (ret < 0) {
1199 goto end;
1200 }
1201
1202 health_code_update();
1203 ret = consumer_recv_status_reply(socket);
1204 if (ret < 0) {
1205 goto end;
1206 }
1207
1208end:
1209 health_code_update();
7972aab2
DG
1210 return ret;
1211}
6dc3064a
DG
1212
1213/*
1214 * Ask the consumer to snapshot a specific channel using the key.
1215 *
1216 * Return 0 on success or else a negative error.
1217 */
1218int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
1219 struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
5c786ded 1220 const char *session_path, int wait, int max_stream_size)
6dc3064a
DG
1221{
1222 int ret;
1223 struct lttcomm_consumer_msg msg;
1224
1225 assert(socket);
4ce514c4 1226 assert(socket->fd);
6dc3064a
DG
1227 assert(output);
1228 assert(output->consumer);
1229
1230 DBG("Consumer snapshot channel key %" PRIu64, key);
1231
ee91bab2 1232 memset(&msg, 0, sizeof(msg));
6dc3064a
DG
1233 msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
1234 msg.u.snapshot_channel.key = key;
5c786ded 1235 msg.u.snapshot_channel.max_stream_size = max_stream_size;
6dc3064a
DG
1236 msg.u.snapshot_channel.metadata = metadata;
1237
1238 if (output->consumer->type == CONSUMER_DST_NET) {
1239 msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index;
1240 msg.u.snapshot_channel.use_relayd = 1;
1241 ret = snprintf(msg.u.snapshot_channel.pathname,
1bfe7328
DG
1242 sizeof(msg.u.snapshot_channel.pathname),
1243 "%s/%s-%s-%" PRIu64 "%s", output->consumer->subdir,
1244 output->name, output->datetime, output->nb_snapshot,
ee91bab2 1245 session_path);
6dc3064a
DG
1246 if (ret < 0) {
1247 ret = -LTTNG_ERR_NOMEM;
1248 goto error;
1249 }
1250 } else {
1251 ret = snprintf(msg.u.snapshot_channel.pathname,
1bfe7328
DG
1252 sizeof(msg.u.snapshot_channel.pathname),
1253 "%s/%s-%s-%" PRIu64 "%s", output->consumer->dst.trace_path,
1254 output->name, output->datetime, output->nb_snapshot,
1255 session_path);
6dc3064a
DG
1256 if (ret < 0) {
1257 ret = -LTTNG_ERR_NOMEM;
1258 goto error;
1259 }
07b86b52 1260 msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
6dc3064a
DG
1261
1262 /* Create directory. Ignore if exist. */
1263 ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname,
1264 S_IRWXU | S_IRWXG, uid, gid);
1265 if (ret < 0) {
1266 if (ret != -EEXIST) {
1267 ERR("Trace directory creation error");
1268 goto error;
1269 }
1270 }
1271 }
1272
1273 health_code_update();
1274 ret = consumer_send_msg(socket, &msg);
1275 if (ret < 0) {
1276 goto error;
1277 }
1278
1279error:
1280 health_code_update();
1281 return ret;
1282}
This page took 0.08838 seconds and 4 git commands to generate.