Clean-up: replace uses of `int enabled` with boolean flags
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * SPDX-License-Identifier: GPL-2.0-only
6 *
7 */
8
9 #define _LGPL_SOURCE
10 #include "consumer.hpp"
11 #include "health-sessiond.hpp"
12 #include "lttng-sessiond.hpp"
13 #include "ust-app.hpp"
14 #include "utils.hpp"
15
16 #include <common/common.hpp>
17 #include <common/defaults.hpp>
18 #include <common/relayd/relayd.hpp>
19 #include <common/string-utils/format.hpp>
20 #include <common/uri.hpp>
21
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/stat.h>
27 #include <sys/types.h>
28 #include <unistd.h>
29
30 /*
31 * Return allocated full pathname of the session using the consumer trace path
32 * and subdir if available.
33 *
34 * The caller can safely free(3) the returned value. On error, NULL is
35 * returned.
36 */
37 char *setup_channel_trace_path(struct consumer_output *consumer,
38 const char *session_path,
39 size_t *consumer_path_offset)
40 {
41 int ret;
42 char *pathname;
43
44 LTTNG_ASSERT(consumer);
45 LTTNG_ASSERT(session_path);
46
47 health_code_update();
48
49 /*
50 * Allocate the string ourself to make sure we never exceed
51 * LTTNG_PATH_MAX.
52 */
53 pathname = calloc<char>(LTTNG_PATH_MAX);
54 if (!pathname) {
55 goto error;
56 }
57
58 /* Get correct path name destination */
59 if (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
60 consumer->relay_minor_version < 11) {
61 ret = snprintf(pathname,
62 LTTNG_PATH_MAX,
63 "%s%s/%s/%s",
64 consumer->dst.net.base_dir,
65 consumer->chunk_path,
66 consumer->domain_subdir,
67 session_path);
68 *consumer_path_offset = 0;
69 } else {
70 ret = snprintf(
71 pathname, LTTNG_PATH_MAX, "%s/%s", consumer->domain_subdir, session_path);
72 *consumer_path_offset = strlen(consumer->domain_subdir) + 1;
73 }
74 DBG3("Consumer trace path relative to current trace chunk: \"%s\"", pathname);
75 if (ret < 0) {
76 PERROR("Failed to format channel path");
77 goto error;
78 } else if (ret >= LTTNG_PATH_MAX) {
79 ERR("Truncation occurred while formatting channel path");
80 goto error;
81 }
82
83 return pathname;
84 error:
85 free(pathname);
86 return nullptr;
87 }
88
89 /*
90 * Send a data payload using a given consumer socket of size len.
91 *
92 * The consumer socket lock MUST be acquired before calling this since this
93 * function can change the fd value.
94 *
95 * Return 0 on success else a negative value on error.
96 */
97 int consumer_socket_send(struct consumer_socket *socket, const void *msg, size_t len)
98 {
99 int fd;
100 ssize_t size;
101
102 LTTNG_ASSERT(socket);
103 LTTNG_ASSERT(socket->fd_ptr);
104 LTTNG_ASSERT(msg);
105
106 /* Consumer socket is invalid. Stopping. */
107 fd = *socket->fd_ptr;
108 if (fd < 0) {
109 goto error;
110 }
111
112 size = lttcomm_send_unix_sock(fd, msg, len);
113 if (size < 0) {
114 /* The above call will print a PERROR on error. */
115 DBG("Error when sending data to consumer on sock %d", fd);
116 /*
117 * At this point, the socket is not usable anymore thus closing it and
118 * setting the file descriptor to -1 so it is not reused.
119 */
120
121 /* This call will PERROR on error. */
122 (void) lttcomm_close_unix_sock(fd);
123 *socket->fd_ptr = -1;
124 goto error;
125 }
126
127 return 0;
128
129 error:
130 return -1;
131 }
132
133 /*
134 * Receive a data payload using a given consumer socket of size len.
135 *
136 * The consumer socket lock MUST be acquired before calling this since this
137 * function can change the fd value.
138 *
139 * Return 0 on success else a negative value on error.
140 */
141 int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len)
142 {
143 int fd;
144 ssize_t size;
145
146 LTTNG_ASSERT(socket);
147 LTTNG_ASSERT(socket->fd_ptr);
148 LTTNG_ASSERT(msg);
149
150 /* Consumer socket is invalid. Stopping. */
151 fd = *socket->fd_ptr;
152 if (fd < 0) {
153 goto error;
154 }
155
156 size = lttcomm_recv_unix_sock(fd, msg, len);
157 if (size <= 0) {
158 /* The above call will print a PERROR on error. */
159 DBG("Error when receiving data from the consumer socket %d", fd);
160 /*
161 * At this point, the socket is not usable anymore thus closing it and
162 * setting the file descriptor to -1 so it is not reused.
163 */
164
165 /* This call will PERROR on error. */
166 (void) lttcomm_close_unix_sock(fd);
167 *socket->fd_ptr = -1;
168 goto error;
169 }
170
171 return 0;
172
173 error:
174 return -1;
175 }
176
177 /*
178 * Receive a reply command status message from the consumer. Consumer socket
179 * lock MUST be acquired before calling this function.
180 *
181 * Return 0 on success, -1 on recv error or a negative lttng error code which
182 * was possibly returned by the consumer.
183 */
184 int consumer_recv_status_reply(struct consumer_socket *sock)
185 {
186 int ret;
187 struct lttcomm_consumer_status_msg reply;
188
189 LTTNG_ASSERT(sock);
190
191 ret = consumer_socket_recv(sock, &reply, sizeof(reply));
192 if (ret < 0) {
193 goto end;
194 }
195
196 if (reply.ret_code == LTTCOMM_CONSUMERD_SUCCESS) {
197 /* All good. */
198 ret = 0;
199 } else {
200 ret = -reply.ret_code;
201 DBG("Consumer ret code %d", ret);
202 }
203
204 end:
205 return ret;
206 }
207
208 /*
209 * Once the ASK_CHANNEL command is sent to the consumer, the channel
210 * information are sent back. This call receives that data and populates key
211 * and stream_count.
212 *
213 * On success return 0 and both key and stream_count are set. On error, a
214 * negative value is sent back and both parameters are untouched.
215 */
216 int consumer_recv_status_channel(struct consumer_socket *sock,
217 uint64_t *key,
218 unsigned int *stream_count)
219 {
220 int ret;
221 struct lttcomm_consumer_status_channel reply;
222
223 LTTNG_ASSERT(sock);
224 LTTNG_ASSERT(stream_count);
225 LTTNG_ASSERT(key);
226
227 ret = consumer_socket_recv(sock, &reply, sizeof(reply));
228 if (ret < 0) {
229 goto end;
230 }
231
232 /* An error is possible so don't touch the key and stream_count. */
233 if (reply.ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
234 ret = -1;
235 goto end;
236 }
237
238 *key = reply.key;
239 *stream_count = reply.stream_count;
240 ret = 0;
241
242 end:
243 return ret;
244 }
245
246 /*
247 * Send destroy relayd command to consumer.
248 *
249 * On success return positive value. On error, negative value.
250 */
251 int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer)
252 {
253 int ret;
254 struct lttcomm_consumer_msg msg;
255
256 LTTNG_ASSERT(consumer);
257 LTTNG_ASSERT(sock);
258
259 DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd_ptr);
260
261 memset(&msg, 0, sizeof(msg));
262 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
263 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
264
265 pthread_mutex_lock(sock->lock);
266 ret = consumer_socket_send(sock, &msg, sizeof(msg));
267 if (ret < 0) {
268 goto error;
269 }
270
271 /* Don't check the return value. The caller will do it. */
272 ret = consumer_recv_status_reply(sock);
273
274 DBG2("Consumer send destroy relayd command done");
275
276 error:
277 pthread_mutex_unlock(sock->lock);
278 return ret;
279 }
280
281 /*
282 * For each consumer socket in the consumer output object, send a destroy
283 * relayd command.
284 */
285 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
286 {
287 struct lttng_ht_iter iter;
288 struct consumer_socket *socket;
289
290 LTTNG_ASSERT(consumer);
291
292 /* Destroy any relayd connection */
293 if (consumer->type == CONSUMER_DST_NET) {
294 rcu_read_lock();
295 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
296 int ret;
297
298 /* Send destroy relayd command */
299 ret = consumer_send_destroy_relayd(socket, consumer);
300 if (ret < 0) {
301 DBG("Unable to send destroy relayd command to consumer");
302 /* Continue since we MUST delete everything at this point. */
303 }
304 }
305 rcu_read_unlock();
306 }
307 }
308
309 /*
310 * From a consumer_data structure, allocate and add a consumer socket to the
311 * consumer output.
312 *
313 * Return 0 on success, else negative value on error
314 */
315 int consumer_create_socket(struct consumer_data *data, struct consumer_output *output)
316 {
317 int ret = 0;
318 struct consumer_socket *socket;
319
320 LTTNG_ASSERT(data);
321
322 if (output == nullptr || data->cmd_sock < 0) {
323 /*
324 * Not an error. Possible there is simply not spawned consumer or it's
325 * disabled for the tracing session asking the socket.
326 */
327 goto error;
328 }
329
330 rcu_read_lock();
331 socket = consumer_find_socket(data->cmd_sock, output);
332 rcu_read_unlock();
333 if (socket == nullptr) {
334 socket = consumer_allocate_socket(&data->cmd_sock);
335 if (socket == nullptr) {
336 ret = -1;
337 goto error;
338 }
339
340 socket->registered = 0;
341 socket->lock = &data->lock;
342 rcu_read_lock();
343 consumer_add_socket(socket, output);
344 rcu_read_unlock();
345 }
346
347 socket->type = data->type;
348
349 DBG3("Consumer socket created (fd: %d) and added to output", data->cmd_sock);
350
351 error:
352 return ret;
353 }
354
355 /*
356 * Return the consumer socket from the given consumer output with the right
357 * bitness. On error, returns NULL.
358 *
359 * The caller MUST acquire a rcu read side lock and keep it until the socket
360 * object reference is not needed anymore.
361 */
362 struct consumer_socket *consumer_find_socket_by_bitness(int bits,
363 const struct consumer_output *consumer)
364 {
365 int consumer_fd;
366 struct consumer_socket *socket = nullptr;
367
368 ASSERT_RCU_READ_LOCKED();
369
370 switch (bits) {
371 case 64:
372 consumer_fd = uatomic_read(&the_ust_consumerd64_fd);
373 break;
374 case 32:
375 consumer_fd = uatomic_read(&the_ust_consumerd32_fd);
376 break;
377 default:
378 abort();
379 goto end;
380 }
381
382 socket = consumer_find_socket(consumer_fd, consumer);
383 if (!socket) {
384 ERR("Consumer socket fd %d not found in consumer obj %p", consumer_fd, consumer);
385 }
386
387 end:
388 return socket;
389 }
390
391 /*
392 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
393 * be acquired before calling this function and across use of the
394 * returned consumer_socket.
395 */
396 struct consumer_socket *consumer_find_socket(int key, const struct consumer_output *consumer)
397 {
398 struct lttng_ht_iter iter;
399 struct lttng_ht_node_ulong *node;
400 struct consumer_socket *socket = nullptr;
401
402 ASSERT_RCU_READ_LOCKED();
403
404 /* Negative keys are lookup failures */
405 if (key < 0 || consumer == nullptr) {
406 return nullptr;
407 }
408
409 lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter);
410 node = lttng_ht_iter_get_node_ulong(&iter);
411 if (node != nullptr) {
412 socket = lttng::utils::container_of(node, &consumer_socket::node);
413 }
414
415 return socket;
416 }
417
418 /*
419 * Allocate a new consumer_socket and return the pointer.
420 */
421 struct consumer_socket *consumer_allocate_socket(int *fd)
422 {
423 struct consumer_socket *socket = nullptr;
424
425 LTTNG_ASSERT(fd);
426
427 socket = zmalloc<consumer_socket>();
428 if (socket == nullptr) {
429 PERROR("zmalloc consumer socket");
430 goto error;
431 }
432
433 socket->fd_ptr = fd;
434 lttng_ht_node_init_ulong(&socket->node, *fd);
435
436 error:
437 return socket;
438 }
439
440 /*
441 * Add consumer socket to consumer output object. Read side lock must be
442 * acquired before calling this function.
443 */
444 void consumer_add_socket(struct consumer_socket *sock, struct consumer_output *consumer)
445 {
446 LTTNG_ASSERT(sock);
447 LTTNG_ASSERT(consumer);
448 ASSERT_RCU_READ_LOCKED();
449
450 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
451 }
452
453 /*
454 * Delete consumer socket to consumer output object. Read side lock must be
455 * acquired before calling this function.
456 */
457 void consumer_del_socket(struct consumer_socket *sock, struct consumer_output *consumer)
458 {
459 int ret;
460 struct lttng_ht_iter iter;
461
462 LTTNG_ASSERT(sock);
463 LTTNG_ASSERT(consumer);
464 ASSERT_RCU_READ_LOCKED();
465
466 iter.iter.node = &sock->node.node;
467 ret = lttng_ht_del(consumer->socks, &iter);
468 LTTNG_ASSERT(!ret);
469 }
470
471 /*
472 * RCU destroy call function.
473 */
474 static void destroy_socket_rcu(struct rcu_head *head)
475 {
476 struct lttng_ht_node_ulong *node =
477 lttng::utils::container_of(head, &lttng_ht_node_ulong::head);
478 struct consumer_socket *socket = lttng::utils::container_of(node, &consumer_socket::node);
479
480 free(socket);
481 }
482
483 /*
484 * Destroy and free socket pointer in a call RCU. The call must either:
485 * - have acquired the read side lock before calling this function, or
486 * - guarantee the validity of the `struct consumer_socket` object for the
487 * duration of the call.
488 */
489 void consumer_destroy_socket(struct consumer_socket *sock)
490 {
491 LTTNG_ASSERT(sock);
492
493 /*
494 * We DO NOT close the file descriptor here since it is global to the
495 * session daemon and is closed only if the consumer dies or a custom
496 * consumer was registered,
497 */
498 if (sock->registered) {
499 DBG3("Consumer socket was registered. Closing fd %d", *sock->fd_ptr);
500 lttcomm_close_unix_sock(*sock->fd_ptr);
501 }
502
503 call_rcu(&sock->node.head, destroy_socket_rcu);
504 }
505
506 /*
507 * Allocate and assign data to a consumer_output object.
508 *
509 * Return pointer to structure.
510 */
511 struct consumer_output *consumer_create_output(enum consumer_dst_type type)
512 {
513 struct consumer_output *output = nullptr;
514
515 output = zmalloc<consumer_output>();
516 if (output == nullptr) {
517 PERROR("zmalloc consumer_output");
518 goto error;
519 }
520
521 /* By default, consumer output is enabled */
522 output->enabled = true;
523 output->type = type;
524 output->net_seq_index = (uint64_t) -1ULL;
525 urcu_ref_init(&output->ref);
526
527 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
528
529 error:
530 return output;
531 }
532
533 /*
534 * Iterate over the consumer output socket hash table and destroy them. The
535 * socket file descriptor are only closed if the consumer output was
536 * registered meaning it's an external consumer.
537 */
538 void consumer_destroy_output_sockets(struct consumer_output *obj)
539 {
540 struct lttng_ht_iter iter;
541 struct consumer_socket *socket;
542
543 if (!obj->socks) {
544 return;
545 }
546
547 rcu_read_lock();
548 cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
549 consumer_del_socket(socket, obj);
550 consumer_destroy_socket(socket);
551 }
552 rcu_read_unlock();
553 }
554
555 /*
556 * Delete the consumer_output object from the list and free the ptr.
557 */
558 static void consumer_release_output(struct urcu_ref *ref)
559 {
560 struct consumer_output *obj = lttng::utils::container_of(ref, &consumer_output::ref);
561
562 consumer_destroy_output_sockets(obj);
563
564 if (obj->socks) {
565 /* Finally destroy HT */
566 lttng_ht_destroy(obj->socks);
567 }
568
569 free(obj);
570 }
571
572 /*
573 * Get the consumer_output object.
574 */
575 void consumer_output_get(struct consumer_output *obj)
576 {
577 urcu_ref_get(&obj->ref);
578 }
579
580 /*
581 * Put the consumer_output object.
582 */
583 void consumer_output_put(struct consumer_output *obj)
584 {
585 if (!obj) {
586 return;
587 }
588 urcu_ref_put(&obj->ref, consumer_release_output);
589 }
590
591 /*
592 * Copy consumer output and returned the newly allocated copy.
593 */
594 struct consumer_output *consumer_copy_output(struct consumer_output *src)
595 {
596 int ret;
597 struct consumer_output *output;
598
599 LTTNG_ASSERT(src);
600
601 output = consumer_create_output(src->type);
602 if (output == nullptr) {
603 goto end;
604 }
605 output->enabled = src->enabled;
606 output->net_seq_index = src->net_seq_index;
607 memcpy(output->domain_subdir, src->domain_subdir, sizeof(output->domain_subdir));
608 output->snapshot = src->snapshot;
609 output->relay_major_version = src->relay_major_version;
610 output->relay_minor_version = src->relay_minor_version;
611 output->relay_allows_clear = src->relay_allows_clear;
612 memcpy(&output->dst, &src->dst, sizeof(output->dst));
613 ret = consumer_copy_sockets(output, src);
614 if (ret < 0) {
615 goto error_put;
616 }
617 end:
618 return output;
619
620 error_put:
621 consumer_output_put(output);
622 return nullptr;
623 }
624
625 /*
626 * Copy consumer sockets from src to dst.
627 *
628 * Return 0 on success or else a negative value.
629 */
630 int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src)
631 {
632 int ret = 0;
633 struct lttng_ht_iter iter;
634 struct consumer_socket *socket, *copy_sock;
635
636 LTTNG_ASSERT(dst);
637 LTTNG_ASSERT(src);
638
639 rcu_read_lock();
640 cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
641 /* Ignore socket that are already there. */
642 copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
643 if (copy_sock) {
644 continue;
645 }
646
647 /* Create new socket object. */
648 copy_sock = consumer_allocate_socket(socket->fd_ptr);
649 if (copy_sock == nullptr) {
650 rcu_read_unlock();
651 ret = -ENOMEM;
652 goto error;
653 }
654
655 copy_sock->registered = socket->registered;
656 /*
657 * This is valid because this lock is shared accross all consumer
658 * object being the global lock of the consumer data structure of the
659 * session daemon.
660 */
661 copy_sock->lock = socket->lock;
662 consumer_add_socket(copy_sock, dst);
663 }
664 rcu_read_unlock();
665
666 error:
667 return ret;
668 }
669
670 /*
671 * Set network URI to the consumer output.
672 *
673 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
674 * error.
675 */
676 int consumer_set_network_uri(const struct ltt_session *session,
677 struct consumer_output *output,
678 struct lttng_uri *uri)
679 {
680 int ret;
681 struct lttng_uri *dst_uri = nullptr;
682
683 /* Code flow error safety net. */
684 LTTNG_ASSERT(output);
685 LTTNG_ASSERT(uri);
686
687 switch (uri->stype) {
688 case LTTNG_STREAM_CONTROL:
689 dst_uri = &output->dst.net.control;
690 output->dst.net.control_isset = 1;
691 if (uri->port == 0) {
692 /* Assign default port. */
693 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
694 } else {
695 if (output->dst.net.data_isset && uri->port == output->dst.net.data.port) {
696 ret = -LTTNG_ERR_INVALID;
697 goto error;
698 }
699 }
700 DBG3("Consumer control URI set with port %d", uri->port);
701 break;
702 case LTTNG_STREAM_DATA:
703 dst_uri = &output->dst.net.data;
704 output->dst.net.data_isset = 1;
705 if (uri->port == 0) {
706 /* Assign default port. */
707 uri->port = DEFAULT_NETWORK_DATA_PORT;
708 } else {
709 if (output->dst.net.control_isset &&
710 uri->port == output->dst.net.control.port) {
711 ret = -LTTNG_ERR_INVALID;
712 goto error;
713 }
714 }
715 DBG3("Consumer data URI set with port %d", uri->port);
716 break;
717 default:
718 ERR("Set network uri type unknown %d", uri->stype);
719 ret = -LTTNG_ERR_INVALID;
720 goto error;
721 }
722
723 ret = uri_compare(dst_uri, uri);
724 if (!ret) {
725 /* Same URI, don't touch it and return success. */
726 DBG3("URI network compare are the same");
727 goto equal;
728 }
729
730 /* URIs were not equal, replacing it. */
731 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
732 output->type = CONSUMER_DST_NET;
733 if (dst_uri->stype != LTTNG_STREAM_CONTROL) {
734 /* Only the control uri needs to contain the path. */
735 goto end;
736 }
737
738 /*
739 * If the user has specified a subdir as part of the control
740 * URL, the session's base output directory is:
741 * /RELAYD_OUTPUT_PATH/HOSTNAME/USER_SPECIFIED_DIR
742 *
743 * Hence, the "base_dir" from which all stream files and
744 * session rotation chunks are created takes the form
745 * /HOSTNAME/USER_SPECIFIED_DIR
746 *
747 * If the user has not specified an output directory as part of
748 * the control URL, the base output directory has the form:
749 * /RELAYD_OUTPUT_PATH/HOSTNAME/SESSION_NAME-CREATION_TIME
750 *
751 * Hence, the "base_dir" from which all stream files and
752 * session rotation chunks are created takes the form
753 * /HOSTNAME/SESSION_NAME-CREATION_TIME
754 *
755 * Note that automatically generated session names already
756 * contain the session's creation time. In that case, the
757 * creation time is omitted to prevent it from being duplicated
758 * in the final directory hierarchy.
759 */
760 if (*uri->subdir) {
761 if (strstr(uri->subdir, "../")) {
762 ERR("Network URI subdirs are not allowed to walk up the path hierarchy");
763 ret = -LTTNG_ERR_INVALID;
764 goto error;
765 }
766 ret = snprintf(output->dst.net.base_dir,
767 sizeof(output->dst.net.base_dir),
768 "/%s/%s/",
769 session->hostname,
770 uri->subdir);
771 } else {
772 if (session->has_auto_generated_name) {
773 ret = snprintf(output->dst.net.base_dir,
774 sizeof(output->dst.net.base_dir),
775 "/%s/%s/",
776 session->hostname,
777 session->name);
778 } else {
779 char session_creation_datetime[16];
780 size_t strftime_ret;
781 struct tm *timeinfo;
782
783 timeinfo = localtime(&session->creation_time);
784 if (!timeinfo) {
785 ret = -LTTNG_ERR_FATAL;
786 goto error;
787 }
788 strftime_ret = strftime(session_creation_datetime,
789 sizeof(session_creation_datetime),
790 "%Y%m%d-%H%M%S",
791 timeinfo);
792 if (strftime_ret == 0) {
793 ERR("Failed to format session creation timestamp while setting network URI");
794 ret = -LTTNG_ERR_FATAL;
795 goto error;
796 }
797 ret = snprintf(output->dst.net.base_dir,
798 sizeof(output->dst.net.base_dir),
799 "/%s/%s-%s/",
800 session->hostname,
801 session->name,
802 session_creation_datetime);
803 }
804 }
805 if (ret >= sizeof(output->dst.net.base_dir)) {
806 ret = -LTTNG_ERR_INVALID;
807 ERR("Truncation occurred while setting network output base directory");
808 goto error;
809 } else if (ret == -1) {
810 ret = -LTTNG_ERR_INVALID;
811 PERROR("Error occurred while setting network output base directory");
812 goto error;
813 }
814
815 DBG3("Consumer set network uri base_dir path %s", output->dst.net.base_dir);
816
817 end:
818 return 0;
819 equal:
820 return 1;
821 error:
822 return ret;
823 }
824
825 /*
826 * Send file descriptor to consumer via sock.
827 *
828 * The consumer socket lock must be held by the caller.
829 */
830 int consumer_send_fds(struct consumer_socket *sock, const int *fds, size_t nb_fd)
831 {
832 int ret;
833
834 LTTNG_ASSERT(fds);
835 LTTNG_ASSERT(sock);
836 LTTNG_ASSERT(nb_fd > 0);
837 LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY);
838
839 ret = lttcomm_send_fds_unix_sock(*sock->fd_ptr, fds, nb_fd);
840 if (ret < 0) {
841 /* The above call will print a PERROR on error. */
842 DBG("Error when sending consumer fds on sock %d", *sock->fd_ptr);
843 goto error;
844 }
845
846 ret = consumer_recv_status_reply(sock);
847 error:
848 return ret;
849 }
850
851 /*
852 * Consumer send communication message structure to consumer.
853 *
854 * The consumer socket lock must be held by the caller.
855 */
856 int consumer_send_msg(struct consumer_socket *sock, const struct lttcomm_consumer_msg *msg)
857 {
858 int ret;
859
860 LTTNG_ASSERT(msg);
861 LTTNG_ASSERT(sock);
862 LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY);
863
864 ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg));
865 if (ret < 0) {
866 goto error;
867 }
868
869 ret = consumer_recv_status_reply(sock);
870
871 error:
872 return ret;
873 }
874
875 /*
876 * Consumer send channel communication message structure to consumer.
877 *
878 * The consumer socket lock must be held by the caller.
879 */
880 int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg)
881 {
882 int ret;
883
884 LTTNG_ASSERT(msg);
885 LTTNG_ASSERT(sock);
886
887 ret = consumer_send_msg(sock, msg);
888 if (ret < 0) {
889 goto error;
890 }
891
892 error:
893 return ret;
894 }
895
896 /*
897 * Populate the given consumer msg structure with the ask_channel command
898 * information.
899 */
900 void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
901 uint64_t subbuf_size,
902 uint64_t num_subbuf,
903 int overwrite,
904 unsigned int switch_timer_interval,
905 unsigned int read_timer_interval,
906 unsigned int live_timer_interval,
907 bool is_in_live_session,
908 unsigned int monitor_timer_interval,
909 int output,
910 int type,
911 uint64_t session_id,
912 const char *pathname,
913 const char *name,
914 uint64_t relayd_id,
915 uint64_t key,
916 const lttng_uuid& uuid,
917 uint32_t chan_id,
918 uint64_t tracefile_size,
919 uint64_t tracefile_count,
920 uint64_t session_id_per_pid,
921 unsigned int monitor,
922 uint32_t ust_app_uid,
923 int64_t blocking_timeout,
924 const char *root_shm_path,
925 const char *shm_path,
926 struct lttng_trace_chunk *trace_chunk,
927 const struct lttng_credentials *buffer_credentials)
928 {
929 LTTNG_ASSERT(msg);
930
931 /* Zeroed structure */
932 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
933 msg->u.ask_channel.buffer_credentials.uid = UINT32_MAX;
934 msg->u.ask_channel.buffer_credentials.gid = UINT32_MAX;
935
936 if (trace_chunk) {
937 uint64_t chunk_id;
938 enum lttng_trace_chunk_status chunk_status;
939
940 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
941 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
942 LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id);
943 }
944 msg->u.ask_channel.buffer_credentials.uid = lttng_credentials_get_uid(buffer_credentials);
945 msg->u.ask_channel.buffer_credentials.gid = lttng_credentials_get_gid(buffer_credentials);
946
947 msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION;
948 msg->u.ask_channel.subbuf_size = subbuf_size;
949 msg->u.ask_channel.num_subbuf = num_subbuf;
950 msg->u.ask_channel.overwrite = overwrite;
951 msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
952 msg->u.ask_channel.read_timer_interval = read_timer_interval;
953 msg->u.ask_channel.live_timer_interval = live_timer_interval;
954 msg->u.ask_channel.is_live = is_in_live_session;
955 msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval;
956 msg->u.ask_channel.output = output;
957 msg->u.ask_channel.type = type;
958 msg->u.ask_channel.session_id = session_id;
959 msg->u.ask_channel.session_id_per_pid = session_id_per_pid;
960 msg->u.ask_channel.relayd_id = relayd_id;
961 msg->u.ask_channel.key = key;
962 msg->u.ask_channel.chan_id = chan_id;
963 msg->u.ask_channel.tracefile_size = tracefile_size;
964 msg->u.ask_channel.tracefile_count = tracefile_count;
965 msg->u.ask_channel.monitor = monitor;
966 msg->u.ask_channel.ust_app_uid = ust_app_uid;
967 msg->u.ask_channel.blocking_timeout = blocking_timeout;
968
969 std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid);
970
971 if (pathname) {
972 strncpy(msg->u.ask_channel.pathname, pathname, sizeof(msg->u.ask_channel.pathname));
973 msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname) - 1] = '\0';
974 }
975
976 strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
977 msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
978
979 if (root_shm_path) {
980 strncpy(msg->u.ask_channel.root_shm_path,
981 root_shm_path,
982 sizeof(msg->u.ask_channel.root_shm_path));
983 msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] =
984 '\0';
985 }
986 if (shm_path) {
987 strncpy(msg->u.ask_channel.shm_path, shm_path, sizeof(msg->u.ask_channel.shm_path));
988 msg->u.ask_channel.shm_path[sizeof(msg->u.ask_channel.shm_path) - 1] = '\0';
989 }
990 }
991
992 /*
993 * Init channel communication message structure.
994 */
995 void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
996 uint64_t channel_key,
997 uint64_t session_id,
998 const char *pathname,
999 uint64_t relayd_id,
1000 const char *name,
1001 unsigned int nb_init_streams,
1002 enum lttng_event_output output,
1003 int type,
1004 uint64_t tracefile_size,
1005 uint64_t tracefile_count,
1006 unsigned int monitor,
1007 unsigned int live_timer_interval,
1008 bool is_in_live_session,
1009 unsigned int monitor_timer_interval,
1010 struct lttng_trace_chunk *trace_chunk)
1011 {
1012 LTTNG_ASSERT(msg);
1013
1014 /* Zeroed structure */
1015 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1016
1017 if (trace_chunk) {
1018 uint64_t chunk_id;
1019 enum lttng_trace_chunk_status chunk_status;
1020
1021 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
1022 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
1023 LTTNG_OPTIONAL_SET(&msg->u.channel.chunk_id, chunk_id);
1024 }
1025
1026 /* Send channel */
1027 msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
1028 msg->u.channel.channel_key = channel_key;
1029 msg->u.channel.session_id = session_id;
1030 msg->u.channel.relayd_id = relayd_id;
1031 msg->u.channel.nb_init_streams = nb_init_streams;
1032 msg->u.channel.output = output;
1033 msg->u.channel.type = type;
1034 msg->u.channel.tracefile_size = tracefile_size;
1035 msg->u.channel.tracefile_count = tracefile_count;
1036 msg->u.channel.monitor = monitor;
1037 msg->u.channel.live_timer_interval = live_timer_interval;
1038 msg->u.channel.is_live = is_in_live_session;
1039 msg->u.channel.monitor_timer_interval = monitor_timer_interval;
1040
1041 strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname));
1042 msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0';
1043
1044 strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name));
1045 msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0';
1046 }
1047
1048 /*
1049 * Init stream communication message structure.
1050 */
1051 void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
1052 uint64_t channel_key,
1053 uint64_t stream_key,
1054 int32_t cpu)
1055 {
1056 LTTNG_ASSERT(msg);
1057
1058 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1059
1060 msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM;
1061 msg->u.stream.channel_key = channel_key;
1062 msg->u.stream.stream_key = stream_key;
1063 msg->u.stream.cpu = cpu;
1064 }
1065
1066 void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
1067 enum lttng_consumer_command cmd,
1068 uint64_t channel_key,
1069 uint64_t net_seq_idx)
1070 {
1071 LTTNG_ASSERT(msg);
1072
1073 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1074
1075 msg->cmd_type = cmd;
1076 msg->u.sent_streams.channel_key = channel_key;
1077 msg->u.sent_streams.net_seq_idx = net_seq_idx;
1078 }
1079
1080 /*
1081 * Send stream communication structure to the consumer.
1082 */
1083 int consumer_send_stream(struct consumer_socket *sock,
1084 struct consumer_output *dst,
1085 struct lttcomm_consumer_msg *msg,
1086 const int *fds,
1087 size_t nb_fd)
1088 {
1089 int ret;
1090
1091 LTTNG_ASSERT(msg);
1092 LTTNG_ASSERT(dst);
1093 LTTNG_ASSERT(sock);
1094 LTTNG_ASSERT(fds);
1095
1096 ret = consumer_send_msg(sock, msg);
1097 if (ret < 0) {
1098 goto error;
1099 }
1100
1101 ret = consumer_send_fds(sock, fds, nb_fd);
1102 if (ret < 0) {
1103 goto error;
1104 }
1105
1106 error:
1107 return ret;
1108 }
1109
1110 /*
1111 * Send relayd socket to consumer associated with a session name.
1112 *
1113 * The consumer socket lock must be held by the caller.
1114 *
1115 * On success return positive value. On error, negative value.
1116 */
1117 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
1118 struct lttcomm_relayd_sock *rsock,
1119 struct consumer_output *consumer,
1120 enum lttng_stream_type type,
1121 uint64_t session_id,
1122 const char *session_name,
1123 const char *hostname,
1124 const char *base_path,
1125 int session_live_timer,
1126 const uint64_t *current_chunk_id,
1127 time_t session_creation_time,
1128 bool session_name_contains_creation_time)
1129 {
1130 int ret;
1131 int fd;
1132 struct lttcomm_consumer_msg msg;
1133
1134 /* Code flow error. Safety net. */
1135 LTTNG_ASSERT(rsock);
1136 LTTNG_ASSERT(consumer);
1137 LTTNG_ASSERT(consumer_sock);
1138
1139 memset(&msg, 0, sizeof(msg));
1140 /* Bail out if consumer is disabled */
1141 if (!consumer->enabled) {
1142 ret = LTTNG_OK;
1143 goto error;
1144 }
1145
1146 if (type == LTTNG_STREAM_CONTROL) {
1147 char output_path[LTTNG_PATH_MAX] = {};
1148 uint64_t relayd_session_id;
1149
1150 ret = relayd_create_session(rsock,
1151 &relayd_session_id,
1152 session_name,
1153 hostname,
1154 base_path,
1155 session_live_timer,
1156 consumer->snapshot,
1157 session_id,
1158 the_sessiond_uuid,
1159 current_chunk_id,
1160 session_creation_time,
1161 session_name_contains_creation_time,
1162 output_path);
1163 if (ret < 0) {
1164 /* Close the control socket. */
1165 (void) relayd_close(rsock);
1166 goto error;
1167 }
1168 msg.u.relayd_sock.relayd_session_id = relayd_session_id;
1169 DBG("Created session on relay, output path reply: %s", output_path);
1170 }
1171
1172 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
1173 /*
1174 * Assign network consumer output index using the temporary consumer since
1175 * this call should only be made from within a set_consumer_uri() function
1176 * call in the session daemon.
1177 */
1178 msg.u.relayd_sock.net_index = consumer->net_seq_index;
1179 msg.u.relayd_sock.type = type;
1180 msg.u.relayd_sock.session_id = session_id;
1181 msg.u.relayd_sock.major = rsock->major;
1182 msg.u.relayd_sock.minor = rsock->minor;
1183 msg.u.relayd_sock.relayd_socket_protocol = rsock->sock.proto;
1184
1185 DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd_ptr);
1186 ret = consumer_send_msg(consumer_sock, &msg);
1187 if (ret < 0) {
1188 goto error;
1189 }
1190
1191 DBG3("Sending relayd socket file descriptor to consumer");
1192 fd = rsock->sock.fd;
1193 ret = consumer_send_fds(consumer_sock, &fd, 1);
1194 if (ret < 0) {
1195 goto error;
1196 }
1197
1198 DBG2("Consumer relayd socket sent");
1199
1200 error:
1201 return ret;
1202 }
1203
1204 static int
1205 consumer_send_pipe(struct consumer_socket *consumer_sock, enum lttng_consumer_command cmd, int pipe)
1206 {
1207 int ret;
1208 struct lttcomm_consumer_msg msg;
1209 const char *pipe_name;
1210 const char *command_name;
1211
1212 switch (cmd) {
1213 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1214 pipe_name = "channel monitor";
1215 command_name = "SET_CHANNEL_MONITOR_PIPE";
1216 break;
1217 default:
1218 ERR("Unexpected command received in %s (cmd = %d)", __func__, (int) cmd);
1219 abort();
1220 }
1221
1222 /* Code flow error. Safety net. */
1223
1224 memset(&msg, 0, sizeof(msg));
1225 msg.cmd_type = cmd;
1226
1227 pthread_mutex_lock(consumer_sock->lock);
1228 DBG3("Sending %s command to consumer", command_name);
1229 ret = consumer_send_msg(consumer_sock, &msg);
1230 if (ret < 0) {
1231 goto error;
1232 }
1233
1234 DBG3("Sending %s pipe %d to consumer on socket %d", pipe_name, pipe, *consumer_sock->fd_ptr);
1235 ret = consumer_send_fds(consumer_sock, &pipe, 1);
1236 if (ret < 0) {
1237 goto error;
1238 }
1239
1240 DBG2("%s pipe successfully sent", pipe_name);
1241 error:
1242 pthread_mutex_unlock(consumer_sock->lock);
1243 return ret;
1244 }
1245
1246 int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, int pipe)
1247 {
1248 return consumer_send_pipe(consumer_sock, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe);
1249 }
1250
1251 /*
1252 * Ask the consumer if the data is pending for the specific session id.
1253 * Returns 1 if data is pending, 0 otherwise, or < 0 on error.
1254 */
1255 int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer)
1256 {
1257 int ret;
1258 int32_t ret_code = 0; /* Default is that the data is NOT pending */
1259 struct consumer_socket *socket;
1260 struct lttng_ht_iter iter;
1261 struct lttcomm_consumer_msg msg;
1262
1263 LTTNG_ASSERT(consumer);
1264
1265 DBG3("Consumer data pending for id %" PRIu64, session_id);
1266
1267 memset(&msg, 0, sizeof(msg));
1268 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
1269 msg.u.data_pending.session_id = session_id;
1270
1271 /* Send command for each consumer */
1272 rcu_read_lock();
1273 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
1274 pthread_mutex_lock(socket->lock);
1275 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1276 if (ret < 0) {
1277 pthread_mutex_unlock(socket->lock);
1278 goto error_unlock;
1279 }
1280
1281 /*
1282 * No need for a recv reply status because the answer to the command is
1283 * the reply status message.
1284 */
1285
1286 ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
1287 if (ret < 0) {
1288 pthread_mutex_unlock(socket->lock);
1289 goto error_unlock;
1290 }
1291 pthread_mutex_unlock(socket->lock);
1292
1293 if (ret_code == 1) {
1294 break;
1295 }
1296 }
1297 rcu_read_unlock();
1298
1299 DBG("Consumer data is %s pending for session id %" PRIu64,
1300 ret_code == 1 ? "" : "NOT",
1301 session_id);
1302 return ret_code;
1303
1304 error_unlock:
1305 rcu_read_unlock();
1306 return -1;
1307 }
1308
1309 /*
1310 * Send a flush command to consumer using the given channel key.
1311 *
1312 * Return 0 on success else a negative value.
1313 */
1314 int consumer_flush_channel(struct consumer_socket *socket, uint64_t key)
1315 {
1316 int ret;
1317 struct lttcomm_consumer_msg msg;
1318
1319 LTTNG_ASSERT(socket);
1320
1321 DBG2("Consumer flush channel key %" PRIu64, key);
1322
1323 memset(&msg, 0, sizeof(msg));
1324 msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
1325 msg.u.flush_channel.key = key;
1326
1327 pthread_mutex_lock(socket->lock);
1328 health_code_update();
1329
1330 ret = consumer_send_msg(socket, &msg);
1331 if (ret < 0) {
1332 goto end;
1333 }
1334
1335 end:
1336 health_code_update();
1337 pthread_mutex_unlock(socket->lock);
1338 return ret;
1339 }
1340
1341 /*
1342 * Send a clear quiescent command to consumer using the given channel key.
1343 *
1344 * Return 0 on success else a negative value.
1345 */
1346 int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
1347 {
1348 int ret;
1349 struct lttcomm_consumer_msg msg;
1350
1351 LTTNG_ASSERT(socket);
1352
1353 DBG2("Consumer clear quiescent channel key %" PRIu64, key);
1354
1355 memset(&msg, 0, sizeof(msg));
1356 msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
1357 msg.u.clear_quiescent_channel.key = key;
1358
1359 pthread_mutex_lock(socket->lock);
1360 health_code_update();
1361
1362 ret = consumer_send_msg(socket, &msg);
1363 if (ret < 0) {
1364 goto end;
1365 }
1366
1367 end:
1368 health_code_update();
1369 pthread_mutex_unlock(socket->lock);
1370 return ret;
1371 }
1372
1373 /*
1374 * Send a close metadata command to consumer using the given channel key.
1375 * Called with registry lock held.
1376 *
1377 * Return 0 on success else a negative value.
1378 */
1379 int consumer_close_metadata(struct consumer_socket *socket, uint64_t metadata_key)
1380 {
1381 int ret;
1382 struct lttcomm_consumer_msg msg;
1383
1384 LTTNG_ASSERT(socket);
1385
1386 DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
1387
1388 memset(&msg, 0, sizeof(msg));
1389 msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
1390 msg.u.close_metadata.key = metadata_key;
1391
1392 pthread_mutex_lock(socket->lock);
1393 health_code_update();
1394
1395 ret = consumer_send_msg(socket, &msg);
1396 if (ret < 0) {
1397 goto end;
1398 }
1399
1400 end:
1401 health_code_update();
1402 pthread_mutex_unlock(socket->lock);
1403 return ret;
1404 }
1405
1406 /*
1407 * Send a setup metdata command to consumer using the given channel key.
1408 *
1409 * Return 0 on success else a negative value.
1410 */
1411 int consumer_setup_metadata(struct consumer_socket *socket, uint64_t metadata_key)
1412 {
1413 int ret;
1414 struct lttcomm_consumer_msg msg;
1415
1416 LTTNG_ASSERT(socket);
1417
1418 DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
1419
1420 memset(&msg, 0, sizeof(msg));
1421 msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
1422 msg.u.setup_metadata.key = metadata_key;
1423
1424 pthread_mutex_lock(socket->lock);
1425 health_code_update();
1426
1427 ret = consumer_send_msg(socket, &msg);
1428 if (ret < 0) {
1429 goto end;
1430 }
1431
1432 end:
1433 health_code_update();
1434 pthread_mutex_unlock(socket->lock);
1435 return ret;
1436 }
1437
1438 /*
1439 * Send metadata string to consumer.
1440 * RCU read-side lock must be held to guarantee existence of socket.
1441 *
1442 * Return 0 on success else a negative value.
1443 */
1444 int consumer_push_metadata(struct consumer_socket *socket,
1445 uint64_t metadata_key,
1446 char *metadata_str,
1447 size_t len,
1448 size_t target_offset,
1449 uint64_t version)
1450 {
1451 int ret;
1452 struct lttcomm_consumer_msg msg;
1453
1454 LTTNG_ASSERT(socket);
1455 ASSERT_RCU_READ_LOCKED();
1456
1457 DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr);
1458
1459 pthread_mutex_lock(socket->lock);
1460
1461 memset(&msg, 0, sizeof(msg));
1462 msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
1463 msg.u.push_metadata.key = metadata_key;
1464 msg.u.push_metadata.target_offset = target_offset;
1465 msg.u.push_metadata.len = len;
1466 msg.u.push_metadata.version = version;
1467
1468 health_code_update();
1469 ret = consumer_send_msg(socket, &msg);
1470 if (ret < 0 || len == 0) {
1471 goto end;
1472 }
1473
1474 DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd_ptr, len);
1475
1476 ret = consumer_socket_send(socket, metadata_str, len);
1477 if (ret < 0) {
1478 goto end;
1479 }
1480
1481 health_code_update();
1482 ret = consumer_recv_status_reply(socket);
1483 if (ret < 0) {
1484 goto end;
1485 }
1486
1487 end:
1488 pthread_mutex_unlock(socket->lock);
1489 health_code_update();
1490 return ret;
1491 }
1492
1493 /*
1494 * Ask the consumer to snapshot a specific channel using the key.
1495 *
1496 * Returns LTTNG_OK on success or else an LTTng error code.
1497 */
1498 enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
1499 uint64_t key,
1500 const struct consumer_output *output,
1501 int metadata,
1502 const char *channel_path,
1503 uint64_t nb_packets_per_stream)
1504 {
1505 int ret;
1506 enum lttng_error_code status = LTTNG_OK;
1507 struct lttcomm_consumer_msg msg;
1508
1509 LTTNG_ASSERT(socket);
1510 LTTNG_ASSERT(output);
1511
1512 DBG("Consumer snapshot channel key %" PRIu64, key);
1513
1514 memset(&msg, 0, sizeof(msg));
1515 msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
1516 msg.u.snapshot_channel.key = key;
1517 msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
1518 msg.u.snapshot_channel.metadata = metadata;
1519
1520 if (output->type == CONSUMER_DST_NET) {
1521 msg.u.snapshot_channel.relayd_id = output->net_seq_index;
1522 msg.u.snapshot_channel.use_relayd = 1;
1523 } else {
1524 msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
1525 }
1526 ret = lttng_strncpy(msg.u.snapshot_channel.pathname,
1527 channel_path,
1528 sizeof(msg.u.snapshot_channel.pathname));
1529 if (ret < 0) {
1530 ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%zu bytes required) with path \"%s\"",
1531 sizeof(msg.u.snapshot_channel.pathname),
1532 strlen(channel_path),
1533 channel_path);
1534 status = LTTNG_ERR_SNAPSHOT_FAIL;
1535 goto error;
1536 }
1537
1538 health_code_update();
1539 pthread_mutex_lock(socket->lock);
1540 ret = consumer_send_msg(socket, &msg);
1541 pthread_mutex_unlock(socket->lock);
1542 if (ret < 0) {
1543 switch (-ret) {
1544 case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
1545 status = LTTNG_ERR_CHAN_NOT_FOUND;
1546 break;
1547 default:
1548 status = LTTNG_ERR_SNAPSHOT_FAIL;
1549 break;
1550 }
1551 goto error;
1552 }
1553
1554 error:
1555 health_code_update();
1556 return status;
1557 }
1558
1559 /*
1560 * Ask the consumer the number of discarded events for a channel.
1561 */
1562 int consumer_get_discarded_events(uint64_t session_id,
1563 uint64_t channel_key,
1564 struct consumer_output *consumer,
1565 uint64_t *discarded)
1566 {
1567 int ret;
1568 struct consumer_socket *socket;
1569 struct lttng_ht_iter iter;
1570 struct lttcomm_consumer_msg msg;
1571
1572 LTTNG_ASSERT(consumer);
1573
1574 DBG3("Consumer discarded events id %" PRIu64, session_id);
1575
1576 memset(&msg, 0, sizeof(msg));
1577 msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
1578 msg.u.discarded_events.session_id = session_id;
1579 msg.u.discarded_events.channel_key = channel_key;
1580
1581 *discarded = 0;
1582
1583 /* Send command for each consumer */
1584 rcu_read_lock();
1585 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
1586 uint64_t consumer_discarded = 0;
1587 pthread_mutex_lock(socket->lock);
1588 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1589 if (ret < 0) {
1590 pthread_mutex_unlock(socket->lock);
1591 goto end;
1592 }
1593
1594 /*
1595 * No need for a recv reply status because the answer to the
1596 * command is the reply status message.
1597 */
1598 ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
1599 if (ret < 0) {
1600 ERR("get discarded events");
1601 pthread_mutex_unlock(socket->lock);
1602 goto end;
1603 }
1604 pthread_mutex_unlock(socket->lock);
1605 *discarded += consumer_discarded;
1606 }
1607 ret = 0;
1608 DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id);
1609
1610 end:
1611 rcu_read_unlock();
1612 return ret;
1613 }
1614
1615 /*
1616 * Ask the consumer the number of lost packets for a channel.
1617 */
1618 int consumer_get_lost_packets(uint64_t session_id,
1619 uint64_t channel_key,
1620 struct consumer_output *consumer,
1621 uint64_t *lost)
1622 {
1623 int ret;
1624 struct consumer_socket *socket;
1625 struct lttng_ht_iter iter;
1626 struct lttcomm_consumer_msg msg;
1627
1628 LTTNG_ASSERT(consumer);
1629
1630 DBG3("Consumer lost packets id %" PRIu64, session_id);
1631
1632 memset(&msg, 0, sizeof(msg));
1633 msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
1634 msg.u.lost_packets.session_id = session_id;
1635 msg.u.lost_packets.channel_key = channel_key;
1636
1637 *lost = 0;
1638
1639 /* Send command for each consumer */
1640 rcu_read_lock();
1641 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
1642 uint64_t consumer_lost = 0;
1643 pthread_mutex_lock(socket->lock);
1644 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1645 if (ret < 0) {
1646 pthread_mutex_unlock(socket->lock);
1647 goto end;
1648 }
1649
1650 /*
1651 * No need for a recv reply status because the answer to the
1652 * command is the reply status message.
1653 */
1654 ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
1655 if (ret < 0) {
1656 ERR("get lost packets");
1657 pthread_mutex_unlock(socket->lock);
1658 goto end;
1659 }
1660 pthread_mutex_unlock(socket->lock);
1661 *lost += consumer_lost;
1662 }
1663 ret = 0;
1664 DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id);
1665
1666 end:
1667 rcu_read_unlock();
1668 return ret;
1669 }
1670
1671 /*
1672 * Ask the consumer to rotate a channel.
1673 *
1674 * The new_chunk_id is the session->rotate_count that has been incremented
1675 * when the rotation started. On the relay, this allows to keep track in which
1676 * chunk each stream is currently writing to (for the rotate_pending operation).
1677 */
1678 int consumer_rotate_channel(struct consumer_socket *socket,
1679 uint64_t key,
1680 struct consumer_output *output,
1681 bool is_metadata_channel)
1682 {
1683 int ret;
1684 struct lttcomm_consumer_msg msg;
1685
1686 LTTNG_ASSERT(socket);
1687
1688 DBG("Consumer rotate channel key %" PRIu64, key);
1689
1690 pthread_mutex_lock(socket->lock);
1691 memset(&msg, 0, sizeof(msg));
1692 msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL;
1693 msg.u.rotate_channel.key = key;
1694 msg.u.rotate_channel.metadata = !!is_metadata_channel;
1695
1696 if (output->type == CONSUMER_DST_NET) {
1697 msg.u.rotate_channel.relayd_id = output->net_seq_index;
1698 } else {
1699 msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
1700 }
1701
1702 health_code_update();
1703 ret = consumer_send_msg(socket, &msg);
1704 if (ret < 0) {
1705 switch (-ret) {
1706 case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
1707 ret = -LTTNG_ERR_CHAN_NOT_FOUND;
1708 break;
1709 default:
1710 ret = -LTTNG_ERR_ROTATION_FAIL_CONSUMER;
1711 break;
1712 }
1713 goto error;
1714 }
1715 error:
1716 pthread_mutex_unlock(socket->lock);
1717 health_code_update();
1718 return ret;
1719 }
1720
1721 int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key)
1722 {
1723 int ret;
1724 lttcomm_consumer_msg msg = {
1725 .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
1726 .u = {},
1727 };
1728 msg.u.open_channel_packets.key = key;
1729
1730 LTTNG_ASSERT(socket);
1731
1732 DBG("Consumer open channel packets: channel key = %" PRIu64, key);
1733
1734 health_code_update();
1735
1736 pthread_mutex_lock(socket->lock);
1737 ret = consumer_send_msg(socket, &msg);
1738 pthread_mutex_unlock(socket->lock);
1739 if (ret < 0) {
1740 goto error_socket;
1741 }
1742
1743 error_socket:
1744 health_code_update();
1745 return ret;
1746 }
1747
1748 int consumer_clear_channel(struct consumer_socket *socket, uint64_t key)
1749 {
1750 int ret;
1751 struct lttcomm_consumer_msg msg;
1752
1753 LTTNG_ASSERT(socket);
1754
1755 DBG("Consumer clear channel %" PRIu64, key);
1756
1757 memset(&msg, 0, sizeof(msg));
1758 msg.cmd_type = LTTNG_CONSUMER_CLEAR_CHANNEL;
1759 msg.u.clear_channel.key = key;
1760
1761 health_code_update();
1762
1763 pthread_mutex_lock(socket->lock);
1764 ret = consumer_send_msg(socket, &msg);
1765 if (ret < 0) {
1766 goto error_socket;
1767 }
1768
1769 error_socket:
1770 pthread_mutex_unlock(socket->lock);
1771
1772 health_code_update();
1773 return ret;
1774 }
1775
1776 int consumer_init(struct consumer_socket *socket, const lttng_uuid& sessiond_uuid)
1777 {
1778 int ret;
1779 struct lttcomm_consumer_msg msg = {
1780 .cmd_type = LTTNG_CONSUMER_INIT,
1781 .u = {},
1782 };
1783
1784 LTTNG_ASSERT(socket);
1785
1786 DBG("Sending consumer initialization command");
1787 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg.u.init.sessiond_uuid);
1788
1789 health_code_update();
1790 ret = consumer_send_msg(socket, &msg);
1791 if (ret < 0) {
1792 goto error;
1793 }
1794
1795 error:
1796 health_code_update();
1797 return ret;
1798 }
1799
1800 /*
1801 * Ask the consumer to create a new chunk for a given session.
1802 *
1803 * Called with the consumer socket lock held.
1804 */
1805 int consumer_create_trace_chunk(struct consumer_socket *socket,
1806 uint64_t relayd_id,
1807 uint64_t session_id,
1808 struct lttng_trace_chunk *chunk,
1809 const char *domain_subdir)
1810 {
1811 int ret;
1812 enum lttng_trace_chunk_status chunk_status;
1813 struct lttng_credentials chunk_credentials;
1814 const struct lttng_directory_handle *chunk_directory_handle = nullptr;
1815 struct lttng_directory_handle *domain_handle = nullptr;
1816 int domain_dirfd;
1817 const char *chunk_name;
1818 bool chunk_name_overridden;
1819 uint64_t chunk_id;
1820 time_t creation_timestamp;
1821 char creation_timestamp_buffer[ISO8601_STR_LEN];
1822 const char *creation_timestamp_str = "(none)";
1823 const bool chunk_has_local_output = relayd_id == -1ULL;
1824 enum lttng_trace_chunk_status tc_status;
1825 struct lttcomm_consumer_msg msg = {
1826 .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
1827 .u = {},
1828 };
1829 msg.u.create_trace_chunk.session_id = session_id;
1830
1831 LTTNG_ASSERT(socket);
1832 LTTNG_ASSERT(chunk);
1833
1834 if (relayd_id != -1ULL) {
1835 LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, relayd_id);
1836 }
1837
1838 chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, &chunk_name_overridden);
1839 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1840 chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1841 ERR("Failed to get name of trace chunk");
1842 ret = -LTTNG_ERR_FATAL;
1843 goto error;
1844 }
1845 if (chunk_name_overridden) {
1846 ret = lttng_strncpy(msg.u.create_trace_chunk.override_name,
1847 chunk_name,
1848 sizeof(msg.u.create_trace_chunk.override_name));
1849 if (ret) {
1850 ERR("Trace chunk name \"%s\" exceeds the maximal length allowed by the consumer protocol",
1851 chunk_name);
1852 ret = -LTTNG_ERR_FATAL;
1853 goto error;
1854 }
1855 }
1856
1857 chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, &creation_timestamp);
1858 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1859 ret = -LTTNG_ERR_FATAL;
1860 goto error;
1861 }
1862 msg.u.create_trace_chunk.creation_timestamp = (uint64_t) creation_timestamp;
1863 /* Only used for logging purposes. */
1864 ret = time_to_iso8601_str(
1865 creation_timestamp, creation_timestamp_buffer, sizeof(creation_timestamp_buffer));
1866 creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
1867
1868 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1869 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1870 /*
1871 * Anonymous trace chunks should never be transmitted
1872 * to remote peers (consumerd and relayd). They are used
1873 * internally for backward-compatibility purposes.
1874 */
1875 ret = -LTTNG_ERR_FATAL;
1876 goto error;
1877 }
1878 msg.u.create_trace_chunk.chunk_id = chunk_id;
1879
1880 if (chunk_has_local_output) {
1881 chunk_status = lttng_trace_chunk_borrow_chunk_directory_handle(
1882 chunk, &chunk_directory_handle);
1883 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1884 ret = -LTTNG_ERR_FATAL;
1885 goto error;
1886 }
1887 chunk_status = lttng_trace_chunk_get_credentials(chunk, &chunk_credentials);
1888 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1889 /*
1890 * Not associating credentials to a sessiond chunk is a
1891 * fatal internal error.
1892 */
1893 ret = -LTTNG_ERR_FATAL;
1894 goto error;
1895 }
1896 tc_status = lttng_trace_chunk_create_subdirectory(chunk, domain_subdir);
1897 if (tc_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1898 PERROR("Failed to create chunk domain output directory \"%s\"",
1899 domain_subdir);
1900 ret = -LTTNG_ERR_FATAL;
1901 goto error;
1902 }
1903 domain_handle = lttng_directory_handle_create_from_handle(domain_subdir,
1904 chunk_directory_handle);
1905 if (!domain_handle) {
1906 ret = -LTTNG_ERR_FATAL;
1907 goto error;
1908 }
1909
1910 /*
1911 * This will only compile on platforms that support
1912 * dirfd (POSIX.2008). This is fine as the session daemon
1913 * is only built for such platforms.
1914 *
1915 * The ownership of the chunk directory handle's is maintained
1916 * by the trace chunk.
1917 */
1918 domain_dirfd = lttng_directory_handle_get_dirfd(domain_handle);
1919 LTTNG_ASSERT(domain_dirfd >= 0);
1920
1921 msg.u.create_trace_chunk.credentials.value.uid =
1922 lttng_credentials_get_uid(&chunk_credentials);
1923 msg.u.create_trace_chunk.credentials.value.gid =
1924 lttng_credentials_get_gid(&chunk_credentials);
1925 msg.u.create_trace_chunk.credentials.is_set = 1;
1926 }
1927
1928 DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64
1929 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", creation_timestamp = %s",
1930 relayd_id,
1931 session_id,
1932 chunk_id,
1933 creation_timestamp_str);
1934 health_code_update();
1935 ret = consumer_send_msg(socket, &msg);
1936 health_code_update();
1937 if (ret < 0) {
1938 ERR("Trace chunk creation error on consumer");
1939 ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
1940 goto error;
1941 }
1942
1943 if (chunk_has_local_output) {
1944 DBG("Sending trace chunk domain directory fd to consumer");
1945 health_code_update();
1946 ret = consumer_send_fds(socket, &domain_dirfd, 1);
1947 health_code_update();
1948 if (ret < 0) {
1949 ERR("Trace chunk creation error on consumer");
1950 ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
1951 goto error;
1952 }
1953 }
1954 error:
1955 lttng_directory_handle_put(domain_handle);
1956 return ret;
1957 }
1958
1959 /*
1960 * Ask the consumer to close a trace chunk for a given session.
1961 *
1962 * Called with the consumer socket lock held.
1963 */
1964 int consumer_close_trace_chunk(struct consumer_socket *socket,
1965 uint64_t relayd_id,
1966 uint64_t session_id,
1967 struct lttng_trace_chunk *chunk,
1968 char *closed_trace_chunk_path)
1969 {
1970 int ret;
1971 enum lttng_trace_chunk_status chunk_status;
1972 lttcomm_consumer_msg msg = {
1973 .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
1974 .u = {},
1975 };
1976 msg.u.close_trace_chunk.session_id = session_id;
1977
1978 struct lttcomm_consumer_close_trace_chunk_reply reply;
1979 uint64_t chunk_id;
1980 time_t close_timestamp;
1981 enum lttng_trace_chunk_command_type close_command;
1982 const char *close_command_name = "none";
1983 struct lttng_dynamic_buffer path_reception_buffer;
1984
1985 LTTNG_ASSERT(socket);
1986 lttng_dynamic_buffer_init(&path_reception_buffer);
1987
1988 if (relayd_id != -1ULL) {
1989 LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id, relayd_id);
1990 }
1991
1992 chunk_status = lttng_trace_chunk_get_close_command(chunk, &close_command);
1993 switch (chunk_status) {
1994 case LTTNG_TRACE_CHUNK_STATUS_OK:
1995 LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command,
1996 (uint32_t) close_command);
1997 break;
1998 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1999 break;
2000 default:
2001 ERR("Failed to get trace chunk close command");
2002 ret = -1;
2003 goto error;
2004 }
2005
2006 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
2007 /*
2008 * Anonymous trace chunks should never be transmitted to remote peers
2009 * (consumerd and relayd). They are used internally for
2010 * backward-compatibility purposes.
2011 */
2012 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
2013 msg.u.close_trace_chunk.chunk_id = chunk_id;
2014
2015 chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
2016 /*
2017 * A trace chunk should be closed locally before being closed remotely.
2018 * Otherwise, the close timestamp would never be transmitted to the
2019 * peers.
2020 */
2021 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
2022 msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp;
2023
2024 if (msg.u.close_trace_chunk.close_command.is_set) {
2025 close_command_name = lttng_trace_chunk_command_type_get_name(close_command);
2026 }
2027 DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64
2028 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = \"%s\"",
2029 relayd_id,
2030 session_id,
2031 chunk_id,
2032 close_command_name);
2033
2034 health_code_update();
2035 ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg));
2036 if (ret < 0) {
2037 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2038 goto error;
2039 }
2040 ret = consumer_socket_recv(socket, &reply, sizeof(reply));
2041 if (ret < 0) {
2042 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2043 goto error;
2044 }
2045 if (reply.path_length >= LTTNG_PATH_MAX) {
2046 ERR("Invalid path returned by relay daemon: %" PRIu32
2047 "bytes exceeds maximal allowed length of %d bytes",
2048 reply.path_length,
2049 LTTNG_PATH_MAX);
2050 ret = -LTTNG_ERR_INVALID_PROTOCOL;
2051 goto error;
2052 }
2053 ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, reply.path_length);
2054 if (ret) {
2055 ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command");
2056 ret = -LTTNG_ERR_NOMEM;
2057 goto error;
2058 }
2059 ret = consumer_socket_recv(socket, path_reception_buffer.data, path_reception_buffer.size);
2060 if (ret < 0) {
2061 ERR("Communication error while receiving path of closed trace chunk");
2062 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2063 goto error;
2064 }
2065 if (path_reception_buffer.data[path_reception_buffer.size - 1] != '\0') {
2066 ERR("Invalid path returned by relay daemon: not null-terminated");
2067 ret = -LTTNG_ERR_INVALID_PROTOCOL;
2068 goto error;
2069 }
2070 if (closed_trace_chunk_path) {
2071 /*
2072 * closed_trace_chunk_path is assumed to have a length >=
2073 * LTTNG_PATH_MAX
2074 */
2075 memcpy(closed_trace_chunk_path,
2076 path_reception_buffer.data,
2077 path_reception_buffer.size);
2078 }
2079 error:
2080 lttng_dynamic_buffer_reset(&path_reception_buffer);
2081 health_code_update();
2082 return ret;
2083 }
2084
2085 /*
2086 * Ask the consumer if a trace chunk exists.
2087 *
2088 * Called with the consumer socket lock held.
2089 * Returns 0 on success, or a negative value on error.
2090 */
2091 int consumer_trace_chunk_exists(struct consumer_socket *socket,
2092 uint64_t relayd_id,
2093 uint64_t session_id,
2094 struct lttng_trace_chunk *chunk,
2095 enum consumer_trace_chunk_exists_status *result)
2096 {
2097 int ret;
2098 enum lttng_trace_chunk_status chunk_status;
2099 lttcomm_consumer_msg msg = {
2100 .cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
2101 .u = {},
2102 };
2103 msg.u.trace_chunk_exists.session_id = session_id;
2104
2105 uint64_t chunk_id;
2106 const char *consumer_reply_str;
2107
2108 LTTNG_ASSERT(socket);
2109
2110 if (relayd_id != -1ULL) {
2111 LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, relayd_id);
2112 }
2113
2114 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
2115 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
2116 /*
2117 * Anonymous trace chunks should never be transmitted
2118 * to remote peers (consumerd and relayd). They are used
2119 * internally for backward-compatibility purposes.
2120 */
2121 ret = -LTTNG_ERR_FATAL;
2122 goto error;
2123 }
2124 msg.u.trace_chunk_exists.chunk_id = chunk_id;
2125
2126 DBG("Sending consumer trace chunk exists command: relayd_id = %" PRId64
2127 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64,
2128 relayd_id,
2129 session_id,
2130 chunk_id);
2131
2132 health_code_update();
2133 ret = consumer_send_msg(socket, &msg);
2134 switch (-ret) {
2135 case LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK:
2136 consumer_reply_str = "unknown trace chunk";
2137 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK;
2138 break;
2139 case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL:
2140 consumer_reply_str = "trace chunk exists locally";
2141 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_LOCAL;
2142 break;
2143 case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE:
2144 consumer_reply_str = "trace chunk exists on remote peer";
2145 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_REMOTE;
2146 break;
2147 default:
2148 ERR("Consumer returned an error from TRACE_CHUNK_EXISTS command");
2149 ret = -1;
2150 goto error;
2151 }
2152 DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", consumer_reply_str);
2153 ret = 0;
2154 error:
2155 health_code_update();
2156 return ret;
2157 }
This page took 0.105737 seconds and 4 git commands to generate.