Clean-up: replace uses of `int found` as bool by `bool found`
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.cpp
CommitLineData
00e2e675 1/*
ab5be9fa
MJ
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
00e2e675 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
00e2e675 6 *
00e2e675
DG
7 */
8
6c1c0768 9#define _LGPL_SOURCE
28ab034a
JG
10#include "consumer.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "ust-app.hpp"
14#include "utils.hpp"
00e2e675 15
c9e313bc
SM
16#include <common/common.hpp>
17#include <common/defaults.hpp>
c9e313bc
SM
18#include <common/relayd/relayd.hpp>
19#include <common/string-utils/format.hpp>
28ab034a 20#include <common/uri.hpp>
00e2e675 21
28ab034a
JG
22#include <inttypes.h>
23#include <stdio.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/stat.h>
27#include <sys/types.h>
28#include <unistd.h>
00e2e675 29
3b967712
MD
30/*
31 * Return allocated full pathname of the session using the consumer trace path
32 * and subdir if available.
33 *
34 * The caller can safely free(3) the returned value. On error, NULL is
35 * returned.
36 */
37char *setup_channel_trace_path(struct consumer_output *consumer,
28ab034a
JG
38 const char *session_path,
39 size_t *consumer_path_offset)
3b967712
MD
40{
41 int ret;
42 char *pathname;
43
a0377dfe
FD
44 LTTNG_ASSERT(consumer);
45 LTTNG_ASSERT(session_path);
3b967712
MD
46
47 health_code_update();
48
49 /*
50 * Allocate the string ourself to make sure we never exceed
51 * LTTNG_PATH_MAX.
52 */
64803277 53 pathname = calloc<char>(LTTNG_PATH_MAX);
3b967712
MD
54 if (!pathname) {
55 goto error;
56 }
57
58 /* Get correct path name destination */
28ab034a
JG
59 if (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
60 consumer->relay_minor_version < 11) {
61 ret = snprintf(pathname,
62 LTTNG_PATH_MAX,
63 "%s%s/%s/%s",
64 consumer->dst.net.base_dir,
65 consumer->chunk_path,
66 consumer->domain_subdir,
67 session_path);
5da88b0f 68 *consumer_path_offset = 0;
3b967712 69 } else {
28ab034a
JG
70 ret = snprintf(
71 pathname, LTTNG_PATH_MAX, "%s/%s", consumer->domain_subdir, session_path);
5da88b0f 72 *consumer_path_offset = strlen(consumer->domain_subdir) + 1;
3b967712 73 }
28ab034a 74 DBG3("Consumer trace path relative to current trace chunk: \"%s\"", pathname);
3b967712
MD
75 if (ret < 0) {
76 PERROR("Failed to format channel path");
77 goto error;
78 } else if (ret >= LTTNG_PATH_MAX) {
79 ERR("Truncation occurred while formatting channel path");
3b967712
MD
80 goto error;
81 }
82
83 return pathname;
84error:
85 free(pathname);
cd9adb8b 86 return nullptr;
3b967712
MD
87}
88
52898cb1
DG
89/*
90 * Send a data payload using a given consumer socket of size len.
91 *
92 * The consumer socket lock MUST be acquired before calling this since this
93 * function can change the fd value.
94 *
95 * Return 0 on success else a negative value on error.
96 */
28ab034a 97int consumer_socket_send(struct consumer_socket *socket, const void *msg, size_t len)
52898cb1
DG
98{
99 int fd;
100 ssize_t size;
101
a0377dfe
FD
102 LTTNG_ASSERT(socket);
103 LTTNG_ASSERT(socket->fd_ptr);
104 LTTNG_ASSERT(msg);
52898cb1
DG
105
106 /* Consumer socket is invalid. Stopping. */
9363801e 107 fd = *socket->fd_ptr;
52898cb1
DG
108 if (fd < 0) {
109 goto error;
110 }
111
112 size = lttcomm_send_unix_sock(fd, msg, len);
113 if (size < 0) {
114 /* The above call will print a PERROR on error. */
115 DBG("Error when sending data to consumer on sock %d", fd);
116 /*
92db7cdc
DG
117 * At this point, the socket is not usable anymore thus closing it and
118 * setting the file descriptor to -1 so it is not reused.
52898cb1
DG
119 */
120
121 /* This call will PERROR on error. */
122 (void) lttcomm_close_unix_sock(fd);
9363801e 123 *socket->fd_ptr = -1;
52898cb1
DG
124 goto error;
125 }
126
127 return 0;
128
129error:
130 return -1;
131}
132
133/*
134 * Receive a data payload using a given consumer socket of size len.
135 *
136 * The consumer socket lock MUST be acquired before calling this since this
137 * function can change the fd value.
138 *
139 * Return 0 on success else a negative value on error.
140 */
141int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len)
142{
143 int fd;
144 ssize_t size;
145
a0377dfe
FD
146 LTTNG_ASSERT(socket);
147 LTTNG_ASSERT(socket->fd_ptr);
148 LTTNG_ASSERT(msg);
52898cb1
DG
149
150 /* Consumer socket is invalid. Stopping. */
9363801e 151 fd = *socket->fd_ptr;
52898cb1
DG
152 if (fd < 0) {
153 goto error;
154 }
155
156 size = lttcomm_recv_unix_sock(fd, msg, len);
157 if (size <= 0) {
158 /* The above call will print a PERROR on error. */
159 DBG("Error when receiving data from the consumer socket %d", fd);
160 /*
92db7cdc
DG
161 * At this point, the socket is not usable anymore thus closing it and
162 * setting the file descriptor to -1 so it is not reused.
52898cb1
DG
163 */
164
165 /* This call will PERROR on error. */
166 (void) lttcomm_close_unix_sock(fd);
9363801e 167 *socket->fd_ptr = -1;
52898cb1
DG
168 goto error;
169 }
170
171 return 0;
172
173error:
174 return -1;
175}
176
f50f23d9
DG
177/*
178 * Receive a reply command status message from the consumer. Consumer socket
179 * lock MUST be acquired before calling this function.
180 *
181 * Return 0 on success, -1 on recv error or a negative lttng error code which
182 * was possibly returned by the consumer.
183 */
184int consumer_recv_status_reply(struct consumer_socket *sock)
185{
186 int ret;
187 struct lttcomm_consumer_status_msg reply;
188
a0377dfe 189 LTTNG_ASSERT(sock);
f50f23d9 190
52898cb1
DG
191 ret = consumer_socket_recv(sock, &reply, sizeof(reply));
192 if (ret < 0) {
f50f23d9
DG
193 goto end;
194 }
195
0c759fc9 196 if (reply.ret_code == LTTCOMM_CONSUMERD_SUCCESS) {
f50f23d9
DG
197 /* All good. */
198 ret = 0;
199 } else {
200 ret = -reply.ret_code;
ffe60014 201 DBG("Consumer ret code %d", ret);
f50f23d9
DG
202 }
203
204end:
205 return ret;
206}
207
ffe60014
DG
208/*
209 * Once the ASK_CHANNEL command is sent to the consumer, the channel
210 * information are sent back. This call receives that data and populates key
211 * and stream_count.
212 *
213 * On success return 0 and both key and stream_count are set. On error, a
214 * negative value is sent back and both parameters are untouched.
215 */
216int consumer_recv_status_channel(struct consumer_socket *sock,
28ab034a
JG
217 uint64_t *key,
218 unsigned int *stream_count)
ffe60014
DG
219{
220 int ret;
221 struct lttcomm_consumer_status_channel reply;
222
a0377dfe
FD
223 LTTNG_ASSERT(sock);
224 LTTNG_ASSERT(stream_count);
225 LTTNG_ASSERT(key);
ffe60014 226
52898cb1
DG
227 ret = consumer_socket_recv(sock, &reply, sizeof(reply));
228 if (ret < 0) {
ffe60014
DG
229 goto end;
230 }
231
232 /* An error is possible so don't touch the key and stream_count. */
0c759fc9 233 if (reply.ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
ffe60014
DG
234 ret = -1;
235 goto end;
236 }
237
238 *key = reply.key;
239 *stream_count = reply.stream_count;
0c759fc9 240 ret = 0;
ffe60014
DG
241
242end:
243 return ret;
244}
245
2f77fc4b
DG
246/*
247 * Send destroy relayd command to consumer.
248 *
249 * On success return positive value. On error, negative value.
250 */
28ab034a 251int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer)
2f77fc4b
DG
252{
253 int ret;
254 struct lttcomm_consumer_msg msg;
255
a0377dfe
FD
256 LTTNG_ASSERT(consumer);
257 LTTNG_ASSERT(sock);
2f77fc4b 258
9363801e 259 DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd_ptr);
2f77fc4b 260
53efb85a 261 memset(&msg, 0, sizeof(msg));
2f77fc4b
DG
262 msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
263 msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
264
265 pthread_mutex_lock(sock->lock);
52898cb1 266 ret = consumer_socket_send(sock, &msg, sizeof(msg));
2f77fc4b 267 if (ret < 0) {
52898cb1 268 goto error;
2f77fc4b
DG
269 }
270
f50f23d9
DG
271 /* Don't check the return value. The caller will do it. */
272 ret = consumer_recv_status_reply(sock);
273
2f77fc4b
DG
274 DBG2("Consumer send destroy relayd command done");
275
276error:
52898cb1 277 pthread_mutex_unlock(sock->lock);
2f77fc4b
DG
278 return ret;
279}
280
281/*
282 * For each consumer socket in the consumer output object, send a destroy
283 * relayd command.
284 */
285void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
286{
2f77fc4b
DG
287 struct lttng_ht_iter iter;
288 struct consumer_socket *socket;
289
a0377dfe 290 LTTNG_ASSERT(consumer);
2f77fc4b
DG
291
292 /* Destroy any relayd connection */
6dc3064a 293 if (consumer->type == CONSUMER_DST_NET) {
2f77fc4b 294 rcu_read_lock();
28ab034a 295 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
c617c0c6
MD
296 int ret;
297
2f77fc4b
DG
298 /* Send destroy relayd command */
299 ret = consumer_send_destroy_relayd(socket, consumer);
300 if (ret < 0) {
c5c45efa 301 DBG("Unable to send destroy relayd command to consumer");
2f77fc4b
DG
302 /* Continue since we MUST delete everything at this point. */
303 }
304 }
305 rcu_read_unlock();
306 }
307}
308
a4b92340
DG
309/*
310 * From a consumer_data structure, allocate and add a consumer socket to the
311 * consumer output.
312 *
313 * Return 0 on success, else negative value on error
314 */
28ab034a 315int consumer_create_socket(struct consumer_data *data, struct consumer_output *output)
a4b92340
DG
316{
317 int ret = 0;
318 struct consumer_socket *socket;
319
a0377dfe 320 LTTNG_ASSERT(data);
a4b92340 321
cd9adb8b 322 if (output == nullptr || data->cmd_sock < 0) {
a4b92340
DG
323 /*
324 * Not an error. Possible there is simply not spawned consumer or it's
325 * disabled for the tracing session asking the socket.
326 */
327 goto error;
328 }
329
330 rcu_read_lock();
331 socket = consumer_find_socket(data->cmd_sock, output);
332 rcu_read_unlock();
cd9adb8b 333 if (socket == nullptr) {
4ce514c4 334 socket = consumer_allocate_socket(&data->cmd_sock);
cd9adb8b 335 if (socket == nullptr) {
a4b92340
DG
336 ret = -1;
337 goto error;
338 }
339
2f77fc4b 340 socket->registered = 0;
a4b92340
DG
341 socket->lock = &data->lock;
342 rcu_read_lock();
343 consumer_add_socket(socket, output);
344 rcu_read_unlock();
345 }
346
6dc3064a
DG
347 socket->type = data->type;
348
28ab034a 349 DBG3("Consumer socket created (fd: %d) and added to output", data->cmd_sock);
a4b92340
DG
350
351error:
352 return ret;
353}
354
7972aab2
DG
355/*
356 * Return the consumer socket from the given consumer output with the right
357 * bitness. On error, returns NULL.
358 *
359 * The caller MUST acquire a rcu read side lock and keep it until the socket
360 * object reference is not needed anymore.
361 */
362struct consumer_socket *consumer_find_socket_by_bitness(int bits,
28ab034a 363 const struct consumer_output *consumer)
7972aab2
DG
364{
365 int consumer_fd;
cd9adb8b 366 struct consumer_socket *socket = nullptr;
7972aab2 367
48b7cdc2
FD
368 ASSERT_RCU_READ_LOCKED();
369
7972aab2
DG
370 switch (bits) {
371 case 64:
412d7227 372 consumer_fd = uatomic_read(&the_ust_consumerd64_fd);
7972aab2
DG
373 break;
374 case 32:
412d7227 375 consumer_fd = uatomic_read(&the_ust_consumerd32_fd);
7972aab2
DG
376 break;
377 default:
a0377dfe 378 abort();
7972aab2
DG
379 goto end;
380 }
381
382 socket = consumer_find_socket(consumer_fd, consumer);
383 if (!socket) {
28ab034a 384 ERR("Consumer socket fd %d not found in consumer obj %p", consumer_fd, consumer);
7972aab2
DG
385 }
386
387end:
388 return socket;
389}
390
173af62f
DG
391/*
392 * Find a consumer_socket in a consumer_output hashtable. Read side lock must
393 * be acquired before calling this function and across use of the
394 * returned consumer_socket.
395 */
28ab034a 396struct consumer_socket *consumer_find_socket(int key, const struct consumer_output *consumer)
173af62f
DG
397{
398 struct lttng_ht_iter iter;
399 struct lttng_ht_node_ulong *node;
cd9adb8b 400 struct consumer_socket *socket = nullptr;
173af62f 401
48b7cdc2
FD
402 ASSERT_RCU_READ_LOCKED();
403
173af62f 404 /* Negative keys are lookup failures */
cd9adb8b
JG
405 if (key < 0 || consumer == nullptr) {
406 return nullptr;
173af62f
DG
407 }
408
28ab034a 409 lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter);
173af62f 410 node = lttng_ht_iter_get_node_ulong(&iter);
cd9adb8b 411 if (node != nullptr) {
0114db0e 412 socket = lttng::utils::container_of(node, &consumer_socket::node);
173af62f
DG
413 }
414
415 return socket;
416}
417
418/*
419 * Allocate a new consumer_socket and return the pointer.
420 */
4ce514c4 421struct consumer_socket *consumer_allocate_socket(int *fd)
173af62f 422{
cd9adb8b 423 struct consumer_socket *socket = nullptr;
173af62f 424
a0377dfe 425 LTTNG_ASSERT(fd);
4ce514c4 426
64803277 427 socket = zmalloc<consumer_socket>();
cd9adb8b 428 if (socket == nullptr) {
173af62f
DG
429 PERROR("zmalloc consumer socket");
430 goto error;
431 }
432
9363801e 433 socket->fd_ptr = fd;
4ce514c4 434 lttng_ht_node_init_ulong(&socket->node, *fd);
173af62f
DG
435
436error:
437 return socket;
438}
439
440/*
441 * Add consumer socket to consumer output object. Read side lock must be
442 * acquired before calling this function.
443 */
28ab034a 444void consumer_add_socket(struct consumer_socket *sock, struct consumer_output *consumer)
173af62f 445{
a0377dfe
FD
446 LTTNG_ASSERT(sock);
447 LTTNG_ASSERT(consumer);
48b7cdc2 448 ASSERT_RCU_READ_LOCKED();
173af62f
DG
449
450 lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
451}
452
453/*
348a81dc 454 * Delete consumer socket to consumer output object. Read side lock must be
173af62f
DG
455 * acquired before calling this function.
456 */
28ab034a 457void consumer_del_socket(struct consumer_socket *sock, struct consumer_output *consumer)
173af62f
DG
458{
459 int ret;
460 struct lttng_ht_iter iter;
461
a0377dfe
FD
462 LTTNG_ASSERT(sock);
463 LTTNG_ASSERT(consumer);
48b7cdc2 464 ASSERT_RCU_READ_LOCKED();
173af62f
DG
465
466 iter.iter.node = &sock->node.node;
467 ret = lttng_ht_del(consumer->socks, &iter);
a0377dfe 468 LTTNG_ASSERT(!ret);
173af62f
DG
469}
470
471/*
472 * RCU destroy call function.
473 */
474static void destroy_socket_rcu(struct rcu_head *head)
475{
476 struct lttng_ht_node_ulong *node =
0114db0e 477 lttng::utils::container_of(head, &lttng_ht_node_ulong::head);
28ab034a 478 struct consumer_socket *socket = lttng::utils::container_of(node, &consumer_socket::node);
173af62f
DG
479
480 free(socket);
481}
482
483/*
48b7cdc2
FD
484 * Destroy and free socket pointer in a call RCU. The call must either:
485 * - have acquired the read side lock before calling this function, or
486 * - guarantee the validity of the `struct consumer_socket` object for the
487 * duration of the call.
173af62f
DG
488 */
489void consumer_destroy_socket(struct consumer_socket *sock)
490{
a0377dfe 491 LTTNG_ASSERT(sock);
173af62f
DG
492
493 /*
494 * We DO NOT close the file descriptor here since it is global to the
2f77fc4b
DG
495 * session daemon and is closed only if the consumer dies or a custom
496 * consumer was registered,
173af62f 497 */
2f77fc4b 498 if (sock->registered) {
9363801e
DG
499 DBG3("Consumer socket was registered. Closing fd %d", *sock->fd_ptr);
500 lttcomm_close_unix_sock(*sock->fd_ptr);
2f77fc4b 501 }
173af62f
DG
502
503 call_rcu(&sock->node.head, destroy_socket_rcu);
504}
505
00e2e675
DG
506/*
507 * Allocate and assign data to a consumer_output object.
508 *
509 * Return pointer to structure.
510 */
511struct consumer_output *consumer_create_output(enum consumer_dst_type type)
512{
cd9adb8b 513 struct consumer_output *output = nullptr;
00e2e675 514
64803277 515 output = zmalloc<consumer_output>();
cd9adb8b 516 if (output == nullptr) {
00e2e675
DG
517 PERROR("zmalloc consumer_output");
518 goto error;
519 }
520
521 /* By default, consumer output is enabled */
522 output->enabled = 1;
523 output->type = type;
d88aee68 524 output->net_seq_index = (uint64_t) -1ULL;
6addfa37 525 urcu_ref_init(&output->ref);
173af62f
DG
526
527 output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675
DG
528
529error:
530 return output;
531}
532
af706bb7
DG
533/*
534 * Iterate over the consumer output socket hash table and destroy them. The
535 * socket file descriptor are only closed if the consumer output was
536 * registered meaning it's an external consumer.
537 */
538void consumer_destroy_output_sockets(struct consumer_output *obj)
539{
540 struct lttng_ht_iter iter;
541 struct consumer_socket *socket;
542
543 if (!obj->socks) {
544 return;
545 }
546
547 rcu_read_lock();
28ab034a 548 cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
af706bb7
DG
549 consumer_del_socket(socket, obj);
550 consumer_destroy_socket(socket);
551 }
552 rcu_read_unlock();
553}
554
00e2e675
DG
555/*
556 * Delete the consumer_output object from the list and free the ptr.
557 */
6addfa37 558static void consumer_release_output(struct urcu_ref *ref)
00e2e675 559{
28ab034a 560 struct consumer_output *obj = lttng::utils::container_of(ref, &consumer_output::ref);
00e2e675 561
af706bb7 562 consumer_destroy_output_sockets(obj);
2f77fc4b 563
af706bb7 564 if (obj->socks) {
2f77fc4b 565 /* Finally destroy HT */
3c339053 566 lttng_ht_destroy(obj->socks);
00e2e675 567 }
173af62f 568
00e2e675
DG
569 free(obj);
570}
571
6addfa37
MD
572/*
573 * Get the consumer_output object.
574 */
575void consumer_output_get(struct consumer_output *obj)
576{
577 urcu_ref_get(&obj->ref);
578}
579
580/*
581 * Put the consumer_output object.
6addfa37
MD
582 */
583void consumer_output_put(struct consumer_output *obj)
584{
585 if (!obj) {
586 return;
587 }
588 urcu_ref_put(&obj->ref, consumer_release_output);
589}
590
00e2e675
DG
591/*
592 * Copy consumer output and returned the newly allocated copy.
593 */
b178f53e 594struct consumer_output *consumer_copy_output(struct consumer_output *src)
00e2e675 595{
6dc3064a 596 int ret;
00e2e675
DG
597 struct consumer_output *output;
598
a0377dfe 599 LTTNG_ASSERT(src);
00e2e675 600
b178f53e 601 output = consumer_create_output(src->type);
cd9adb8b 602 if (output == nullptr) {
6addfa37 603 goto end;
00e2e675 604 }
b178f53e
JG
605 output->enabled = src->enabled;
606 output->net_seq_index = src->net_seq_index;
28ab034a 607 memcpy(output->domain_subdir, src->domain_subdir, sizeof(output->domain_subdir));
b178f53e
JG
608 output->snapshot = src->snapshot;
609 output->relay_major_version = src->relay_major_version;
610 output->relay_minor_version = src->relay_minor_version;
eacb7b6f 611 output->relay_allows_clear = src->relay_allows_clear;
b178f53e
JG
612 memcpy(&output->dst, &src->dst, sizeof(output->dst));
613 ret = consumer_copy_sockets(output, src);
6dc3064a 614 if (ret < 0) {
6addfa37 615 goto error_put;
6dc3064a 616 }
6addfa37 617end:
6dc3064a
DG
618 return output;
619
6addfa37
MD
620error_put:
621 consumer_output_put(output);
cd9adb8b 622 return nullptr;
6dc3064a
DG
623}
624
625/*
626 * Copy consumer sockets from src to dst.
627 *
628 * Return 0 on success or else a negative value.
629 */
28ab034a 630int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src)
6dc3064a
DG
631{
632 int ret = 0;
633 struct lttng_ht_iter iter;
634 struct consumer_socket *socket, *copy_sock;
635
a0377dfe
FD
636 LTTNG_ASSERT(dst);
637 LTTNG_ASSERT(src);
6dc3064a 638
b82c5c4d 639 rcu_read_lock();
28ab034a 640 cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
6dc3064a 641 /* Ignore socket that are already there. */
9363801e 642 copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
6dc3064a
DG
643 if (copy_sock) {
644 continue;
645 }
646
173af62f 647 /* Create new socket object. */
9363801e 648 copy_sock = consumer_allocate_socket(socket->fd_ptr);
cd9adb8b 649 if (copy_sock == nullptr) {
b82c5c4d 650 rcu_read_unlock();
6dc3064a
DG
651 ret = -ENOMEM;
652 goto error;
173af62f
DG
653 }
654
09a90bcd 655 copy_sock->registered = socket->registered;
6dc3064a
DG
656 /*
657 * This is valid because this lock is shared accross all consumer
658 * object being the global lock of the consumer data structure of the
659 * session daemon.
660 */
173af62f 661 copy_sock->lock = socket->lock;
6dc3064a 662 consumer_add_socket(copy_sock, dst);
173af62f 663 }
b82c5c4d 664 rcu_read_unlock();
173af62f 665
00e2e675 666error:
6dc3064a 667 return ret;
00e2e675
DG
668}
669
670/*
b178f53e 671 * Set network URI to the consumer output.
00e2e675 672 *
ad20f474
DG
673 * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
674 * error.
00e2e675 675 */
b178f53e 676int consumer_set_network_uri(const struct ltt_session *session,
28ab034a
JG
677 struct consumer_output *output,
678 struct lttng_uri *uri)
00e2e675
DG
679{
680 int ret;
cd9adb8b 681 struct lttng_uri *dst_uri = nullptr;
00e2e675
DG
682
683 /* Code flow error safety net. */
a0377dfe
FD
684 LTTNG_ASSERT(output);
685 LTTNG_ASSERT(uri);
00e2e675
DG
686
687 switch (uri->stype) {
688 case LTTNG_STREAM_CONTROL:
b178f53e
JG
689 dst_uri = &output->dst.net.control;
690 output->dst.net.control_isset = 1;
00e2e675
DG
691 if (uri->port == 0) {
692 /* Assign default port. */
693 uri->port = DEFAULT_NETWORK_CONTROL_PORT;
a74934ba 694 } else {
28ab034a 695 if (output->dst.net.data_isset && uri->port == output->dst.net.data.port) {
a74934ba
DG
696 ret = -LTTNG_ERR_INVALID;
697 goto error;
698 }
00e2e675 699 }
ad20f474 700 DBG3("Consumer control URI set with port %d", uri->port);
00e2e675
DG
701 break;
702 case LTTNG_STREAM_DATA:
b178f53e
JG
703 dst_uri = &output->dst.net.data;
704 output->dst.net.data_isset = 1;
00e2e675
DG
705 if (uri->port == 0) {
706 /* Assign default port. */
707 uri->port = DEFAULT_NETWORK_DATA_PORT;
a74934ba 708 } else {
28ab034a
JG
709 if (output->dst.net.control_isset &&
710 uri->port == output->dst.net.control.port) {
a74934ba
DG
711 ret = -LTTNG_ERR_INVALID;
712 goto error;
713 }
00e2e675 714 }
ad20f474 715 DBG3("Consumer data URI set with port %d", uri->port);
00e2e675
DG
716 break;
717 default:
718 ERR("Set network uri type unknown %d", uri->stype);
a74934ba 719 ret = -LTTNG_ERR_INVALID;
00e2e675
DG
720 goto error;
721 }
722
723 ret = uri_compare(dst_uri, uri);
724 if (!ret) {
725 /* Same URI, don't touch it and return success. */
726 DBG3("URI network compare are the same");
ad20f474 727 goto equal;
00e2e675
DG
728 }
729
730 /* URIs were not equal, replacing it. */
00e2e675 731 memcpy(dst_uri, uri, sizeof(struct lttng_uri));
b178f53e
JG
732 output->type = CONSUMER_DST_NET;
733 if (dst_uri->stype != LTTNG_STREAM_CONTROL) {
734 /* Only the control uri needs to contain the path. */
735 goto end;
736 }
00e2e675 737
b178f53e
JG
738 /*
739 * If the user has specified a subdir as part of the control
740 * URL, the session's base output directory is:
741 * /RELAYD_OUTPUT_PATH/HOSTNAME/USER_SPECIFIED_DIR
742 *
743 * Hence, the "base_dir" from which all stream files and
744 * session rotation chunks are created takes the form
745 * /HOSTNAME/USER_SPECIFIED_DIR
746 *
747 * If the user has not specified an output directory as part of
748 * the control URL, the base output directory has the form:
749 * /RELAYD_OUTPUT_PATH/HOSTNAME/SESSION_NAME-CREATION_TIME
750 *
751 * Hence, the "base_dir" from which all stream files and
752 * session rotation chunks are created takes the form
753 * /HOSTNAME/SESSION_NAME-CREATION_TIME
754 *
755 * Note that automatically generated session names already
756 * contain the session's creation time. In that case, the
757 * creation time is omitted to prevent it from being duplicated
758 * in the final directory hierarchy.
759 */
760 if (*uri->subdir) {
761 if (strstr(uri->subdir, "../")) {
762 ERR("Network URI subdirs are not allowed to walk up the path hierarchy");
763 ret = -LTTNG_ERR_INVALID;
00e2e675
DG
764 goto error;
765 }
b178f53e 766 ret = snprintf(output->dst.net.base_dir,
28ab034a
JG
767 sizeof(output->dst.net.base_dir),
768 "/%s/%s/",
769 session->hostname,
770 uri->subdir);
b178f53e
JG
771 } else {
772 if (session->has_auto_generated_name) {
773 ret = snprintf(output->dst.net.base_dir,
28ab034a
JG
774 sizeof(output->dst.net.base_dir),
775 "/%s/%s/",
776 session->hostname,
777 session->name);
b178f53e
JG
778 } else {
779 char session_creation_datetime[16];
780 size_t strftime_ret;
781 struct tm *timeinfo;
00e2e675 782
b178f53e
JG
783 timeinfo = localtime(&session->creation_time);
784 if (!timeinfo) {
785 ret = -LTTNG_ERR_FATAL;
786 goto error;
787 }
788 strftime_ret = strftime(session_creation_datetime,
28ab034a
JG
789 sizeof(session_creation_datetime),
790 "%Y%m%d-%H%M%S",
791 timeinfo);
b178f53e
JG
792 if (strftime_ret == 0) {
793 ERR("Failed to format session creation timestamp while setting network URI");
794 ret = -LTTNG_ERR_FATAL;
795 goto error;
796 }
797 ret = snprintf(output->dst.net.base_dir,
28ab034a
JG
798 sizeof(output->dst.net.base_dir),
799 "/%s/%s-%s/",
800 session->hostname,
801 session->name,
802 session_creation_datetime);
bfc6eff0 803 }
00e2e675 804 }
b178f53e
JG
805 if (ret >= sizeof(output->dst.net.base_dir)) {
806 ret = -LTTNG_ERR_INVALID;
807 ERR("Truncation occurred while setting network output base directory");
808 goto error;
809 } else if (ret == -1) {
810 ret = -LTTNG_ERR_INVALID;
811 PERROR("Error occurred while setting network output base directory");
812 goto error;
813 }
814
28ab034a 815 DBG3("Consumer set network uri base_dir path %s", output->dst.net.base_dir);
00e2e675 816
b178f53e 817end:
00e2e675 818 return 0;
ad20f474
DG
819equal:
820 return 1;
00e2e675 821error:
a74934ba 822 return ret;
00e2e675
DG
823}
824
825/*
826 * Send file descriptor to consumer via sock.
9a318688
JG
827 *
828 * The consumer socket lock must be held by the caller.
00e2e675 829 */
28ab034a 830int consumer_send_fds(struct consumer_socket *sock, const int *fds, size_t nb_fd)
00e2e675
DG
831{
832 int ret;
833
a0377dfe
FD
834 LTTNG_ASSERT(fds);
835 LTTNG_ASSERT(sock);
836 LTTNG_ASSERT(nb_fd > 0);
837 LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY);
00e2e675 838
9363801e 839 ret = lttcomm_send_fds_unix_sock(*sock->fd_ptr, fds, nb_fd);
00e2e675 840 if (ret < 0) {
3448e266 841 /* The above call will print a PERROR on error. */
9363801e 842 DBG("Error when sending consumer fds on sock %d", *sock->fd_ptr);
00e2e675
DG
843 goto error;
844 }
845
f50f23d9 846 ret = consumer_recv_status_reply(sock);
00e2e675
DG
847error:
848 return ret;
849}
850
ffe60014
DG
851/*
852 * Consumer send communication message structure to consumer.
9a318688
JG
853 *
854 * The consumer socket lock must be held by the caller.
ffe60014 855 */
28ab034a 856int consumer_send_msg(struct consumer_socket *sock, const struct lttcomm_consumer_msg *msg)
ffe60014
DG
857{
858 int ret;
859
a0377dfe
FD
860 LTTNG_ASSERT(msg);
861 LTTNG_ASSERT(sock);
862 LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY);
ffe60014 863
52898cb1 864 ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg));
ffe60014 865 if (ret < 0) {
ffe60014
DG
866 goto error;
867 }
868
869 ret = consumer_recv_status_reply(sock);
870
871error:
872 return ret;
873}
874
00e2e675
DG
875/*
876 * Consumer send channel communication message structure to consumer.
9a318688
JG
877 *
878 * The consumer socket lock must be held by the caller.
00e2e675 879 */
28ab034a 880int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg)
00e2e675
DG
881{
882 int ret;
883
a0377dfe
FD
884 LTTNG_ASSERT(msg);
885 LTTNG_ASSERT(sock);
00e2e675 886
52898cb1 887 ret = consumer_send_msg(sock, msg);
00e2e675 888 if (ret < 0) {
00e2e675
DG
889 goto error;
890 }
891
892error:
893 return ret;
894}
895
ffe60014
DG
896/*
897 * Populate the given consumer msg structure with the ask_channel command
898 * information.
899 */
900void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
28ab034a
JG
901 uint64_t subbuf_size,
902 uint64_t num_subbuf,
903 int overwrite,
904 unsigned int switch_timer_interval,
905 unsigned int read_timer_interval,
906 unsigned int live_timer_interval,
907 bool is_in_live_session,
908 unsigned int monitor_timer_interval,
909 int output,
910 int type,
911 uint64_t session_id,
912 const char *pathname,
913 const char *name,
914 uint64_t relayd_id,
915 uint64_t key,
916 const lttng_uuid& uuid,
917 uint32_t chan_id,
918 uint64_t tracefile_size,
919 uint64_t tracefile_count,
920 uint64_t session_id_per_pid,
921 unsigned int monitor,
922 uint32_t ust_app_uid,
923 int64_t blocking_timeout,
924 const char *root_shm_path,
925 const char *shm_path,
926 struct lttng_trace_chunk *trace_chunk,
927 const struct lttng_credentials *buffer_credentials)
ffe60014 928{
a0377dfe 929 LTTNG_ASSERT(msg);
ffe60014 930
26c468bb 931 /* Zeroed structure */
ffe60014 932 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
d2956687
JG
933 msg->u.ask_channel.buffer_credentials.uid = UINT32_MAX;
934 msg->u.ask_channel.buffer_credentials.gid = UINT32_MAX;
935
26c468bb 936 if (trace_chunk) {
d2956687
JG
937 uint64_t chunk_id;
938 enum lttng_trace_chunk_status chunk_status;
d2956687
JG
939
940 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
a0377dfe 941 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 942 LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id);
26c468bb 943 }
28ab034a
JG
944 msg->u.ask_channel.buffer_credentials.uid = lttng_credentials_get_uid(buffer_credentials);
945 msg->u.ask_channel.buffer_credentials.gid = lttng_credentials_get_gid(buffer_credentials);
ffe60014
DG
946
947 msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION;
948 msg->u.ask_channel.subbuf_size = subbuf_size;
28ab034a 949 msg->u.ask_channel.num_subbuf = num_subbuf;
ffe60014
DG
950 msg->u.ask_channel.overwrite = overwrite;
951 msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
952 msg->u.ask_channel.read_timer_interval = read_timer_interval;
ecc48a90 953 msg->u.ask_channel.live_timer_interval = live_timer_interval;
a2814ea7 954 msg->u.ask_channel.is_live = is_in_live_session;
e9404c27 955 msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval;
ffe60014
DG
956 msg->u.ask_channel.output = output;
957 msg->u.ask_channel.type = type;
958 msg->u.ask_channel.session_id = session_id;
1950109e 959 msg->u.ask_channel.session_id_per_pid = session_id_per_pid;
ffe60014
DG
960 msg->u.ask_channel.relayd_id = relayd_id;
961 msg->u.ask_channel.key = key;
7972aab2 962 msg->u.ask_channel.chan_id = chan_id;
1624d5b7
JD
963 msg->u.ask_channel.tracefile_size = tracefile_size;
964 msg->u.ask_channel.tracefile_count = tracefile_count;
2bba9e53 965 msg->u.ask_channel.monitor = monitor;
567eb353 966 msg->u.ask_channel.ust_app_uid = ust_app_uid;
491d1539 967 msg->u.ask_channel.blocking_timeout = blocking_timeout;
ffe60014 968
328c2fe7 969 std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid);
ffe60014 970
10a50311 971 if (pathname) {
28ab034a
JG
972 strncpy(msg->u.ask_channel.pathname, pathname, sizeof(msg->u.ask_channel.pathname));
973 msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname) - 1] = '\0';
10a50311 974 }
ffe60014
DG
975
976 strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name));
977 msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0';
d7ba1388 978
3d071855 979 if (root_shm_path) {
28ab034a
JG
980 strncpy(msg->u.ask_channel.root_shm_path,
981 root_shm_path,
3d071855 982 sizeof(msg->u.ask_channel.root_shm_path));
28ab034a
JG
983 msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] =
984 '\0';
3d071855 985 }
d7ba1388 986 if (shm_path) {
28ab034a 987 strncpy(msg->u.ask_channel.shm_path, shm_path, sizeof(msg->u.ask_channel.shm_path));
d7ba1388
MD
988 msg->u.ask_channel.shm_path[sizeof(msg->u.ask_channel.shm_path) - 1] = '\0';
989 }
ffe60014
DG
990}
991
00e2e675
DG
992/*
993 * Init channel communication message structure.
994 */
638e7b4e 995void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
28ab034a
JG
996 uint64_t channel_key,
997 uint64_t session_id,
998 const char *pathname,
999 uint64_t relayd_id,
1000 const char *name,
1001 unsigned int nb_init_streams,
1002 enum lttng_event_output output,
1003 int type,
1004 uint64_t tracefile_size,
1005 uint64_t tracefile_count,
1006 unsigned int monitor,
1007 unsigned int live_timer_interval,
1008 bool is_in_live_session,
1009 unsigned int monitor_timer_interval,
1010 struct lttng_trace_chunk *trace_chunk)
00e2e675 1011{
a0377dfe 1012 LTTNG_ASSERT(msg);
00e2e675 1013
00e2e675
DG
1014 /* Zeroed structure */
1015 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1016
26c468bb 1017 if (trace_chunk) {
d2956687
JG
1018 uint64_t chunk_id;
1019 enum lttng_trace_chunk_status chunk_status;
1020
1021 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
a0377dfe 1022 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 1023 LTTNG_OPTIONAL_SET(&msg->u.channel.chunk_id, chunk_id);
26c468bb 1024 }
d2956687 1025
00e2e675 1026 /* Send channel */
638e7b4e 1027 msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
00e2e675 1028 msg->u.channel.channel_key = channel_key;
ffe60014 1029 msg->u.channel.session_id = session_id;
ffe60014 1030 msg->u.channel.relayd_id = relayd_id;
c30aaa51 1031 msg->u.channel.nb_init_streams = nb_init_streams;
ffe60014
DG
1032 msg->u.channel.output = output;
1033 msg->u.channel.type = type;
1624d5b7
JD
1034 msg->u.channel.tracefile_size = tracefile_size;
1035 msg->u.channel.tracefile_count = tracefile_count;
2bba9e53 1036 msg->u.channel.monitor = monitor;
ecc48a90 1037 msg->u.channel.live_timer_interval = live_timer_interval;
a2814ea7 1038 msg->u.channel.is_live = is_in_live_session;
e9404c27 1039 msg->u.channel.monitor_timer_interval = monitor_timer_interval;
ffe60014 1040
28ab034a 1041 strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname));
ffe60014
DG
1042 msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0';
1043
1044 strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name));
1045 msg->u.channel.name[sizeof(msg->u.channel.name) - 1] = '\0';
00e2e675
DG
1046}
1047
1048/*
1049 * Init stream communication message structure.
1050 */
e098433c 1051void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
28ab034a
JG
1052 uint64_t channel_key,
1053 uint64_t stream_key,
1054 int32_t cpu)
00e2e675 1055{
a0377dfe 1056 LTTNG_ASSERT(msg);
00e2e675
DG
1057
1058 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1059
e098433c 1060 msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM;
00e2e675
DG
1061 msg->u.stream.channel_key = channel_key;
1062 msg->u.stream.stream_key = stream_key;
ffe60014 1063 msg->u.stream.cpu = cpu;
00e2e675
DG
1064}
1065
a4baae1b 1066void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
28ab034a
JG
1067 enum lttng_consumer_command cmd,
1068 uint64_t channel_key,
1069 uint64_t net_seq_idx)
a4baae1b 1070{
a0377dfe 1071 LTTNG_ASSERT(msg);
a4baae1b
JD
1072
1073 memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
1074
1075 msg->cmd_type = cmd;
1076 msg->u.sent_streams.channel_key = channel_key;
1077 msg->u.sent_streams.net_seq_idx = net_seq_idx;
1078}
1079
00e2e675
DG
1080/*
1081 * Send stream communication structure to the consumer.
1082 */
f50f23d9 1083int consumer_send_stream(struct consumer_socket *sock,
28ab034a
JG
1084 struct consumer_output *dst,
1085 struct lttcomm_consumer_msg *msg,
1086 const int *fds,
1087 size_t nb_fd)
00e2e675
DG
1088{
1089 int ret;
1090
a0377dfe
FD
1091 LTTNG_ASSERT(msg);
1092 LTTNG_ASSERT(dst);
1093 LTTNG_ASSERT(sock);
1094 LTTNG_ASSERT(fds);
00e2e675 1095
52898cb1 1096 ret = consumer_send_msg(sock, msg);
f50f23d9
DG
1097 if (ret < 0) {
1098 goto error;
1099 }
1100
00e2e675
DG
1101 ret = consumer_send_fds(sock, fds, nb_fd);
1102 if (ret < 0) {
1103 goto error;
1104 }
1105
1106error:
1107 return ret;
1108}
37278a1e
DG
1109
1110/*
1111 * Send relayd socket to consumer associated with a session name.
1112 *
43fade62
JG
1113 * The consumer socket lock must be held by the caller.
1114 *
37278a1e
DG
1115 * On success return positive value. On error, negative value.
1116 */
f50f23d9 1117int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
28ab034a
JG
1118 struct lttcomm_relayd_sock *rsock,
1119 struct consumer_output *consumer,
1120 enum lttng_stream_type type,
1121 uint64_t session_id,
1122 const char *session_name,
1123 const char *hostname,
1124 const char *base_path,
1125 int session_live_timer,
1126 const uint64_t *current_chunk_id,
1127 time_t session_creation_time,
1128 bool session_name_contains_creation_time)
37278a1e
DG
1129{
1130 int ret;
7966af57 1131 int fd;
37278a1e
DG
1132 struct lttcomm_consumer_msg msg;
1133
1134 /* Code flow error. Safety net. */
a0377dfe
FD
1135 LTTNG_ASSERT(rsock);
1136 LTTNG_ASSERT(consumer);
1137 LTTNG_ASSERT(consumer_sock);
37278a1e 1138
53efb85a 1139 memset(&msg, 0, sizeof(msg));
37278a1e
DG
1140 /* Bail out if consumer is disabled */
1141 if (!consumer->enabled) {
f73fabfd 1142 ret = LTTNG_OK;
37278a1e
DG
1143 goto error;
1144 }
1145
d3e2ba59 1146 if (type == LTTNG_STREAM_CONTROL) {
ecd1a12f 1147 char output_path[LTTNG_PATH_MAX] = {};
07aa2e42 1148 uint64_t relayd_session_id;
ecd1a12f 1149
28ab034a
JG
1150 ret = relayd_create_session(rsock,
1151 &relayd_session_id,
1152 session_name,
1153 hostname,
1154 base_path,
1155 session_live_timer,
1156 consumer->snapshot,
1157 session_id,
1158 the_sessiond_uuid,
1159 current_chunk_id,
1160 session_creation_time,
1161 session_name_contains_creation_time,
1162 output_path);
d3e2ba59
JD
1163 if (ret < 0) {
1164 /* Close the control socket. */
1165 (void) relayd_close(rsock);
1166 goto error;
1167 }
07aa2e42 1168 msg.u.relayd_sock.relayd_session_id = relayd_session_id;
28ab034a 1169 DBG("Created session on relay, output path reply: %s", output_path);
d3e2ba59
JD
1170 }
1171
37278a1e
DG
1172 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
1173 /*
1174 * Assign network consumer output index using the temporary consumer since
1175 * this call should only be made from within a set_consumer_uri() function
1176 * call in the session daemon.
1177 */
1178 msg.u.relayd_sock.net_index = consumer->net_seq_index;
1179 msg.u.relayd_sock.type = type;
46e6455f 1180 msg.u.relayd_sock.session_id = session_id;
4222116f
JR
1181 msg.u.relayd_sock.major = rsock->major;
1182 msg.u.relayd_sock.minor = rsock->minor;
1183 msg.u.relayd_sock.relayd_socket_protocol = rsock->sock.proto;
37278a1e 1184
9363801e 1185 DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd_ptr);
52898cb1 1186 ret = consumer_send_msg(consumer_sock, &msg);
f50f23d9
DG
1187 if (ret < 0) {
1188 goto error;
1189 }
1190
37278a1e 1191 DBG3("Sending relayd socket file descriptor to consumer");
7966af57
SM
1192 fd = rsock->sock.fd;
1193 ret = consumer_send_fds(consumer_sock, &fd, 1);
37278a1e
DG
1194 if (ret < 0) {
1195 goto error;
1196 }
1197
1198 DBG2("Consumer relayd socket sent");
1199
1200error:
1201 return ret;
1202}
173af62f 1203
28ab034a
JG
1204static int
1205consumer_send_pipe(struct consumer_socket *consumer_sock, enum lttng_consumer_command cmd, int pipe)
e9404c27
JG
1206{
1207 int ret;
1208 struct lttcomm_consumer_msg msg;
62c43103
JD
1209 const char *pipe_name;
1210 const char *command_name;
1211
1212 switch (cmd) {
1213 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1214 pipe_name = "channel monitor";
1215 command_name = "SET_CHANNEL_MONITOR_PIPE";
1216 break;
62c43103 1217 default:
28ab034a 1218 ERR("Unexpected command received in %s (cmd = %d)", __func__, (int) cmd);
62c43103
JD
1219 abort();
1220 }
e9404c27
JG
1221
1222 /* Code flow error. Safety net. */
1223
1224 memset(&msg, 0, sizeof(msg));
62c43103 1225 msg.cmd_type = cmd;
e9404c27 1226
3e4dc117 1227 pthread_mutex_lock(consumer_sock->lock);
62c43103 1228 DBG3("Sending %s command to consumer", command_name);
e9404c27
JG
1229 ret = consumer_send_msg(consumer_sock, &msg);
1230 if (ret < 0) {
1231 goto error;
1232 }
1233
28ab034a 1234 DBG3("Sending %s pipe %d to consumer on socket %d", pipe_name, pipe, *consumer_sock->fd_ptr);
e9404c27
JG
1235 ret = consumer_send_fds(consumer_sock, &pipe, 1);
1236 if (ret < 0) {
1237 goto error;
1238 }
1239
62c43103 1240 DBG2("%s pipe successfully sent", pipe_name);
e9404c27 1241error:
3e4dc117 1242 pthread_mutex_unlock(consumer_sock->lock);
e9404c27
JG
1243 return ret;
1244}
1245
28ab034a 1246int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, int pipe)
62c43103 1247{
28ab034a 1248 return consumer_send_pipe(consumer_sock, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe);
62c43103
JD
1249}
1250
806e2684 1251/*
5e280d77
MD
1252 * Ask the consumer if the data is pending for the specific session id.
1253 * Returns 1 if data is pending, 0 otherwise, or < 0 on error.
806e2684 1254 */
28ab034a 1255int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer)
806e2684
DG
1256{
1257 int ret;
28ab034a 1258 int32_t ret_code = 0; /* Default is that the data is NOT pending */
806e2684
DG
1259 struct consumer_socket *socket;
1260 struct lttng_ht_iter iter;
1261 struct lttcomm_consumer_msg msg;
1262
a0377dfe 1263 LTTNG_ASSERT(consumer);
806e2684 1264
53efb85a 1265 DBG3("Consumer data pending for id %" PRIu64, session_id);
806e2684 1266
53efb85a
MD
1267 memset(&msg, 0, sizeof(msg));
1268 msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
d88aee68 1269 msg.u.data_pending.session_id = session_id;
806e2684 1270
c8f59ee5 1271 /* Send command for each consumer */
b82c5c4d 1272 rcu_read_lock();
28ab034a 1273 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
806e2684 1274 pthread_mutex_lock(socket->lock);
52898cb1 1275 ret = consumer_socket_send(socket, &msg, sizeof(msg));
806e2684 1276 if (ret < 0) {
806e2684 1277 pthread_mutex_unlock(socket->lock);
b82c5c4d 1278 goto error_unlock;
806e2684
DG
1279 }
1280
f50f23d9
DG
1281 /*
1282 * No need for a recv reply status because the answer to the command is
1283 * the reply status message.
1284 */
1285
52898cb1
DG
1286 ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
1287 if (ret < 0) {
806e2684 1288 pthread_mutex_unlock(socket->lock);
b82c5c4d 1289 goto error_unlock;
806e2684 1290 }
806e2684
DG
1291 pthread_mutex_unlock(socket->lock);
1292
6d805429 1293 if (ret_code == 1) {
806e2684
DG
1294 break;
1295 }
1296 }
b82c5c4d 1297 rcu_read_unlock();
806e2684 1298
d88aee68 1299 DBG("Consumer data is %s pending for session id %" PRIu64,
28ab034a
JG
1300 ret_code == 1 ? "" : "NOT",
1301 session_id);
806e2684
DG
1302 return ret_code;
1303
b82c5c4d
DG
1304error_unlock:
1305 rcu_read_unlock();
806e2684
DG
1306 return -1;
1307}
7972aab2
DG
1308
1309/*
1310 * Send a flush command to consumer using the given channel key.
1311 *
1312 * Return 0 on success else a negative value.
1313 */
1314int consumer_flush_channel(struct consumer_socket *socket, uint64_t key)
1315{
1316 int ret;
1317 struct lttcomm_consumer_msg msg;
1318
a0377dfe 1319 LTTNG_ASSERT(socket);
7972aab2
DG
1320
1321 DBG2("Consumer flush channel key %" PRIu64, key);
1322
53efb85a 1323 memset(&msg, 0, sizeof(msg));
7972aab2
DG
1324 msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
1325 msg.u.flush_channel.key = key;
1326
1327 pthread_mutex_lock(socket->lock);
1328 health_code_update();
1329
1330 ret = consumer_send_msg(socket, &msg);
1331 if (ret < 0) {
1332 goto end;
1333 }
1334
1335end:
1336 health_code_update();
1337 pthread_mutex_unlock(socket->lock);
1338 return ret;
1339}
1340
0dd01979
MD
1341/*
1342 * Send a clear quiescent command to consumer using the given channel key.
1343 *
1344 * Return 0 on success else a negative value.
1345 */
1346int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
1347{
1348 int ret;
1349 struct lttcomm_consumer_msg msg;
1350
a0377dfe 1351 LTTNG_ASSERT(socket);
0dd01979
MD
1352
1353 DBG2("Consumer clear quiescent channel key %" PRIu64, key);
1354
1355 memset(&msg, 0, sizeof(msg));
1356 msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
1357 msg.u.clear_quiescent_channel.key = key;
1358
1359 pthread_mutex_lock(socket->lock);
1360 health_code_update();
1361
1362 ret = consumer_send_msg(socket, &msg);
1363 if (ret < 0) {
1364 goto end;
1365 }
1366
1367end:
1368 health_code_update();
1369 pthread_mutex_unlock(socket->lock);
1370 return ret;
1371}
1372
7972aab2 1373/*
dc2bbdae
MD
1374 * Send a close metadata command to consumer using the given channel key.
1375 * Called with registry lock held.
7972aab2
DG
1376 *
1377 * Return 0 on success else a negative value.
1378 */
28ab034a 1379int consumer_close_metadata(struct consumer_socket *socket, uint64_t metadata_key)
7972aab2
DG
1380{
1381 int ret;
1382 struct lttcomm_consumer_msg msg;
1383
a0377dfe 1384 LTTNG_ASSERT(socket);
7972aab2
DG
1385
1386 DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
1387
53efb85a 1388 memset(&msg, 0, sizeof(msg));
7972aab2
DG
1389 msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
1390 msg.u.close_metadata.key = metadata_key;
1391
1392 pthread_mutex_lock(socket->lock);
1393 health_code_update();
1394
1395 ret = consumer_send_msg(socket, &msg);
1396 if (ret < 0) {
1397 goto end;
1398 }
1399
1400end:
1401 health_code_update();
1402 pthread_mutex_unlock(socket->lock);
1403 return ret;
1404}
1405
1406/*
1407 * Send a setup metdata command to consumer using the given channel key.
1408 *
1409 * Return 0 on success else a negative value.
1410 */
28ab034a 1411int consumer_setup_metadata(struct consumer_socket *socket, uint64_t metadata_key)
7972aab2
DG
1412{
1413 int ret;
1414 struct lttcomm_consumer_msg msg;
1415
a0377dfe 1416 LTTNG_ASSERT(socket);
7972aab2
DG
1417
1418 DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
1419
53efb85a 1420 memset(&msg, 0, sizeof(msg));
7972aab2
DG
1421 msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
1422 msg.u.setup_metadata.key = metadata_key;
1423
1424 pthread_mutex_lock(socket->lock);
1425 health_code_update();
1426
1427 ret = consumer_send_msg(socket, &msg);
1428 if (ret < 0) {
1429 goto end;
1430 }
1431
1432end:
1433 health_code_update();
1434 pthread_mutex_unlock(socket->lock);
1435 return ret;
1436}
1437
1438/*
dc2bbdae
MD
1439 * Send metadata string to consumer.
1440 * RCU read-side lock must be held to guarantee existence of socket.
7972aab2
DG
1441 *
1442 * Return 0 on success else a negative value.
1443 */
1444int consumer_push_metadata(struct consumer_socket *socket,
28ab034a
JG
1445 uint64_t metadata_key,
1446 char *metadata_str,
1447 size_t len,
1448 size_t target_offset,
1449 uint64_t version)
7972aab2
DG
1450{
1451 int ret;
1452 struct lttcomm_consumer_msg msg;
1453
a0377dfe 1454 LTTNG_ASSERT(socket);
48b7cdc2 1455 ASSERT_RCU_READ_LOCKED();
7972aab2 1456
9363801e 1457 DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr);
7972aab2 1458
dc2bbdae
MD
1459 pthread_mutex_lock(socket->lock);
1460
53efb85a 1461 memset(&msg, 0, sizeof(msg));
7972aab2
DG
1462 msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
1463 msg.u.push_metadata.key = metadata_key;
1464 msg.u.push_metadata.target_offset = target_offset;
1465 msg.u.push_metadata.len = len;
93ec662e 1466 msg.u.push_metadata.version = version;
7972aab2 1467
7972aab2
DG
1468 health_code_update();
1469 ret = consumer_send_msg(socket, &msg);
331744e3 1470 if (ret < 0 || len == 0) {
7972aab2
DG
1471 goto end;
1472 }
1473
28ab034a 1474 DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd_ptr, len);
7972aab2 1475
52898cb1 1476 ret = consumer_socket_send(socket, metadata_str, len);
7972aab2
DG
1477 if (ret < 0) {
1478 goto end;
1479 }
1480
1481 health_code_update();
1482 ret = consumer_recv_status_reply(socket);
1483 if (ret < 0) {
1484 goto end;
1485 }
1486
1487end:
dc2bbdae 1488 pthread_mutex_unlock(socket->lock);
7972aab2 1489 health_code_update();
7972aab2
DG
1490 return ret;
1491}
6dc3064a
DG
1492
1493/*
1494 * Ask the consumer to snapshot a specific channel using the key.
1495 *
9a654598 1496 * Returns LTTNG_OK on success or else an LTTng error code.
6dc3064a 1497 */
9a654598 1498enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
28ab034a
JG
1499 uint64_t key,
1500 const struct consumer_output *output,
1501 int metadata,
1502 const char *channel_path,
1503 uint64_t nb_packets_per_stream)
6dc3064a
DG
1504{
1505 int ret;
9a654598 1506 enum lttng_error_code status = LTTNG_OK;
6dc3064a
DG
1507 struct lttcomm_consumer_msg msg;
1508
a0377dfe
FD
1509 LTTNG_ASSERT(socket);
1510 LTTNG_ASSERT(output);
6dc3064a
DG
1511
1512 DBG("Consumer snapshot channel key %" PRIu64, key);
1513
ee91bab2 1514 memset(&msg, 0, sizeof(msg));
6dc3064a
DG
1515 msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
1516 msg.u.snapshot_channel.key = key;
d07ceecd 1517 msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
6dc3064a
DG
1518 msg.u.snapshot_channel.metadata = metadata;
1519
348a81dc 1520 if (output->type == CONSUMER_DST_NET) {
28ab034a 1521 msg.u.snapshot_channel.relayd_id = output->net_seq_index;
6dc3064a 1522 msg.u.snapshot_channel.use_relayd = 1;
6dc3064a 1523 } else {
07b86b52 1524 msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL;
d2956687
JG
1525 }
1526 ret = lttng_strncpy(msg.u.snapshot_channel.pathname,
28ab034a
JG
1527 channel_path,
1528 sizeof(msg.u.snapshot_channel.pathname));
d2956687
JG
1529 if (ret < 0) {
1530 ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%zu bytes required) with path \"%s\"",
28ab034a
JG
1531 sizeof(msg.u.snapshot_channel.pathname),
1532 strlen(channel_path),
1533 channel_path);
d2956687
JG
1534 status = LTTNG_ERR_SNAPSHOT_FAIL;
1535 goto error;
6dc3064a
DG
1536 }
1537
1538 health_code_update();
9d1103e6 1539 pthread_mutex_lock(socket->lock);
6dc3064a 1540 ret = consumer_send_msg(socket, &msg);
9d1103e6 1541 pthread_mutex_unlock(socket->lock);
6dc3064a 1542 if (ret < 0) {
9bbfb88c
MD
1543 switch (-ret) {
1544 case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
9a654598 1545 status = LTTNG_ERR_CHAN_NOT_FOUND;
9bbfb88c
MD
1546 break;
1547 default:
9a654598 1548 status = LTTNG_ERR_SNAPSHOT_FAIL;
9bbfb88c
MD
1549 break;
1550 }
6dc3064a
DG
1551 goto error;
1552 }
1553
1554error:
1555 health_code_update();
9a654598 1556 return status;
6dc3064a 1557}
fb83fe64
JD
1558
1559/*
1560 * Ask the consumer the number of discarded events for a channel.
1561 */
28ab034a
JG
1562int consumer_get_discarded_events(uint64_t session_id,
1563 uint64_t channel_key,
1564 struct consumer_output *consumer,
1565 uint64_t *discarded)
fb83fe64
JD
1566{
1567 int ret;
1568 struct consumer_socket *socket;
1569 struct lttng_ht_iter iter;
1570 struct lttcomm_consumer_msg msg;
1571
a0377dfe 1572 LTTNG_ASSERT(consumer);
fb83fe64
JD
1573
1574 DBG3("Consumer discarded events id %" PRIu64, session_id);
1575
1576 memset(&msg, 0, sizeof(msg));
1577 msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
1578 msg.u.discarded_events.session_id = session_id;
1579 msg.u.discarded_events.channel_key = channel_key;
1580
1581 *discarded = 0;
1582
1583 /* Send command for each consumer */
1584 rcu_read_lock();
28ab034a 1585 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
fb83fe64
JD
1586 uint64_t consumer_discarded = 0;
1587 pthread_mutex_lock(socket->lock);
1588 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1589 if (ret < 0) {
1590 pthread_mutex_unlock(socket->lock);
1591 goto end;
1592 }
1593
1594 /*
1595 * No need for a recv reply status because the answer to the
1596 * command is the reply status message.
1597 */
28ab034a 1598 ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
fb83fe64
JD
1599 if (ret < 0) {
1600 ERR("get discarded events");
1601 pthread_mutex_unlock(socket->lock);
1602 goto end;
1603 }
1604 pthread_mutex_unlock(socket->lock);
1605 *discarded += consumer_discarded;
1606 }
1607 ret = 0;
28ab034a 1608 DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id);
fb83fe64
JD
1609
1610end:
1611 rcu_read_unlock();
1612 return ret;
1613}
1614
1615/*
1616 * Ask the consumer the number of lost packets for a channel.
1617 */
28ab034a
JG
1618int consumer_get_lost_packets(uint64_t session_id,
1619 uint64_t channel_key,
1620 struct consumer_output *consumer,
1621 uint64_t *lost)
fb83fe64
JD
1622{
1623 int ret;
1624 struct consumer_socket *socket;
1625 struct lttng_ht_iter iter;
1626 struct lttcomm_consumer_msg msg;
1627
a0377dfe 1628 LTTNG_ASSERT(consumer);
fb83fe64
JD
1629
1630 DBG3("Consumer lost packets id %" PRIu64, session_id);
1631
1632 memset(&msg, 0, sizeof(msg));
1633 msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
1634 msg.u.lost_packets.session_id = session_id;
1635 msg.u.lost_packets.channel_key = channel_key;
1636
1637 *lost = 0;
1638
1639 /* Send command for each consumer */
1640 rcu_read_lock();
28ab034a 1641 cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
fb83fe64
JD
1642 uint64_t consumer_lost = 0;
1643 pthread_mutex_lock(socket->lock);
1644 ret = consumer_socket_send(socket, &msg, sizeof(msg));
1645 if (ret < 0) {
1646 pthread_mutex_unlock(socket->lock);
1647 goto end;
1648 }
1649
1650 /*
1651 * No need for a recv reply status because the answer to the
1652 * command is the reply status message.
1653 */
28ab034a 1654 ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
fb83fe64
JD
1655 if (ret < 0) {
1656 ERR("get lost packets");
1657 pthread_mutex_unlock(socket->lock);
1658 goto end;
1659 }
1660 pthread_mutex_unlock(socket->lock);
1661 *lost += consumer_lost;
1662 }
1663 ret = 0;
28ab034a 1664 DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id);
fb83fe64
JD
1665
1666end:
1667 rcu_read_unlock();
1668 return ret;
1669}
a1ae2ea5 1670
5c408ad8
JD
1671/*
1672 * Ask the consumer to rotate a channel.
5c408ad8
JD
1673 *
1674 * The new_chunk_id is the session->rotate_count that has been incremented
1675 * when the rotation started. On the relay, this allows to keep track in which
1676 * chunk each stream is currently writing to (for the rotate_pending operation).
1677 */
28ab034a
JG
1678int consumer_rotate_channel(struct consumer_socket *socket,
1679 uint64_t key,
1680 struct consumer_output *output,
1681 bool is_metadata_channel)
5c408ad8
JD
1682{
1683 int ret;
1684 struct lttcomm_consumer_msg msg;
1685
a0377dfe 1686 LTTNG_ASSERT(socket);
5c408ad8
JD
1687
1688 DBG("Consumer rotate channel key %" PRIu64, key);
1689
1690 pthread_mutex_lock(socket->lock);
1691 memset(&msg, 0, sizeof(msg));
1692 msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL;
1693 msg.u.rotate_channel.key = key;
1694 msg.u.rotate_channel.metadata = !!is_metadata_channel;
5c408ad8
JD
1695
1696 if (output->type == CONSUMER_DST_NET) {
1697 msg.u.rotate_channel.relayd_id = output->net_seq_index;
5c408ad8
JD
1698 } else {
1699 msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
5c408ad8
JD
1700 }
1701
1702 health_code_update();
1703 ret = consumer_send_msg(socket, &msg);
1704 if (ret < 0) {
20f37cb4
MD
1705 switch (-ret) {
1706 case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
1707 ret = -LTTNG_ERR_CHAN_NOT_FOUND;
1708 break;
1709 default:
1710 ret = -LTTNG_ERR_ROTATION_FAIL_CONSUMER;
1711 break;
1712 }
5c408ad8
JD
1713 goto error;
1714 }
5c408ad8
JD
1715error:
1716 pthread_mutex_unlock(socket->lock);
1717 health_code_update();
1718 return ret;
1719}
1720
04ed9e10
JG
1721int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key)
1722{
1723 int ret;
7966af57 1724 lttcomm_consumer_msg msg = {
04ed9e10 1725 .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
1c9a0b0e 1726 .u = {},
04ed9e10 1727 };
7966af57 1728 msg.u.open_channel_packets.key = key;
04ed9e10 1729
a0377dfe 1730 LTTNG_ASSERT(socket);
04ed9e10
JG
1731
1732 DBG("Consumer open channel packets: channel key = %" PRIu64, key);
1733
1734 health_code_update();
1735
1736 pthread_mutex_lock(socket->lock);
1737 ret = consumer_send_msg(socket, &msg);
1738 pthread_mutex_unlock(socket->lock);
1739 if (ret < 0) {
1740 goto error_socket;
1741 }
1742
1743error_socket:
1744 health_code_update();
1745 return ret;
1746}
1747
51a4828f
MD
1748int consumer_clear_channel(struct consumer_socket *socket, uint64_t key)
1749{
1750 int ret;
1751 struct lttcomm_consumer_msg msg;
1752
a0377dfe 1753 LTTNG_ASSERT(socket);
51a4828f
MD
1754
1755 DBG("Consumer clear channel %" PRIu64, key);
1756
1757 memset(&msg, 0, sizeof(msg));
1758 msg.cmd_type = LTTNG_CONSUMER_CLEAR_CHANNEL;
1759 msg.u.clear_channel.key = key;
1760
1761 health_code_update();
1762
1763 pthread_mutex_lock(socket->lock);
1764 ret = consumer_send_msg(socket, &msg);
1765 if (ret < 0) {
1766 goto error_socket;
1767 }
1768
1769error_socket:
1770 pthread_mutex_unlock(socket->lock);
1771
1772 health_code_update();
1773 return ret;
1774}
1775
28ab034a 1776int consumer_init(struct consumer_socket *socket, const lttng_uuid& sessiond_uuid)
00fb02ac
JD
1777{
1778 int ret;
d2956687
JG
1779 struct lttcomm_consumer_msg msg = {
1780 .cmd_type = LTTNG_CONSUMER_INIT,
1c9a0b0e 1781 .u = {},
d2956687 1782 };
00fb02ac 1783
a0377dfe 1784 LTTNG_ASSERT(socket);
00fb02ac 1785
d2956687 1786 DBG("Sending consumer initialization command");
328c2fe7 1787 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg.u.init.sessiond_uuid);
00fb02ac
JD
1788
1789 health_code_update();
1790 ret = consumer_send_msg(socket, &msg);
1791 if (ret < 0) {
1792 goto error;
1793 }
1794
d88744a4
JD
1795error:
1796 health_code_update();
1797 return ret;
1798}
1799
1800/*
d2956687 1801 * Ask the consumer to create a new chunk for a given session.
92816cc3 1802 *
d2956687 1803 * Called with the consumer socket lock held.
92816cc3 1804 */
d2956687 1805int consumer_create_trace_chunk(struct consumer_socket *socket,
28ab034a
JG
1806 uint64_t relayd_id,
1807 uint64_t session_id,
1808 struct lttng_trace_chunk *chunk,
1809 const char *domain_subdir)
92816cc3
JG
1810{
1811 int ret;
d2956687
JG
1812 enum lttng_trace_chunk_status chunk_status;
1813 struct lttng_credentials chunk_credentials;
cd9adb8b
JG
1814 const struct lttng_directory_handle *chunk_directory_handle = nullptr;
1815 struct lttng_directory_handle *domain_handle = nullptr;
5da88b0f 1816 int domain_dirfd;
d2956687 1817 const char *chunk_name;
913a542b 1818 bool chunk_name_overridden;
d2956687
JG
1819 uint64_t chunk_id;
1820 time_t creation_timestamp;
1821 char creation_timestamp_buffer[ISO8601_STR_LEN];
1822 const char *creation_timestamp_str = "(none)";
1823 const bool chunk_has_local_output = relayd_id == -1ULL;
69ebf37e 1824 enum lttng_trace_chunk_status tc_status;
d2956687
JG
1825 struct lttcomm_consumer_msg msg = {
1826 .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
1c9a0b0e 1827 .u = {},
d2956687 1828 };
7966af57 1829 msg.u.create_trace_chunk.session_id = session_id;
92816cc3 1830
a0377dfe
FD
1831 LTTNG_ASSERT(socket);
1832 LTTNG_ASSERT(chunk);
92816cc3 1833
d2956687 1834 if (relayd_id != -1ULL) {
28ab034a 1835 LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, relayd_id);
d2956687 1836 }
92816cc3 1837
28ab034a 1838 chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, &chunk_name_overridden);
d2956687 1839 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK &&
28ab034a 1840 chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
d2956687
JG
1841 ERR("Failed to get name of trace chunk");
1842 ret = -LTTNG_ERR_FATAL;
92816cc3
JG
1843 goto error;
1844 }
913a542b 1845 if (chunk_name_overridden) {
d2956687 1846 ret = lttng_strncpy(msg.u.create_trace_chunk.override_name,
28ab034a
JG
1847 chunk_name,
1848 sizeof(msg.u.create_trace_chunk.override_name));
d2956687
JG
1849 if (ret) {
1850 ERR("Trace chunk name \"%s\" exceeds the maximal length allowed by the consumer protocol",
28ab034a 1851 chunk_name);
d2956687
JG
1852 ret = -LTTNG_ERR_FATAL;
1853 goto error;
1854 }
1855 }
92816cc3 1856
28ab034a 1857 chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, &creation_timestamp);
d2956687
JG
1858 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1859 ret = -LTTNG_ERR_FATAL;
92816cc3
JG
1860 goto error;
1861 }
28ab034a 1862 msg.u.create_trace_chunk.creation_timestamp = (uint64_t) creation_timestamp;
d2956687 1863 /* Only used for logging purposes. */
28ab034a
JG
1864 ret = time_to_iso8601_str(
1865 creation_timestamp, creation_timestamp_buffer, sizeof(creation_timestamp_buffer));
1866 creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";
d2956687
JG
1867
1868 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1869 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1870 /*
1871 * Anonymous trace chunks should never be transmitted
1872 * to remote peers (consumerd and relayd). They are used
1873 * internally for backward-compatibility purposes.
1874 */
1875 ret = -LTTNG_ERR_FATAL;
1876 goto error;
1877 }
1878 msg.u.create_trace_chunk.chunk_id = chunk_id;
92816cc3 1879
d2956687 1880 if (chunk_has_local_output) {
cbf53d23 1881 chunk_status = lttng_trace_chunk_borrow_chunk_directory_handle(
28ab034a 1882 chunk, &chunk_directory_handle);
d2956687
JG
1883 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1884 ret = -LTTNG_ERR_FATAL;
1885 goto error;
1886 }
28ab034a 1887 chunk_status = lttng_trace_chunk_get_credentials(chunk, &chunk_credentials);
e5add6d0
JG
1888 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1889 /*
1890 * Not associating credentials to a sessiond chunk is a
1891 * fatal internal error.
1892 */
1893 ret = -LTTNG_ERR_FATAL;
1894 goto error;
1895 }
28ab034a 1896 tc_status = lttng_trace_chunk_create_subdirectory(chunk, domain_subdir);
69ebf37e 1897 if (tc_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
5da88b0f 1898 PERROR("Failed to create chunk domain output directory \"%s\"",
28ab034a 1899 domain_subdir);
5da88b0f
MD
1900 ret = -LTTNG_ERR_FATAL;
1901 goto error;
1902 }
28ab034a
JG
1903 domain_handle = lttng_directory_handle_create_from_handle(domain_subdir,
1904 chunk_directory_handle);
5da88b0f
MD
1905 if (!domain_handle) {
1906 ret = -LTTNG_ERR_FATAL;
1907 goto error;
1908 }
1909
1910 /*
1911 * This will only compile on platforms that support
1912 * dirfd (POSIX.2008). This is fine as the session daemon
1913 * is only built for such platforms.
1914 *
1915 * The ownership of the chunk directory handle's is maintained
1916 * by the trace chunk.
1917 */
28ab034a 1918 domain_dirfd = lttng_directory_handle_get_dirfd(domain_handle);
a0377dfe 1919 LTTNG_ASSERT(domain_dirfd >= 0);
5da88b0f 1920
e5add6d0 1921 msg.u.create_trace_chunk.credentials.value.uid =
28ab034a 1922 lttng_credentials_get_uid(&chunk_credentials);
e5add6d0 1923 msg.u.create_trace_chunk.credentials.value.gid =
28ab034a 1924 lttng_credentials_get_gid(&chunk_credentials);
e5add6d0 1925 msg.u.create_trace_chunk.credentials.is_set = 1;
d2956687 1926 }
d88744a4 1927
d2956687 1928 DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64
28ab034a
JG
1929 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", creation_timestamp = %s",
1930 relayd_id,
1931 session_id,
1932 chunk_id,
1933 creation_timestamp_str);
d88744a4
JD
1934 health_code_update();
1935 ret = consumer_send_msg(socket, &msg);
d2956687 1936 health_code_update();
d88744a4 1937 if (ret < 0) {
d2956687
JG
1938 ERR("Trace chunk creation error on consumer");
1939 ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
d88744a4
JD
1940 goto error;
1941 }
1942
d2956687 1943 if (chunk_has_local_output) {
5da88b0f 1944 DBG("Sending trace chunk domain directory fd to consumer");
d2956687 1945 health_code_update();
5da88b0f 1946 ret = consumer_send_fds(socket, &domain_dirfd, 1);
d2956687
JG
1947 health_code_update();
1948 if (ret < 0) {
1949 ERR("Trace chunk creation error on consumer");
1950 ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER;
1951 goto error;
1952 }
d88744a4 1953 }
00fb02ac 1954error:
5da88b0f 1955 lttng_directory_handle_put(domain_handle);
00fb02ac
JD
1956 return ret;
1957}
1958
a1ae2ea5 1959/*
d2956687 1960 * Ask the consumer to close a trace chunk for a given session.
a1ae2ea5
JD
1961 *
1962 * Called with the consumer socket lock held.
1963 */
d2956687 1964int consumer_close_trace_chunk(struct consumer_socket *socket,
28ab034a
JG
1965 uint64_t relayd_id,
1966 uint64_t session_id,
1967 struct lttng_trace_chunk *chunk,
1968 char *closed_trace_chunk_path)
a1ae2ea5
JD
1969{
1970 int ret;
d2956687 1971 enum lttng_trace_chunk_status chunk_status;
7966af57
SM
1972 lttcomm_consumer_msg msg = {
1973 .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
1c9a0b0e 1974 .u = {},
d2956687 1975 };
7966af57
SM
1976 msg.u.close_trace_chunk.session_id = session_id;
1977
ecd1a12f 1978 struct lttcomm_consumer_close_trace_chunk_reply reply;
d2956687
JG
1979 uint64_t chunk_id;
1980 time_t close_timestamp;
bbc4768c
JG
1981 enum lttng_trace_chunk_command_type close_command;
1982 const char *close_command_name = "none";
ecd1a12f 1983 struct lttng_dynamic_buffer path_reception_buffer;
a1ae2ea5 1984
a0377dfe 1985 LTTNG_ASSERT(socket);
ecd1a12f 1986 lttng_dynamic_buffer_init(&path_reception_buffer);
a1ae2ea5 1987
d2956687 1988 if (relayd_id != -1ULL) {
28ab034a 1989 LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id, relayd_id);
bbc4768c
JG
1990 }
1991
28ab034a 1992 chunk_status = lttng_trace_chunk_get_close_command(chunk, &close_command);
bbc4768c
JG
1993 switch (chunk_status) {
1994 case LTTNG_TRACE_CHUNK_STATUS_OK:
1995 LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command,
28ab034a 1996 (uint32_t) close_command);
bbc4768c
JG
1997 break;
1998 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1999 break;
2000 default:
2001 ERR("Failed to get trace chunk close command");
2002 ret = -1;
2003 goto error;
a1ae2ea5
JD
2004 }
2005
d2956687
JG
2006 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
2007 /*
2008 * Anonymous trace chunks should never be transmitted to remote peers
2009 * (consumerd and relayd). They are used internally for
2010 * backward-compatibility purposes.
2011 */
a0377dfe 2012 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687
JG
2013 msg.u.close_trace_chunk.chunk_id = chunk_id;
2014
28ab034a 2015 chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
d2956687
JG
2016 /*
2017 * A trace chunk should be closed locally before being closed remotely.
2018 * Otherwise, the close timestamp would never be transmitted to the
2019 * peers.
2020 */
a0377dfe 2021 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687
JG
2022 msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp;
2023
bbc4768c 2024 if (msg.u.close_trace_chunk.close_command.is_set) {
28ab034a 2025 close_command_name = lttng_trace_chunk_command_type_get_name(close_command);
bbc4768c 2026 }
d2956687 2027 DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64
28ab034a
JG
2028 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = \"%s\"",
2029 relayd_id,
2030 session_id,
2031 chunk_id,
2032 close_command_name);
a1ae2ea5
JD
2033
2034 health_code_update();
ecd1a12f 2035 ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg));
a1ae2ea5 2036 if (ret < 0) {
d2956687 2037 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
a1ae2ea5
JD
2038 goto error;
2039 }
ecd1a12f
MD
2040 ret = consumer_socket_recv(socket, &reply, sizeof(reply));
2041 if (ret < 0) {
2042 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2043 goto error;
2044 }
2045 if (reply.path_length >= LTTNG_PATH_MAX) {
28ab034a
JG
2046 ERR("Invalid path returned by relay daemon: %" PRIu32
2047 "bytes exceeds maximal allowed length of %d bytes",
2048 reply.path_length,
2049 LTTNG_PATH_MAX);
ecd1a12f
MD
2050 ret = -LTTNG_ERR_INVALID_PROTOCOL;
2051 goto error;
2052 }
28ab034a 2053 ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, reply.path_length);
ecd1a12f
MD
2054 if (ret) {
2055 ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command");
2056 ret = -LTTNG_ERR_NOMEM;
2057 goto error;
2058 }
28ab034a 2059 ret = consumer_socket_recv(socket, path_reception_buffer.data, path_reception_buffer.size);
ecd1a12f
MD
2060 if (ret < 0) {
2061 ERR("Communication error while receiving path of closed trace chunk");
2062 ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER;
2063 goto error;
2064 }
2065 if (path_reception_buffer.data[path_reception_buffer.size - 1] != '\0') {
2066 ERR("Invalid path returned by relay daemon: not null-terminated");
2067 ret = -LTTNG_ERR_INVALID_PROTOCOL;
2068 goto error;
2069 }
2070 if (closed_trace_chunk_path) {
2071 /*
2072 * closed_trace_chunk_path is assumed to have a length >=
2073 * LTTNG_PATH_MAX
2074 */
28ab034a
JG
2075 memcpy(closed_trace_chunk_path,
2076 path_reception_buffer.data,
2077 path_reception_buffer.size);
ecd1a12f 2078 }
a1ae2ea5 2079error:
ecd1a12f 2080 lttng_dynamic_buffer_reset(&path_reception_buffer);
a1ae2ea5
JD
2081 health_code_update();
2082 return ret;
2083}
3654ed19 2084
d2956687
JG
2085/*
2086 * Ask the consumer if a trace chunk exists.
2087 *
2088 * Called with the consumer socket lock held.
2089 * Returns 0 on success, or a negative value on error.
2090 */
2091int consumer_trace_chunk_exists(struct consumer_socket *socket,
28ab034a
JG
2092 uint64_t relayd_id,
2093 uint64_t session_id,
2094 struct lttng_trace_chunk *chunk,
2095 enum consumer_trace_chunk_exists_status *result)
3654ed19
JG
2096{
2097 int ret;
d2956687 2098 enum lttng_trace_chunk_status chunk_status;
7966af57 2099 lttcomm_consumer_msg msg = {
d2956687 2100 .cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
1c9a0b0e 2101 .u = {},
3654ed19 2102 };
7966af57
SM
2103 msg.u.trace_chunk_exists.session_id = session_id;
2104
d2956687
JG
2105 uint64_t chunk_id;
2106 const char *consumer_reply_str;
3654ed19 2107
a0377dfe 2108 LTTNG_ASSERT(socket);
3654ed19 2109
d2956687 2110 if (relayd_id != -1ULL) {
28ab034a 2111 LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, relayd_id);
d2956687
JG
2112 }
2113
2114 chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id);
2115 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
2116 /*
2117 * Anonymous trace chunks should never be transmitted
2118 * to remote peers (consumerd and relayd). They are used
2119 * internally for backward-compatibility purposes.
2120 */
2121 ret = -LTTNG_ERR_FATAL;
2122 goto error;
2123 }
2124 msg.u.trace_chunk_exists.chunk_id = chunk_id;
2125
2126 DBG("Sending consumer trace chunk exists command: relayd_id = %" PRId64
28ab034a
JG
2127 ", session_id = %" PRIu64 ", chunk_id = %" PRIu64,
2128 relayd_id,
2129 session_id,
2130 chunk_id);
3654ed19
JG
2131
2132 health_code_update();
2133 ret = consumer_send_msg(socket, &msg);
d2956687
JG
2134 switch (-ret) {
2135 case LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK:
2136 consumer_reply_str = "unknown trace chunk";
2137 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK;
2138 break;
2139 case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL:
2140 consumer_reply_str = "trace chunk exists locally";
2141 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_LOCAL;
2142 break;
2143 case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE:
2144 consumer_reply_str = "trace chunk exists on remote peer";
2145 *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_REMOTE;
2146 break;
2147 default:
2148 ERR("Consumer returned an error from TRACE_CHUNK_EXISTS command");
2149 ret = -1;
3654ed19
JG
2150 goto error;
2151 }
28ab034a 2152 DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", consumer_reply_str);
d2956687 2153 ret = 0;
3654ed19
JG
2154error:
2155 health_code_update();
2156 return ret;
2157}
This page took 0.202413 seconds and 4 git commands to generate.