/*
 * Copyright (C) 2011 EfficiOS Inc.
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#define _LGPL_SOURCE
#include "ust-consumer.hpp"

#include <common/common.hpp>
#include <common/compat/endian.hpp>
#include <common/compat/fcntl.hpp>
#include <common/consumer/consumer-metadata-cache.hpp>
#include <common/consumer/consumer-stream.hpp>
#include <common/consumer/consumer-timer.hpp>
#include <common/consumer/consumer.hpp>
#include <common/index/index.hpp>
#include <common/optional.hpp>
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/shm.hpp>
#include <common/utils.hpp>

#include <lttng/ust-ctl.h>
#include <lttng/ust-sigbus.h>

#include <bin/lttng-consumerd/health-consumerd.hpp>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <urcu/list.h>

#define INT_MAX_STR_LEN 12 /* includes \0 */

extern struct lttng_consumer_global_data the_consumer_data;
extern int consumer_poll_timeout;

LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();

/*
 * Add channel to internal consumer state.
 *
 * Returns 0 on success or else a negative value.
 */
static int add_channel(struct lttng_consumer_channel *channel,
		       struct lttng_consumer_local_data *ctx)
{
	int ret = 0;

	LTTNG_ASSERT(channel);
	LTTNG_ASSERT(ctx);

	if (ctx->on_recv_channel != nullptr) {
		ret = ctx->on_recv_channel(channel);
		if (ret == 0) {
			ret = consumer_add_channel(channel, ctx);
		} else if (ret < 0) {
			/* Most likely an ENOMEM. */
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
			goto error;
		}
	} else {
		ret = consumer_add_channel(channel, ctx);
	}

	DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);

error:
	return ret;
}

/*
 * Allocate and return a consumer stream object. If _alloc_ret is not NULL,
 * the error code, if any, is stored in it; otherwise it is left untouched.
 *
 * Return NULL on error else the newly allocated stream object.
 */
static struct lttng_consumer_stream *allocate_stream(int cpu,
						     int key,
						     struct lttng_consumer_channel *channel,
						     struct lttng_consumer_local_data *ctx,
						     int *_alloc_ret)
{
	int alloc_ret;
	struct lttng_consumer_stream *stream = nullptr;

	LTTNG_ASSERT(channel);
	LTTNG_ASSERT(ctx);

	stream = consumer_stream_create(channel,
					channel->key,
					key,
					channel->name,
					channel->relayd_id,
					channel->session_id,
					channel->trace_chunk,
					cpu,
					&alloc_ret,
					channel->type,
					channel->monitor);
	if (stream == nullptr) {
		switch (alloc_ret) {
		case -ENOENT:
			/*
			 * We could not find the channel. Can happen if cpu hotplug
			 * happens while tearing down.
			 */
			DBG3("Could not find channel");
			break;
		case -ENOMEM:
		case -EINVAL:
		default:
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
			break;
		}
		goto error;
	}

	consumer_stream_update_channel_attributes(stream, channel);

error:
	if (_alloc_ret) {
		*_alloc_ret = alloc_ret;
	}
	return stream;
}

/*
 * Send the given stream pointer to the corresponding thread.
 *
 * Returns 0 on success else a negative value.
 */
static int send_stream_to_thread(struct lttng_consumer_stream *stream,
				 struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_pipe *stream_pipe;

	/* Get the right pipe where the stream will be sent. */
	if (stream->metadata_flag) {
		consumer_add_metadata_stream(stream);
		stream_pipe = ctx->consumer_metadata_pipe;
	} else {
		consumer_add_data_stream(stream);
		stream_pipe = ctx->consumer_data_pipe;
	}

	/*
	 * From this point on, the stream's ownership has been moved away from
	 * the channel and it becomes globally visible. Hence, remove it from
	 * the local stream list to prevent the stream from being both local and
	 * global.
	 */
	stream->globally_visible = 1;
	cds_list_del_init(&stream->send_node);

	ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
	if (ret < 0) {
		ERR("Consumer write %s stream to pipe %d",
		    stream->metadata_flag ? "metadata" : "data",
		    lttng_pipe_get_writefd(stream_pipe));
		if (stream->metadata_flag) {
			consumer_del_stream_for_metadata(stream);
		} else {
			consumer_del_stream_for_data(stream);
		}
		goto error;
	}

error:
	return ret;
}

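/*
 * Build the shm path of a stream by appending the cpu number directly to
 * the channel's shm path; e.g. a shm_path of "/lttng/chan" and cpu 2
 * yield "/lttng/chan2".
 */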
static int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
{
	char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
	int ret;

	strncpy(stream_shm_path, shm_path, PATH_MAX);
	stream_shm_path[PATH_MAX - 1] = '\0';
	ret = snprintf(cpu_nr, INT_MAX_STR_LEN, "%i", cpu);
	if (ret < 0) {
		PERROR("snprintf");
		goto end;
	}
	strncat(stream_shm_path, cpu_nr, PATH_MAX - strlen(stream_shm_path) - 1);
	ret = 0;
end:
	return ret;
}

/*
 * Create streams for the given channel using liblttng-ust-ctl.
 * The channel lock must be acquired by the caller.
 *
 * Return 0 on success else a negative value.
 */
static int create_ust_streams(struct lttng_consumer_channel *channel,
			      struct lttng_consumer_local_data *ctx)
{
	int ret, cpu = 0;
	struct lttng_ust_ctl_consumer_stream *ustream;
	struct lttng_consumer_stream *stream;
	pthread_mutex_t *current_stream_lock = nullptr;

	LTTNG_ASSERT(channel);
	LTTNG_ASSERT(ctx);

	/*
	 * Loop while a stream is available from ustctl. When NULL is returned,
	 * we've reached the end of the possible streams for the channel.
	 */
	while ((ustream = lttng_ust_ctl_create_stream(channel->uchan, cpu))) {
		int wait_fd;
		int ust_metadata_pipe[2];

		health_code_update();

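		/*
		 * A metadata channel in monitor mode is woken up through a
		 * dedicated pipe (filled as the metadata cache grows) rather
		 * than through the ring buffer's wait fd.
		 */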
		if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
			ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
			if (ret < 0) {
				ERR("Create ust metadata poll pipe");
				goto error;
			}
			wait_fd = ust_metadata_pipe[0];
		} else {
			wait_fd = lttng_ust_ctl_stream_get_wait_fd(ustream);
		}

		/* Allocate consumer stream object. */
		stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
		if (!stream) {
			goto error_alloc;
		}
		stream->ustream = ustream;
		/*
		 * Store it so we can save multiple function calls afterwards since
		 * this value is used heavily in the stream threads. This is UST
		 * specific so this is why it's done after allocation.
		 */
		stream->wait_fd = wait_fd;

		/*
		 * Increment channel refcount since the channel reference has now been
		 * assigned in the allocation process above.
		 */
		if (stream->chan->monitor) {
			uatomic_inc(&stream->chan->refcount);
		}

		pthread_mutex_lock(&stream->lock);
		current_stream_lock = &stream->lock;
		/*
		 * Order is important; this is why a list is used. On error, the
		 * caller should clean this list.
		 */
		cds_list_add_tail(&stream->send_node, &channel->streams.head);

		ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, &stream->max_sb_size);
		if (ret < 0) {
			ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", stream->name);
			goto error;
		}

		/* Do actions once stream has been received. */
		if (ctx->on_recv_stream) {
			ret = ctx->on_recv_stream(stream);
			if (ret < 0) {
				goto error;
			}
		}

		DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
		    stream->name,
		    stream->key,
		    stream->relayd_stream_id);

		/* Set next CPU stream. */
		channel->streams.count = ++cpu;

		/* Keep stream reference when creating metadata. */
		if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
			channel->metadata_stream = stream;
			if (channel->monitor) {
				/* Set metadata poll pipe if we created one. */
				memcpy(stream->ust_metadata_poll_pipe,
				       ust_metadata_pipe,
				       sizeof(ust_metadata_pipe));
			}
		}
		pthread_mutex_unlock(&stream->lock);
		current_stream_lock = nullptr;
	}

	return 0;

error:
error_alloc:
	if (current_stream_lock) {
		pthread_mutex_unlock(current_stream_lock);
	}
	return ret;
}

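/*
 * Open the file descriptor backing a stream's ring buffer: an anonymous
 * shm object when the channel has no shm_path configured, or a file
 * created under the per-stream shm path with the session's credentials
 * otherwise.
 *
 * Return a file descriptor on success or else a negative value.
 */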
static int open_ust_stream_fd(struct lttng_consumer_channel *channel,
			      int cpu,
			      const struct lttng_credentials *session_credentials)
{
	char shm_path[PATH_MAX];
	int ret;

	if (!channel->shm_path[0]) {
		return shm_create_anonymous("ust-consumer");
	}
	ret = get_stream_shm_path(shm_path, channel->shm_path, cpu);
	if (ret) {
		goto error_shm_path;
	}
	return run_as_open(shm_path,
			   O_RDWR | O_CREAT | O_EXCL,
			   S_IRUSR | S_IWUSR,
			   lttng_credentials_get_uid(session_credentials),
			   lttng_credentials_get_gid(session_credentials));

error_shm_path:
	return -1;
}

/*
 * Create a UST channel with the given attributes and send it to the session
 * daemon using the ust ctl API.
 *
 * Return 0 on success or else a negative value.
 */
static int create_ust_channel(struct lttng_consumer_channel *channel,
			      struct lttng_ust_ctl_consumer_channel_attr *attr,
			      struct lttng_ust_ctl_consumer_channel **ust_chanp)
{
	int ret, nr_stream_fds, i, j;
	int *stream_fds;
	struct lttng_ust_ctl_consumer_channel *ust_channel;

	LTTNG_ASSERT(channel);
	LTTNG_ASSERT(attr);
	LTTNG_ASSERT(ust_chanp);
	LTTNG_ASSERT(channel->buffer_credentials.is_set);

	DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
	     "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
	     "switch_timer_interval: %u, read_timer_interval: %u, "
	     "output: %d, type: %d",
	     attr->overwrite,
	     attr->subbuf_size,
	     attr->num_subbuf,
	     attr->switch_timer_interval,
	     attr->read_timer_interval,
	     attr->output,
	     attr->type);

	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
		nr_stream_fds = 1;
	} else {
		nr_stream_fds = lttng_ust_ctl_get_nr_stream_per_channel();
	}
	stream_fds = calloc<int>(nr_stream_fds);
	if (!stream_fds) {
		ret = -1;
		goto error_alloc;
	}
	for (i = 0; i < nr_stream_fds; i++) {
		stream_fds[i] = open_ust_stream_fd(channel, i, &channel->buffer_credentials.value);
		if (stream_fds[i] < 0) {
			ret = -1;
			goto error_open;
		}
	}
	ust_channel = lttng_ust_ctl_create_channel(attr, stream_fds, nr_stream_fds);
	if (!ust_channel) {
		ret = -1;
		goto error_create;
	}
	channel->nr_stream_fds = nr_stream_fds;
	channel->stream_fds = stream_fds;
	*ust_chanp = ust_channel;

	return 0;

error_create:
error_open:
	for (j = i - 1; j >= 0; j--) {
		int closeret;

		closeret = close(stream_fds[j]);
		if (closeret) {
			PERROR("close");
		}
		if (channel->shm_path[0]) {
			char shm_path[PATH_MAX];

			closeret = get_stream_shm_path(shm_path, channel->shm_path, j);
			if (closeret) {
				ERR("Cannot get stream shm path");
			}
			closeret = run_as_unlink(shm_path,
						 lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
							 channel->buffer_credentials)),
						 lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
							 channel->buffer_credentials)));
			if (closeret) {
				PERROR("unlink %s", shm_path);
			}
		}
	}
	/* Try to rmdir all directories under shm_path root. */
	if (channel->root_shm_path[0]) {
		(void) run_as_rmdir_recursive(channel->root_shm_path,
					      lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
						      channel->buffer_credentials)),
					      lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
						      channel->buffer_credentials)),
					      LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
	}
	free(stream_fds);
error_alloc:
	return ret;
}

/*
 * Send a single given stream to the session daemon using the sock.
 *
 * Return 0 on success else a negative value.
 */
static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
{
	int ret;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(sock >= 0);

	DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);

	/* Send stream to session daemon. */
	ret = lttng_ust_ctl_send_stream_to_sessiond(sock, stream->ustream);
	if (ret < 0) {
		goto error;
	}

error:
	return ret;
}

/*
 * Send channel to sessiond and relayd if applicable.
 *
 * Return 0 on success or else a negative value.
 */
static int send_channel_to_sessiond_and_relayd(int sock,
					       struct lttng_consumer_channel *channel,
					       struct lttng_consumer_local_data *ctx,
					       int *relayd_error)
{
	int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_consumer_stream *stream;
	uint64_t net_seq_idx = -1ULL;

	LTTNG_ASSERT(channel);
	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(sock >= 0);

	DBG("UST consumer sending channel %s to sessiond", channel->name);

	if (channel->relayd_id != (uint64_t) -1ULL) {
		cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
			health_code_update();

			/* Try to send the stream to the relayd if one is available. */
			DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd",
			    stream->key,
			    channel->name);
			ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
			if (ret < 0) {
				/*
				 * Flag that the relayd was the problem here probably due to a
				 * communication error on the socket.
				 */
				if (relayd_error) {
					*relayd_error = 1;
				}
				ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
			}
			if (net_seq_idx == -1ULL) {
				net_seq_idx = stream->net_seq_idx;
			}
		}
	}

	/* Inform sessiond that we are about to send channel and streams. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		/*
		 * Either the session daemon is not responding or the relayd died so we
		 * stop now.
		 */
		goto error;
	}

	/* Send channel to sessiond. */
	ret = lttng_ust_ctl_send_channel_to_sessiond(sock, channel->uchan);
	if (ret < 0) {
		goto error;
	}

	ret = lttng_ust_ctl_channel_close_wakeup_fd(channel->uchan);
	if (ret < 0) {
		goto error;
	}

	/* The channel was sent successfully to the sessiond at this point. */
	cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
		health_code_update();

		/* Send stream to session daemon. */
		ret = send_sessiond_stream(sock, stream);
		if (ret < 0) {
			goto error;
		}
	}

	/* Tell sessiond there is no more stream. */
	ret = lttng_ust_ctl_send_stream_to_sessiond(sock, nullptr);
	if (ret < 0) {
		goto error;
	}

	DBG("UST consumer NULL stream sent to sessiond");

	return 0;

error:
	if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		ret = -1;
	}
	return ret;
}

/*
 * Create a channel and its streams and add the channel to the internal
 * consumer state. The created streams must ONLY be sent once the
 * GET_CHANNEL command is received.
 *
 * Return 0 on success or else, a negative value is returned and the channel
 * MUST be destroyed by consumer_del_channel().
 */
static int ask_channel(struct lttng_consumer_local_data *ctx,
		       struct lttng_consumer_channel *channel,
		       struct lttng_ust_ctl_consumer_channel_attr *attr)
{
	int ret;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(channel);
	LTTNG_ASSERT(attr);

	/*
	 * This value is still used by the kernel consumer since, for the kernel,
	 * the stream ownership is not IN the consumer, so we need to know the
	 * number of streams left to initialize in order to know when to delete
	 * the channel (see consumer.c).
	 *
	 * As for the user space tracer, the consumer creates and sends the
	 * streams to the session daemon, which only sends them to the
	 * application once every stream of a channel is received, making this
	 * value useless because the streams will be added to the poll thread
	 * before the application receives them. This ensures that a stream can
	 * not hang up during initialization of a channel.
	 */
	channel->nb_init_stream_left = 0;

	/* The reply msg status is handled in the following call. */
	ret = create_ust_channel(channel, attr, &channel->uchan);
	if (ret < 0) {
		goto end;
	}

	channel->wait_fd = lttng_ust_ctl_channel_get_wait_fd(channel->uchan);

	/*
	 * For the snapshots (no monitor), we create the metadata streams
	 * on demand, not during the channel creation.
	 */
	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
		ret = 0;
		goto end;
	}

	/* Open all streams for this channel. */
	pthread_mutex_lock(&channel->lock);
	ret = create_ust_streams(channel, ctx);
	pthread_mutex_unlock(&channel->lock);
	if (ret < 0) {
		goto end;
	}

end:
	return ret;
}

/*
 * Send all streams of a channel to the right thread handling it.
 *
 * On error, return a negative value else 0 on success.
 */
static int send_streams_to_thread(struct lttng_consumer_channel *channel,
				  struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_consumer_stream *stream, *stmp;

	LTTNG_ASSERT(channel);
	LTTNG_ASSERT(ctx);

	/* Send streams to the corresponding thread. */
	cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
		health_code_update();

		/* Sending the stream to the thread. */
		ret = send_stream_to_thread(stream, ctx);
		if (ret < 0) {
			/*
			 * If we are unable to send the stream to the thread, there is
			 * a big problem so just stop everything.
			 */
			goto error;
		}
	}

error:
	return ret;
}

/*
 * Flush channel's streams using the given key to retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int flush_channel(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;
	struct lttng_ht *ht;
	struct lttng_ht_iter iter;

	DBG("UST consumer flush channel key %" PRIu64, chan_key);

	rcu_read_lock();
	channel = consumer_find_channel(chan_key);
	if (!channel) {
		ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	ht = the_consumer_data.stream_per_chan_id_ht;

	/* For each stream of the channel id, flush it. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		health_code_update();

		pthread_mutex_lock(&stream->lock);

		/*
		 * Protect against concurrent teardown of a stream.
		 */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}

		if (!stream->quiescent) {
			ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
			if (ret) {
				ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64
				    ", channel name = '%s'",
				    chan_key,
				    channel->name);
				ret = LTTNG_ERR_BUFFER_FLUSH_FAILED;
				pthread_mutex_unlock(&stream->lock);
				goto error;
			}
			stream->quiescent = true;
		}
next:
		pthread_mutex_unlock(&stream->lock);
	}

	/*
	 * Send one last buffer statistics update to the session daemon. This
	 * ensures that the session daemon gets at least one statistics update
	 * per channel even in the case of short-lived channels, such as when a
	 * short-lived app is traced in per-pid mode.
	 */
	sample_and_send_channel_buffer_stats(channel);
error:
	rcu_read_unlock();
	return ret;
}

/*
 * Clear quiescent state from channel's streams using the given key to
 * retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int clear_quiescent_channel(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;
	struct lttng_ht *ht;
	struct lttng_ht_iter iter;

	DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);

	rcu_read_lock();
	channel = consumer_find_channel(chan_key);
	if (!channel) {
		ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	ht = the_consumer_data.stream_per_chan_id_ht;

	/* For each stream of the channel id, clear quiescent state. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		health_code_update();

		pthread_mutex_lock(&stream->lock);
		stream->quiescent = false;
		pthread_mutex_unlock(&stream->lock);
	}
error:
	rcu_read_unlock();
	return ret;
}

/*
 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int close_metadata(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	unsigned int channel_monitor;

	DBG("UST consumer close metadata key %" PRIu64, chan_key);

	channel = consumer_find_channel(chan_key);
	if (!channel) {
		/*
		 * This is possible if the metadata thread has issued a delete
		 * because the endpoint of the stream hung up. There is no way
		 * the session daemon can know about it, thus use a DBG instead
		 * of an actual error.
		 */
		DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	channel_monitor = channel->monitor;
	if (cds_lfht_is_node_deleted(&channel->node.node)) {
		goto error_unlock;
	}

	lttng_ustconsumer_close_metadata(channel);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/*
	 * The ownership of a metadata channel depends on the type of
	 * session to which it belongs. In effect, the monitor flag is checked
	 * to determine if this metadata channel is in "snapshot" mode or not.
	 *
	 * In the non-snapshot case, the metadata channel is created along with
	 * a single stream which will remain present until the metadata channel
	 * is destroyed (on the destruction of its session). In this case, the
	 * metadata stream is "monitored" by the metadata poll thread, which
	 * holds the ownership of its channel.
	 *
	 * Closing the metadata will cause the metadata stream's "metadata poll
	 * pipe" to be closed. Closing this pipe will wake up the metadata poll
	 * thread, which will tear down the metadata stream which, in turn,
	 * deletes the metadata channel.
	 *
	 * In the snapshot case, the metadata stream is created and destroyed
	 * on every snapshot record. Since the channel doesn't have an owner
	 * other than the session daemon, it is safe to destroy it immediately
	 * on reception of the CLOSE_METADATA command.
	 */
	if (!channel_monitor) {
		/*
		 * The channel and consumer_data locks must be
		 * released before this call since consumer_del_channel
		 * re-acquires the channel and consumer_data locks to teardown
		 * the channel and queue its reclamation by the "call_rcu"
		 * worker thread.
		 */
		consumer_del_channel(channel);
	}

	return ret;
error_unlock:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
error:
	return ret;
}

/*
 * RCU read side lock MUST be acquired before calling this function.
 *
 * Return 0 on success else an LTTng error code.
 */
static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	int ret;
	struct lttng_consumer_channel *metadata;

	ASSERT_RCU_READ_LOCKED();

	DBG("UST consumer setup metadata key %" PRIu64, key);

	metadata = consumer_find_channel(key);
	if (!metadata) {
		ERR("UST consumer push metadata %" PRIu64 " not found", key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto end;
	}

	/*
	 * In no monitor mode, the metadata channel has no stream(s) so skip the
	 * ownership transfer to the metadata thread.
	 */
	if (!metadata->monitor) {
		DBG("Metadata channel in no monitor");
		ret = 0;
		goto end;
	}

	/*
	 * Send metadata stream to relayd if one is available. Availability is
	 * known if the stream is still in the list of the channel.
	 */
	if (cds_list_empty(&metadata->streams.head)) {
		ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
		ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
		goto error_no_stream;
	}

	/* Send metadata stream to relayd if needed. */
	if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
		ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname);
		if (ret < 0) {
			ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
			goto error;
		}
		ret = consumer_send_relayd_streams_sent(metadata->metadata_stream->net_seq_idx);
		if (ret < 0) {
			ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
			goto error;
		}
	}

	/*
	 * Ownership of the metadata stream is passed along. Freeing is handled
	 * by the callee.
	 */
	ret = send_streams_to_thread(metadata, ctx);
	if (ret < 0) {
		/*
		 * If we are unable to send the stream to the thread, there is
		 * a big problem so just stop everything.
		 */
		ret = LTTCOMM_CONSUMERD_FATAL;
		goto send_streams_error;
	}
	/* List MUST be empty after or else it could be reused. */
	LTTNG_ASSERT(cds_list_empty(&metadata->streams.head));

	ret = 0;
	goto end;

error:
	/*
	 * Delete metadata channel on error. At this point, the metadata stream can
	 * NOT be monitored by the metadata thread thus having the guarantee that
	 * the stream is still in the local stream list of the channel. This call
	 * will make sure to clean that list.
	 */
	consumer_stream_destroy(metadata->metadata_stream, nullptr);
	metadata->metadata_stream = nullptr;
send_streams_error:
error_no_stream:
end:
	return ret;
}

/*
 * Snapshot the whole metadata.
 * RCU read-side lock must be held by the caller.
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
			     uint64_t key,
			     char *path,
			     uint64_t relayd_id,
			     struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_consumer_stream *metadata_stream;

	LTTNG_ASSERT(path);
	LTTNG_ASSERT(ctx);
	ASSERT_RCU_READ_LOCKED();

	DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);

	rcu_read_lock();

	LTTNG_ASSERT(!metadata_channel->monitor);

	health_code_update();

	/*
	 * Ask the sessiond if we have new metadata waiting and update the
	 * consumer metadata cache.
	 */
	ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
	if (ret < 0) {
		goto error;
	}

	health_code_update();

	/*
	 * The metadata stream is NOT created in no monitor mode when the
	 * channel is created on a sessiond ask channel command.
	 */
	ret = create_ust_streams(metadata_channel, ctx);
	if (ret < 0) {
		goto error;
	}

	metadata_stream = metadata_channel->metadata_stream;
	LTTNG_ASSERT(metadata_stream);

	metadata_stream->read_subbuffer_ops.lock(metadata_stream);
	if (relayd_id != (uint64_t) -1ULL) {
		metadata_stream->net_seq_idx = relayd_id;
		ret = consumer_send_relayd_stream(metadata_stream, path);
	} else {
		ret = consumer_stream_create_output_files(metadata_stream, false);
	}
	if (ret < 0) {
		goto error_stream;
	}

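	/*
	 * Drain the whole metadata cache into the snapshot output;
	 * lttng_consumer_read_subbuffer() returns a positive value as long as
	 * data remains to be consumed.
	 */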
	do {
		health_code_update();
		ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
		if (ret < 0) {
			goto error_stream;
		}
	} while (ret > 0);

error_stream:
	metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
	/*
	 * Clean up the stream completely because the next snapshot will use a
	 * new metadata stream.
	 */
	consumer_stream_destroy(metadata_stream, nullptr);
	metadata_channel->metadata_stream = nullptr;

error:
	rcu_read_unlock();
	return ret;
}

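/*
 * Compute the address of the current sub-buffer in the stream's mmap'd
 * ring buffer, i.e. the mmap base address plus the current read offset.
 *
 * Return 0 on success or else a negative value.
 */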
static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr)
{
	int ret;
	unsigned long mmap_offset;
	const char *mmap_base;

	mmap_base = (const char *) lttng_ust_ctl_get_mmap_base(stream->ustream);
	if (!mmap_base) {
		ERR("Failed to get mmap base for stream `%s`", stream->name);
		ret = -EPERM;
		goto error;
	}

	ret = lttng_ust_ctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
	if (ret != 0) {
		ERR("Failed to get mmap offset for stream `%s`", stream->name);
		ret = -EINVAL;
		goto error;
	}

	*addr = mmap_base + mmap_offset;
error:
	return ret;
}

/*
 * Take a snapshot of all the streams of a channel.
 * RCU read-side lock and the channel lock must be held by the caller.
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_channel(struct lttng_consumer_channel *channel,
			    uint64_t key,
			    char *path,
			    uint64_t relayd_id,
			    uint64_t nb_packets_per_stream,
			    struct lttng_consumer_local_data *ctx)
{
	int ret;
	unsigned use_relayd = 0;
	unsigned long consumed_pos, produced_pos;
	struct lttng_consumer_stream *stream;

	LTTNG_ASSERT(path);
	LTTNG_ASSERT(ctx);
	ASSERT_RCU_READ_LOCKED();

	rcu_read_lock();

	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	LTTNG_ASSERT(!channel->monitor);
	DBG("UST consumer snapshot channel %" PRIu64, key);

	cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
		health_code_update();

		/* Lock stream because we are about to change its state. */
		pthread_mutex_lock(&stream->lock);
		LTTNG_ASSERT(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto error_unlock;
		}
		LTTNG_ASSERT(!stream->trace_chunk);
		stream->trace_chunk = channel->trace_chunk;

		stream->net_seq_idx = relayd_id;

		if (use_relayd) {
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				goto error_close_stream;
			}
		} else {
			ret = consumer_stream_create_output_files(stream, false);
			if (ret < 0) {
				goto error_close_stream;
			}
			DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
		}

		/*
		 * If tracing is active, we want to perform a "full" buffer flush.
		 * Else, if quiescent, it has already been done by the prior stop.
		 */
		if (!stream->quiescent) {
			ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
			if (ret < 0) {
				ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64
				    ", channel name = '%s'",
				    channel->key,
				    channel->name);
				goto error_unlock;
			}
		}

		ret = lttng_ustconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking UST snapshot");
			goto error_close_stream;
		}

		ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced UST snapshot position");
			goto error_close_stream;
		}

		ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd UST snapshot position");
			goto error_close_stream;
		}

		/*
		 * The original value is sent back if max stream size is larger than
		 * the possible size of the snapshot. Also, we assume that the session
		 * daemon should never send a maximum stream size that is lower than
		 * subbuffer size.
		 */
		consumed_pos = consumer_get_consume_start_pos(
			consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size);

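		/*
		 * Consume sub-buffers until the consumed position catches up
		 * with the produced position; the signed difference tolerates
		 * wrap-around of the position counters.
		 */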
		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();

			DBG("UST consumer taking snapshot at pos %lu", consumed_pos);

			ret = lttng_ust_ctl_get_subbuf(stream->ustream, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("lttng_ust_ctl_get_subbuf snapshot");
					goto error_close_stream;
				}
				DBG("UST consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len);
			if (ret < 0) {
				ERR("Snapshot lttng_ust_ctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len);
			if (ret < 0) {
				ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(
				stream, &subbuf_view, padded_len - len);
			if (use_relayd) {
				if (read_len != len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			} else {
				if (read_len != padded_len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			}

			ret = lttng_ust_ctl_put_subbuf(stream->ustream);
			if (ret < 0) {
				ERR("Snapshot lttng_ust_ctl_put_subbuf");
				goto error_close_stream;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Simply close the stream so we can use it on the next snapshot. */
		consumer_stream_close_output(stream);
		pthread_mutex_unlock(&stream->lock);
	}

	rcu_read_unlock();
	return 0;

error_put_subbuf:
	if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
		ERR("Snapshot lttng_ust_ctl_put_subbuf");
	}
error_close_stream:
	consumer_stream_close_output(stream);
error_unlock:
	pthread_mutex_unlock(&stream->lock);
	rcu_read_unlock();
	return ret;
}

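/*
 * Reset the position at which the metadata stream consumes from its
 * channel's metadata cache so that the whole cache gets pushed again.
 * The stream lock must be held by the caller.
 */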
static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
{
	ASSERT_LOCKED(stream->lock);

	DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id);
	stream->ust_metadata_pushed = 0;
}

/*
 * Receive the metadata updates from the sessiond. Supports receiving
 * overlapping metadata, but it needs to always belong to a contiguous
 * range starting from 0.
 * Be careful about the locks held when calling this function: it needs
 * the metadata cache flush to concurrently progress in order to
 * complete.
 */
int lttng_ustconsumer_recv_metadata(int sock,
				    uint64_t key,
				    uint64_t offset,
				    uint64_t len,
				    uint64_t version,
				    struct lttng_consumer_channel *channel,
				    int timer,
				    int wait)
{
	int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	char *metadata_str;
	enum consumer_metadata_cache_write_status cache_write_status;

	DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);

	metadata_str = calloc<char>(len);
	if (!metadata_str) {
		PERROR("zmalloc metadata string");
		ret_code = LTTCOMM_CONSUMERD_ENOMEM;
		goto end;
	}

	health_code_update();

	/* Receive metadata string. */
	ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
	if (ret < 0) {
		/* Session daemon is dead so return gracefully. */
		ret_code = ret;
		goto end_free;
	}

	health_code_update();

	pthread_mutex_lock(&channel->metadata_cache->lock);
	cache_write_status = consumer_metadata_cache_write(
		channel->metadata_cache, offset, len, version, metadata_str);
	pthread_mutex_unlock(&channel->metadata_cache->lock);
	switch (cache_write_status) {
	case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
		/*
		 * The write entirely overlapped with existing contents of the
		 * same metadata version (same content); there is nothing to do.
		 */
		break;
	case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
		/*
		 * The metadata cache was invalidated (previously pushed
		 * content has been overwritten). Reset the stream's consumed
		 * metadata position to ensure the metadata poll thread consumes
		 * the whole cache.
		 */

		/*
		 * channel::metadata_stream can be null when the metadata
		 * channel is under a snapshot session type. No need to update
		 * the stream position in that scenario.
		 */
		if (channel->metadata_stream != nullptr) {
			pthread_mutex_lock(&channel->metadata_stream->lock);
			metadata_stream_reset_cache_consumed_position(channel->metadata_stream);
			pthread_mutex_unlock(&channel->metadata_stream->lock);
		} else {
			/* Validate we are in snapshot mode. */
			LTTNG_ASSERT(!channel->monitor);
		}
		/* Fall-through. */
	case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
		/*
		 * In both cases, the metadata poll thread has new data to
		 * consume.
		 */
		ret = consumer_metadata_wakeup_pipe(channel);
		if (ret) {
			ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
			goto end_free;
		}
		break;
	case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
		/* Unable to handle metadata. Notify session daemon. */
		ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
		/*
		 * Skip metadata flush on write error since the offset and len might
		 * not have been updated which could create an infinite loop below when
		 * waiting for the metadata cache to be flushed.
		 */
		goto end_free;
	default:
		abort();
	}

	if (!wait) {
		goto end_free;
	}
	while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
		DBG("Waiting for metadata to be flushed");

		health_code_update();

		usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
	}

end_free:
	free(metadata_str);
end:
	return ret_code;
}

/*
 * Receive command from session daemon and process it.
 *
 * Return 1 on success; otherwise 0 or a negative value.
 */
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
			       int sock,
			       struct pollfd *consumer_sockpoll)
{
	int ret_func;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttcomm_consumer_msg msg;
	struct lttng_consumer_channel *channel = nullptr;

	health_code_update();

	{
		ssize_t ret_recv;

		ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
		if (ret_recv != sizeof(msg)) {
			DBG("Consumer received unexpected message size %zd (expects %zu)",
			    ret_recv,
			    sizeof(msg));
			/*
			 * The ret value might be 0, meaning an orderly shutdown, but
			 * this is OK since the caller handles it.
			 */
			if (ret_recv > 0) {
				lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
				ret_recv = -1;
			}
			return ret_recv;
		}
	}

	health_code_update();

	/* deprecated */
	LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);

	health_code_update();

	/* relayd needs RCU read-side lock */
	rcu_read_lock();

	switch (msg.cmd_type) {
	case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
	{
		uint32_t major = msg.u.relayd_sock.major;
		uint32_t minor = msg.u.relayd_sock.minor;
		enum lttcomm_sock_proto protocol =
			(enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;

		/* Session daemon status messages are handled in the following call. */
		consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
					   msg.u.relayd_sock.type,
					   ctx,
					   sock,
					   consumer_sockpoll,
					   msg.u.relayd_sock.session_id,
					   msg.u.relayd_sock.relayd_session_id,
					   major,
					   minor,
					   protocol);
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_DESTROY_RELAYD:
	{
		uint64_t index = msg.u.destroy_relayd.net_seq_idx;
		struct consumer_relayd_sock_pair *relayd;

		DBG("UST consumer destroying relayd %" PRIu64, index);

		/* Get relayd reference if exists. */
		relayd = consumer_find_relayd(index);
		if (relayd == nullptr) {
			DBG("Unable to find relayd %" PRIu64, index);
			ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
		}

		/*
		 * Each relayd socket pair has a refcount of streams attached to
		 * it, which tells whether the relayd is still active or not.
		 *
		 * This will set the destroy flag of the relayd object and
		 * destroy it if the refcount reaches zero when called.
		 *
		 * The destroy can happen either here or when a stream fd hangs up.
		 */
		if (relayd) {
			consumer_flag_relayd_for_destroy(relayd);
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_UPDATE_STREAM:
	{
		rcu_read_unlock();
		return -ENOSYS;
	}
	case LTTNG_CONSUMER_DATA_PENDING:
	{
		int is_data_pending;
		ssize_t ret_send;
		uint64_t id = msg.u.data_pending.session_id;

		DBG("UST consumer data pending command for id %" PRIu64, id);

		is_data_pending = consumer_data_pending(id);

		/* Send back returned value to session daemon */
		ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, sizeof(is_data_pending));
		if (ret_send < 0) {
			DBG("Error when sending the data pending ret code: %zd", ret_send);
			goto error_fatal;
		}

		/*
		 * No need to send back a status message since the data pending
		 * returned value is the response.
		 */
		break;
	}
	case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
	{
		int ret_ask_channel, ret_add_channel, ret_send;
		struct lttng_ust_ctl_consumer_channel_attr attr;
		const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value;
		const struct lttng_credentials buffer_credentials = {
			.uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.uid),
			.gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.gid),
		};

		/* Create a plain object and reserve a channel key. */
		channel = consumer_allocate_channel(
			msg.u.ask_channel.key,
			msg.u.ask_channel.session_id,
			msg.u.ask_channel.chunk_id.is_set ? &chunk_id : nullptr,
			msg.u.ask_channel.pathname,
			msg.u.ask_channel.name,
			msg.u.ask_channel.relayd_id,
			(enum lttng_event_output) msg.u.ask_channel.output,
			msg.u.ask_channel.tracefile_size,
			msg.u.ask_channel.tracefile_count,
			msg.u.ask_channel.session_id_per_pid,
			msg.u.ask_channel.monitor,
			msg.u.ask_channel.live_timer_interval,
			msg.u.ask_channel.is_live,
			msg.u.ask_channel.root_shm_path,
			msg.u.ask_channel.shm_path);
		if (!channel) {
			goto end_channel_error;
		}

		LTTNG_OPTIONAL_SET(&channel->buffer_credentials, buffer_credentials);

		/*
		 * Assign UST application UID to the channel. This value is ignored for
		 * per PID buffers. This is specific to UST thus setting this after the
		 * allocation.
		 */
		channel->ust_app_uid = msg.u.ask_channel.ust_app_uid;

		/* Build channel attributes from received message. */
		attr.subbuf_size = msg.u.ask_channel.subbuf_size;
		attr.num_subbuf = msg.u.ask_channel.num_subbuf;
		attr.overwrite = msg.u.ask_channel.overwrite;
		attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
		attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
		attr.chan_id = msg.u.ask_channel.chan_id;
		memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
		attr.blocking_timeout = msg.u.ask_channel.blocking_timeout;

		/* Match channel buffer type to the UST abi. */
		switch (msg.u.ask_channel.output) {
		case LTTNG_EVENT_MMAP:
		default:
			attr.output = LTTNG_UST_ABI_MMAP;
			break;
		}

		/* Translate and save channel type. */
		switch (msg.u.ask_channel.type) {
		case LTTNG_UST_ABI_CHAN_PER_CPU:
			channel->type = CONSUMER_CHANNEL_TYPE_DATA;
			attr.type = LTTNG_UST_ABI_CHAN_PER_CPU;
			/*
			 * Set refcount to 1 for owner. Below, we will
			 * pass ownership to the
			 * consumer_thread_channel_poll() thread.
			 */
			channel->refcount = 1;
			break;
		case LTTNG_UST_ABI_CHAN_METADATA:
			channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
			attr.type = LTTNG_UST_ABI_CHAN_METADATA;
			break;
		default:
			abort();
			goto error_fatal;
		}

		health_code_update();

		ret_ask_channel = ask_channel(ctx, channel, &attr);
		if (ret_ask_channel < 0) {
			goto end_channel_error;
		}

		if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
			int ret_allocate;

			ret_allocate = consumer_metadata_cache_allocate(channel);
			if (ret_allocate < 0) {
				ERR("Allocating metadata cache");
				goto end_channel_error;
			}
			consumer_timer_switch_start(channel, attr.switch_timer_interval);
			attr.switch_timer_interval = 0;
		} else {
			int monitor_start_ret;

			consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
			monitor_start_ret = consumer_timer_monitor_start(
				channel, msg.u.ask_channel.monitor_timer_interval);
			if (monitor_start_ret < 0) {
				ERR("Starting channel monitoring timer failed");
				goto end_channel_error;
			}
		}

		health_code_update();

		/*
		 * Add the channel to the internal state AFTER all streams were created
		 * and successfully sent to session daemon. This way, all streams must
		 * be ready before this channel is visible to the threads.
		 * If add_channel succeeds, ownership of the channel is
		 * passed to consumer_thread_channel_poll().
		 */
		ret_add_channel = add_channel(channel, ctx);
		if (ret_add_channel < 0) {
			if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
				if (channel->switch_timer_enabled == 1) {
					consumer_timer_switch_stop(channel);
				}
				consumer_metadata_cache_destroy(channel);
			}
			if (channel->live_timer_enabled == 1) {
				consumer_timer_live_stop(channel);
			}
			if (channel->monitor_timer_enabled == 1) {
				consumer_timer_monitor_stop(channel);
			}
			goto end_channel_error;
		}

		health_code_update();

		/*
		 * Channel and streams are now created. Inform the session daemon that
		 * everything went well and should wait to receive the channel and
		 * streams with ustctl API.
		 */
		ret_send = consumer_send_status_channel(sock, channel);
		if (ret_send < 0) {
			/*
			 * There is probably a problem on the socket.
			 */
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_GET_CHANNEL:
	{
		int ret, relayd_err = 0;
		uint64_t key = msg.u.get_channel.key;
		struct lttng_consumer_channel *found_channel;

		found_channel = consumer_find_channel(key);
		if (!found_channel) {
			ERR("UST consumer get channel key %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
			goto end_get_channel;
		}

		health_code_update();

		/* Send the channel to sessiond (and relayd, if applicable). */
		ret = send_channel_to_sessiond_and_relayd(sock, found_channel, ctx, &relayd_err);
		if (ret < 0) {
			if (relayd_err) {
				/*
				 * We were unable to send the stream to the relayd, so avoid
				 * sending back a fatal error to the thread since this is OK
				 * and the consumer can continue its work. The above call
				 * has sent the error status message to the sessiond.
				 */
				goto end_get_channel_nosignal;
			}
			/*
			 * The communication was broken, hence there is a bad state between
			 * the consumer and sessiond so stop everything.
			 */
			goto error_get_channel_fatal;
		}

		health_code_update();

		/*
		 * In no monitor mode, the streams ownership is kept inside the channel
		 * so don't send them to the data thread.
		 */
		if (!found_channel->monitor) {
			goto end_get_channel;
		}

		ret = send_streams_to_thread(found_channel, ctx);
		if (ret < 0) {
			/*
			 * If we are unable to send the stream to the thread, there is
			 * a big problem so just stop everything.
			 */
			goto error_get_channel_fatal;
		}
		/* List MUST be empty after or else it could be reused. */
		LTTNG_ASSERT(cds_list_empty(&found_channel->streams.head));
end_get_channel:
		goto end_msg_sessiond;
error_get_channel_fatal:
		goto error_fatal;
end_get_channel_nosignal:
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_DESTROY_CHANNEL:
	{
		uint64_t key = msg.u.destroy_channel.key;

		/*
		 * Only called if streams have not been sent to stream
		 * manager thread. However, channel has been sent to
		 * channel manager thread.
		 */
		notify_thread_del_channel(ctx, key);
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLOSE_METADATA:
	{
		int ret;

		ret = close_metadata(msg.u.close_metadata.key);
		if (ret != 0) {
			ret_code = (lttcomm_return_code) ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_FLUSH_CHANNEL:
	{
		int ret;

		ret = flush_channel(msg.u.flush_channel.key);
		if (ret != 0) {
			ret_code = (lttcomm_return_code) ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
	{
		int ret;

		ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key);
		if (ret != 0) {
			ret_code = (lttcomm_return_code) ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_PUSH_METADATA:
	{
		int ret;
		uint64_t len = msg.u.push_metadata.len;
		uint64_t key = msg.u.push_metadata.key;
		uint64_t offset = msg.u.push_metadata.target_offset;
		uint64_t version = msg.u.push_metadata.version;
		struct lttng_consumer_channel *found_channel;

		DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);

		found_channel = consumer_find_channel(key);
		if (!found_channel) {
			/*
			 * This is possible if the metadata creation on the consumer side
			 * is in flight vis-a-vis a concurrent push metadata from the
			 * session daemon. Simply return that the channel failed and the
			 * session daemon will handle that message correctly considering
			 * that this race is acceptable thus the DBG() statement here.
			 */
			DBG("UST consumer push metadata %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
			goto end_push_metadata_msg_sessiond;
		}

		health_code_update();

		if (!len) {
			/*
			 * There is nothing to receive. We have simply
			 * checked whether the channel can be found.
			 */
			ret_code = LTTCOMM_CONSUMERD_SUCCESS;
			goto end_push_metadata_msg_sessiond;
		}

		/* Tell session daemon we are ready to receive the metadata. */
		ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto error_push_metadata_fatal;
		}

		health_code_update();

		/* Wait for more data. */
		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			goto error_push_metadata_fatal;
		}

		health_code_update();

		ret = lttng_ustconsumer_recv_metadata(
			sock, key, offset, len, version, found_channel, 0, 1);
		if (ret < 0) {
			/* error receiving from sessiond */
			goto error_push_metadata_fatal;
		} else {
			ret_code = (lttcomm_return_code) ret;
			goto end_push_metadata_msg_sessiond;
		}
end_push_metadata_msg_sessiond:
		goto end_msg_sessiond;
error_push_metadata_fatal:
		goto error_fatal;
	}
	case LTTNG_CONSUMER_SETUP_METADATA:
	{
		int ret;

		ret = setup_metadata(ctx, msg.u.setup_metadata.key);
		if (ret) {
			ret_code = (lttcomm_return_code) ret;
		}
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
	{
		struct lttng_consumer_channel *found_channel;
		uint64_t key = msg.u.snapshot_channel.key;
		int ret_send;

		found_channel = consumer_find_channel(key);
		if (!found_channel) {
			DBG("UST snapshot channel not found for key %" PRIu64, key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
		} else {
			if (msg.u.snapshot_channel.metadata) {
				int ret_snapshot;

				ret_snapshot = snapshot_metadata(found_channel,
								 key,
								 msg.u.snapshot_channel.pathname,
								 msg.u.snapshot_channel.relayd_id,
								 ctx);
				if (ret_snapshot < 0) {
					ERR("Snapshot metadata failed");
					ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
				}
			} else {
				int ret_snapshot;

				ret_snapshot = snapshot_channel(
					found_channel,
					key,
					msg.u.snapshot_channel.pathname,
					msg.u.snapshot_channel.relayd_id,
					msg.u.snapshot_channel.nb_packets_per_stream,
					ctx);
				if (ret_snapshot < 0) {
					ERR("Snapshot channel failed");
					ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
				}
			}
		}
		health_code_update();
		ret_send = consumer_send_status_msg(sock, ret_code);
		if (ret_send < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto end_nosignal;
		}
		health_code_update();
		break;
	}
	case LTTNG_CONSUMER_DISCARDED_EVENTS:
	{
		int ret = 0;
		uint64_t discarded_events;
		struct lttng_ht_iter iter;
		struct lttng_ht *ht;
		struct lttng_consumer_stream *stream;
		uint64_t id = msg.u.discarded_events.session_id;
		uint64_t key = msg.u.discarded_events.channel_key;

		DBG("UST consumer discarded events command for session id %" PRIu64, id);
		rcu_read_lock();
		pthread_mutex_lock(&the_consumer_data.lock);

		ht = the_consumer_data.stream_list_ht;

		/*
		 * We only need a reference to the channel, but channels are not
		 * directly indexed, so we just use the first matching stream to
		 * extract the information we need; we default to 0 if no stream
		 * is found (no events are dropped if the channel is not yet in
		 * use).
		 */
		discarded_events = 0;
		cds_lfht_for_each_entry_duplicate(ht->ht,
						  ht->hash_fct(&id, lttng_ht_seed),
						  ht->match_fct,
						  &id,
						  &iter.iter,
						  stream,
						  node_session_id.node)
		{
			if (stream->chan->key == key) {
				discarded_events = stream->chan->discarded_events;
				break;
			}
		}
		pthread_mutex_unlock(&the_consumer_data.lock);
		rcu_read_unlock();

		DBG("UST consumer discarded events command for session id %" PRIu64
		    ", channel key %" PRIu64,
		    id,
		    key);

		health_code_update();

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
		if (ret < 0) {
			PERROR("send discarded events");
			goto error_fatal;
		}

		break;
	}
1924 case LTTNG_CONSUMER_LOST_PACKETS:
1925 {
1926 int ret;
1927 uint64_t lost_packets;
1928 struct lttng_ht_iter iter;
1929 struct lttng_ht *ht;
1930 struct lttng_consumer_stream *stream;
1931 uint64_t id = msg.u.lost_packets.session_id;
1932 uint64_t key = msg.u.lost_packets.channel_key;
1933
1934 DBG("UST consumer lost packets command for session id %" PRIu64, id);
1935 rcu_read_lock();
1936 pthread_mutex_lock(&the_consumer_data.lock);
1937
1938 ht = the_consumer_data.stream_list_ht;
1939
1940 /*
1941 * We only need a reference to the channel, but they are not
1942 * directly indexed, so we just use the first matching stream
1943 * to extract the information we need; we default to 0 if not
1944 * found (no packets lost if the channel is not yet in use).
1945 */
1946 lost_packets = 0;
1947 cds_lfht_for_each_entry_duplicate(ht->ht,
1948 ht->hash_fct(&id, lttng_ht_seed),
1949 ht->match_fct,
1950 &id,
1951 &iter.iter,
1952 stream,
1953 node_session_id.node)
1954 {
1955 if (stream->chan->key == key) {
1956 lost_packets = stream->chan->lost_packets;
1957 break;
1958 }
1959 }
1960 pthread_mutex_unlock(&the_consumer_data.lock);
1961 rcu_read_unlock();
1962
1963 DBG("UST consumer lost packets command for session id %" PRIu64
1964 ", channel key %" PRIu64,
1965 id,
1966 key);
1967
1968 health_code_update();
1969
1970 /* Send back returned value to session daemon */
1971 ret = lttcomm_send_unix_sock(sock, &lost_packets, sizeof(lost_packets));
1972 if (ret < 0) {
1973 PERROR("send lost packets");
1974 goto error_fatal;
1975 }
1976
1977 break;
1978 }
1979 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1980 {
1981 int channel_monitor_pipe, ret_send, ret_set_channel_monitor_pipe;
1982 ssize_t ret_recv;
1983
1984 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1985 /* Successfully received the command's type. */
1986 ret_send = consumer_send_status_msg(sock, ret_code);
1987 if (ret_send < 0) {
1988 goto error_fatal;
1989 }
1990
1991 ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1);
1992 if (ret_recv != sizeof(channel_monitor_pipe)) {
1993 ERR("Failed to receive channel monitor pipe");
1994 goto error_fatal;
1995 }
1996
1997 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1998 ret_set_channel_monitor_pipe =
1999 consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe);
2000 if (!ret_set_channel_monitor_pipe) {
2001 int flags;
2002 int ret_fcntl;
2003
2004 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
2005 /* Set the pipe as non-blocking. */
2006 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
2007 if (ret_fcntl == -1) {
2008 PERROR("fcntl get flags of the channel monitoring pipe");
2009 goto error_fatal;
2010 }
2011 flags = ret_fcntl;
2012
2013 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK);
2014 if (ret_fcntl == -1) {
2015 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
2016 goto error_fatal;
2017 }
2018 DBG("Channel monitor pipe set as non-blocking");
2019 } else {
2020 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
2021 }
2022 goto end_msg_sessiond;
2023 }
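
/*
 * The F_GETFL/F_SETFL sequence above is the usual way to add O_NONBLOCK
 * without clobbering the other file status flags. A minimal reusable sketch
 * (hypothetical helper, not part of this file):
 *
 *	static int set_fd_nonblocking(int fd)
 *	{
 *		const int flags = fcntl(fd, F_GETFL, 0);
 *
 *		if (flags == -1) {
 *			return -1;
 *		}
 *		return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
 *	}
 */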
2024 case LTTNG_CONSUMER_ROTATE_CHANNEL:
2025 {
2026 struct lttng_consumer_channel *found_channel;
2027 uint64_t key = msg.u.rotate_channel.key;
2028 int ret_send_status;
2029
2030 found_channel = consumer_find_channel(key);
2031 if (!found_channel) {
2032 DBG("Channel %" PRIu64 " not found", key);
2033 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2034 } else {
2035 int rotate_channel;
2036
2037 /*
2038 * Sample the rotate position of all the streams in
2039 * this channel.
2040 */
2041 rotate_channel = lttng_consumer_rotate_channel(
2042 found_channel, key, msg.u.rotate_channel.relayd_id);
2043 if (rotate_channel < 0) {
2044 ERR("Rotate channel failed");
2045 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
2046 }
2047
2048 health_code_update();
2049 }
2050
2051 ret_send_status = consumer_send_status_msg(sock, ret_code);
2052 if (ret_send_status < 0) {
2053 /* Somehow, the session daemon is not responding anymore. */
2054 goto end_rotate_channel_nosignal;
2055 }
2056
2057 /*
2058 * Rotate the streams that are ready right now.
2059 * FIXME: this is a second consecutive iteration over the
2060 * streams in a channel; there is probably a better way to
2061 * handle this, but it needs to be after the
2062 * consumer_send_status_msg() call.
2063 */
2064 if (found_channel) {
2065 int ret_rotate_read_streams;
2066
2067 ret_rotate_read_streams =
2068 lttng_consumer_rotate_ready_streams(found_channel, key);
2069 if (ret_rotate_read_streams < 0) {
2070 ERR("Rotate channel failed");
2071 }
2072 }
2073 break;
2074 end_rotate_channel_nosignal:
2075 goto end_nosignal;
2076 }
2077 case LTTNG_CONSUMER_CLEAR_CHANNEL:
2078 {
2079 struct lttng_consumer_channel *found_channel;
2080 uint64_t key = msg.u.clear_channel.key;
2081 int ret_send_status;
2082
2083 found_channel = consumer_find_channel(key);
2084 if (!found_channel) {
2085 DBG("Channel %" PRIu64 " not found", key);
2086 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2087 } else {
2088 int ret_clear_channel;
2089
2090 ret_clear_channel = lttng_consumer_clear_channel(found_channel);
2091 if (ret_clear_channel) {
2092 ERR("Clear channel failed key %" PRIu64, key);
2093 ret_code = (lttcomm_return_code) ret_clear_channel;
2094 }
2095
2096 health_code_update();
2097 }
2098 ret_send_status = consumer_send_status_msg(sock, ret_code);
2099 if (ret_send_status < 0) {
2100 /* Somehow, the session daemon is not responding anymore. */
2101 goto end_nosignal;
2102 }
2103 break;
2104 }
2105 case LTTNG_CONSUMER_INIT:
2106 {
2107 int ret_send_status;
2108 lttng_uuid sessiond_uuid;
2109
2110 std::copy(std::begin(msg.u.init.sessiond_uuid),
2111 std::end(msg.u.init.sessiond_uuid),
2112 sessiond_uuid.begin());
2113 ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
2114 health_code_update();
2115 ret_send_status = consumer_send_status_msg(sock, ret_code);
2116 if (ret_send_status < 0) {
2117 /* Somehow, the session daemon is not responding anymore. */
2118 goto end_nosignal;
2119 }
2120 break;
2121 }
2122 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
2123 {
2124 const struct lttng_credentials credentials = {
2125 .uid = LTTNG_OPTIONAL_INIT_VALUE(
2126 msg.u.create_trace_chunk.credentials.value.uid),
2127 .gid = LTTNG_OPTIONAL_INIT_VALUE(
2128 msg.u.create_trace_chunk.credentials.value.gid),
2129 };
2130 const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set;
2131 const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
2132 const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
2133 msg.u.create_trace_chunk.override_name :
2134 nullptr;
2135 struct lttng_directory_handle *chunk_directory_handle = nullptr;
2136
2137 /*
2138 * The session daemon will only provide a chunk directory file
2139 * descriptor for local traces.
2140 */
2141 if (is_local_trace) {
2142 int chunk_dirfd;
2143 int ret_send_status;
2144 ssize_t ret_recv;
2145
2146 /* Acknowledge the reception of the command. */
2147 ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
2148 if (ret_send_status < 0) {
2149 /* Somehow, the session daemon is not responding anymore. */
2150 goto end_nosignal;
2151 }
2152
2153 /*
2154 * Receive trace chunk domain dirfd.
2155 */
2156 ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
2157 if (ret_recv != sizeof(chunk_dirfd)) {
2158 ERR("Failed to receive trace chunk domain directory file descriptor");
2159 goto error_fatal;
2160 }
2161
2162 DBG("Received trace chunk domain directory fd (%d)", chunk_dirfd);
2163 chunk_directory_handle =
2164 lttng_directory_handle_create_from_dirfd(chunk_dirfd);
2165 if (!chunk_directory_handle) {
2166 ERR("Failed to initialize chunk domain directory handle from directory file descriptor");
2167 if (close(chunk_dirfd)) {
2168 PERROR("Failed to close chunk directory file descriptor");
2169 }
2170 goto error_fatal;
2171 }
2172 }
2173
2174 ret_code = lttng_consumer_create_trace_chunk(
2175 !is_local_trace ? &relayd_id : nullptr,
2176 msg.u.create_trace_chunk.session_id,
2177 msg.u.create_trace_chunk.chunk_id,
2178 (time_t) msg.u.create_trace_chunk.creation_timestamp,
2179 chunk_override_name,
2180 msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr,
2181 chunk_directory_handle);
2182 lttng_directory_handle_put(chunk_directory_handle);
2183 goto end_msg_sessiond;
2184 }
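
/*
 * The chunk directory file descriptor above is transmitted as ancillary data
 * on the UNIX socket; lttcomm_recv_fds_unix_sock() presumably wraps the
 * classic recvmsg()/SCM_RIGHTS idiom. A self-contained sketch of receiving a
 * single fd that way (illustrative only):
 *
 *	static int recv_one_fd(int sock)
 *	{
 *		char dummy;
 *		char cmsg_buf[CMSG_SPACE(sizeof(int))] = {};
 *		struct iovec iov = { &dummy, 1 };
 *		struct msghdr msgh = {};
 *		struct cmsghdr *cmsg;
 *		int fd = -1;
 *
 *		msgh.msg_iov = &iov;
 *		msgh.msg_iovlen = 1;
 *		msgh.msg_control = cmsg_buf;
 *		msgh.msg_controllen = sizeof(cmsg_buf);
 *		if (recvmsg(sock, &msgh, 0) <= 0) {
 *			return -1;
 *		}
 *		cmsg = CMSG_FIRSTHDR(&msgh);
 *		if (cmsg && cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
 *			memcpy(&fd, CMSG_DATA(cmsg), sizeof(fd));
 *		}
 *		return fd;
 *	}
 */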
2185 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
2186 {
2187 enum lttng_trace_chunk_command_type close_command =
2188 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
2189 const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value;
2190 struct lttcomm_consumer_close_trace_chunk_reply reply;
2191 char closed_trace_chunk_path[LTTNG_PATH_MAX] = {};
2192 int ret;
2193
2194 ret_code = lttng_consumer_close_trace_chunk(
2195 msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr,
2196 msg.u.close_trace_chunk.session_id,
2197 msg.u.close_trace_chunk.chunk_id,
2198 (time_t) msg.u.close_trace_chunk.close_timestamp,
2199 msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr,
2200 closed_trace_chunk_path);
2201 reply.ret_code = ret_code;
2202 reply.path_length = strlen(closed_trace_chunk_path) + 1;
2203 ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
2204 if (ret != sizeof(reply)) {
2205 goto error_fatal;
2206 }
2207 ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, reply.path_length);
2208 if (ret != reply.path_length) {
2209 goto error_fatal;
2210 }
2211 goto end_nosignal;
2212 }
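
/*
 * The close-chunk reply is framed as a fixed-size header (ret_code and
 * path_length, the latter including the terminating NUL) followed by
 * path_length bytes of path. A peer would mirror this with two matching
 * receives, roughly as follows (a sketch, error paths collapsed):
 *
 *	struct lttcomm_consumer_close_trace_chunk_reply reply;
 *	char path[LTTNG_PATH_MAX];
 *
 *	if (lttcomm_recv_unix_sock(sock, &reply, sizeof(reply)) != sizeof(reply) ||
 *	    reply.path_length > sizeof(path) ||
 *	    lttcomm_recv_unix_sock(sock, path, reply.path_length) != reply.path_length) {
 *		return -1;
 *	}
 */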
2213 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
2214 {
2215 const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
2216
2217 ret_code = lttng_consumer_trace_chunk_exists(
2218 msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr,
2219 msg.u.trace_chunk_exists.session_id,
2220 msg.u.trace_chunk_exists.chunk_id);
2221 goto end_msg_sessiond;
2222 }
2223 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
2224 {
2225 const uint64_t key = msg.u.open_channel_packets.key;
2226 struct lttng_consumer_channel *found_channel = consumer_find_channel(key);
2227
2228 if (found_channel) {
2229 pthread_mutex_lock(&found_channel->lock);
2230 ret_code = lttng_consumer_open_channel_packets(found_channel);
2231 pthread_mutex_unlock(&found_channel->lock);
2232 } else {
2233 /*
2234 * The channel could have disappeared in per-pid
2235 * buffering mode.
2236 */
2237 DBG("Channel %" PRIu64 " not found", key);
2238 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
2239 }
2240
2241 health_code_update();
2242 goto end_msg_sessiond;
2243 }
2244 default:
2245 break;
2246 }
2247
2248 end_nosignal:
2249 /*
2250 * Return 1 to indicate success, since a return value of 0 would be
2251 * indistinguishable from a socket shutdown during the recv() or send() call.
2252 */
2253 ret_func = 1;
2254 goto end;
2255
2256 end_msg_sessiond:
2257 /*
2258 * The returned value here is not useful since either way we'll return 1 to
2259 * the caller because the session daemon socket management is done
2260 * elsewhere. Returning a negative code or 0 would shut down the consumer.
2261 */
2262 {
2263 int ret_send_status;
2264
2265 ret_send_status = consumer_send_status_msg(sock, ret_code);
2266 if (ret_send_status < 0) {
2267 goto error_fatal;
2268 }
2269 }
2270
2271 ret_func = 1;
2272 goto end;
2273
2274 end_channel_error:
2275 if (channel) {
2276 consumer_del_channel(channel);
2277 }
2278 /* We have to send a status channel message indicating an error. */
2279 {
2280 int ret_send_status;
2281
2282 ret_send_status = consumer_send_status_channel(sock, nullptr);
2283 if (ret_send_status < 0) {
2284 /* Stop everything if the session daemon cannot be notified. */
2285 goto error_fatal;
2286 }
2287 }
2288
2289 ret_func = 1;
2290 goto end;
2291
2292 error_fatal:
2293 /* This will issue a consumer stop. */
2294 ret_func = -1;
2295 goto end;
2296
2297 end:
2298 rcu_read_unlock();
2299 health_code_update();
2300 return ret_func;
2301 }
2302
2303 int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
2304 {
2305 LTTNG_ASSERT(stream);
2306 LTTNG_ASSERT(stream->ustream);
2307
2308 return lttng_ust_ctl_flush_buffer(stream->ustream, producer_active);
2309 }
2310
2311 /*
2312 * Take a snapshot for a specific stream.
2313 *
2314 * Returns 0 on success, < 0 on error
2315 */
2316 int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
2317 {
2318 LTTNG_ASSERT(stream);
2319 LTTNG_ASSERT(stream->ustream);
2320
2321 return lttng_ust_ctl_snapshot(stream->ustream);
2322 }
2323
2324 /*
2325 * Sample consumed and produced positions for a specific stream.
2326 *
2327 * Returns 0 on success, < 0 on error.
2328 */
2329 int lttng_ustconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
2330 {
2331 LTTNG_ASSERT(stream);
2332 LTTNG_ASSERT(stream->ustream);
2333
2334 return lttng_ust_ctl_snapshot_sample_positions(stream->ustream);
2335 }
2336
2337 /*
2338 * Get the produced position
2339 *
2340 * Returns 0 on success, < 0 on error
2341 */
2342 int lttng_ustconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
2343 unsigned long *pos)
2344 {
2345 LTTNG_ASSERT(stream);
2346 LTTNG_ASSERT(stream->ustream);
2347 LTTNG_ASSERT(pos);
2348
2349 return lttng_ust_ctl_snapshot_get_produced(stream->ustream, pos);
2350 }
2351
2352 /*
2353 * Get the consumed position
2354 *
2355 * Returns 0 on success, < 0 on error
2356 */
2357 int lttng_ustconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
2358 unsigned long *pos)
2359 {
2360 LTTNG_ASSERT(stream);
2361 LTTNG_ASSERT(stream->ustream);
2362 LTTNG_ASSERT(pos);
2363
2364 return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos);
2365 }
2366
2367 int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer)
2368 {
2369 LTTNG_ASSERT(stream);
2370 LTTNG_ASSERT(stream->ustream);
2371
2372 return lttng_ust_ctl_flush_buffer(stream->ustream, producer);
2373 }
2374
2375 int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
2376 {
2377 LTTNG_ASSERT(stream);
2378 LTTNG_ASSERT(stream->ustream);
2379
2380 return lttng_ust_ctl_clear_buffer(stream->ustream);
2381 }
2382
2383 int lttng_ustconsumer_get_current_timestamp(struct lttng_consumer_stream *stream, uint64_t *ts)
2384 {
2385 LTTNG_ASSERT(stream);
2386 LTTNG_ASSERT(stream->ustream);
2387 LTTNG_ASSERT(ts);
2388
2389 return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts);
2390 }
2391
2392 int lttng_ustconsumer_get_sequence_number(struct lttng_consumer_stream *stream, uint64_t *seq)
2393 {
2394 LTTNG_ASSERT(stream);
2395 LTTNG_ASSERT(stream->ustream);
2396 LTTNG_ASSERT(seq);
2397
2398 return lttng_ust_ctl_get_sequence_number(stream->ustream, seq);
2399 }
2400
2401 /*
2402 * Called when the stream signals the consumer that it has hung up.
2403 */
2404 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
2405 {
2406 LTTNG_ASSERT(stream);
2407 LTTNG_ASSERT(stream->ustream);
2408
2409 pthread_mutex_lock(&stream->lock);
2410 if (!stream->quiescent) {
2411 if (lttng_ust_ctl_flush_buffer(stream->ustream, 0) < 0) {
2412 ERR("Failed to flush buffer on stream hang-up");
2413 } else {
2414 stream->quiescent = true;
2415 }
2416 }
2417
2418 stream->hangup_flush_done = 1;
2419 pthread_mutex_unlock(&stream->lock);
2420 }
2421
2422 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
2423 {
2424 int i;
2425
2426 LTTNG_ASSERT(chan);
2427 LTTNG_ASSERT(chan->uchan);
2428 LTTNG_ASSERT(chan->buffer_credentials.is_set);
2429
2430 if (chan->switch_timer_enabled == 1) {
2431 consumer_timer_switch_stop(chan);
2432 }
2433 for (i = 0; i < chan->nr_stream_fds; i++) {
2434 int ret;
2435
2436 ret = close(chan->stream_fds[i]);
2437 if (ret) {
2438 PERROR("close");
2439 }
2440 if (chan->shm_path[0]) {
2441 char shm_path[PATH_MAX];
2442
2443 ret = get_stream_shm_path(shm_path, chan->shm_path, i);
2444 if (ret) {
2445 ERR("Cannot get stream shm path");
2446 }
2447 ret = run_as_unlink(shm_path,
2448 lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
2449 chan->buffer_credentials)),
2450 lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
2451 chan->buffer_credentials)));
2452 if (ret) {
2453 PERROR("unlink %s", shm_path);
2454 }
2455 }
2456 }
2457 }
2458
2459 void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
2460 {
2461 LTTNG_ASSERT(chan);
2462 LTTNG_ASSERT(chan->uchan);
2463 LTTNG_ASSERT(chan->buffer_credentials.is_set);
2464
2465 consumer_metadata_cache_destroy(chan);
2466 lttng_ust_ctl_destroy_channel(chan->uchan);
2467 /* Try to rmdir all directories under shm_path root. */
2468 if (chan->root_shm_path[0]) {
2469 (void) run_as_rmdir_recursive(
2470 chan->root_shm_path,
2471 lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
2472 lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
2473 LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
2474 }
2475 free(chan->stream_fds);
2476 }
2477
2478 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
2479 {
2480 LTTNG_ASSERT(stream);
2481 LTTNG_ASSERT(stream->ustream);
2482
2483 if (stream->chan->switch_timer_enabled == 1) {
2484 consumer_timer_switch_stop(stream->chan);
2485 }
2486 lttng_ust_ctl_destroy_stream(stream->ustream);
2487 }
2488
2489 int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
2490 {
2491 LTTNG_ASSERT(stream);
2492 LTTNG_ASSERT(stream->ustream);
2493
2494 return lttng_ust_ctl_stream_get_wakeup_fd(stream->ustream);
2495 }
2496
2497 int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
2498 {
2499 LTTNG_ASSERT(stream);
2500 LTTNG_ASSERT(stream->ustream);
2501
2502 return lttng_ust_ctl_stream_close_wakeup_fd(stream->ustream);
2503 }
2504
2505 /*
2506 * Write up to one packet from the metadata cache to the channel.
2507 *
2508 * Returns the number of bytes pushed from the cache into the ring buffer, or a
2509 * negative value on error.
2510 */
2511 static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
2512 {
2513 ssize_t write_len;
2514 int ret;
2515
2516 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
2517 if (stream->chan->metadata_cache->contents.size == stream->ust_metadata_pushed) {
2518 /*
2519 * In the context of a user space metadata channel, a
2520 * change in version can be detected in two ways:
2521 * 1) During the pre-consume of the `read_subbuffer` loop,
2522 * 2) When populating the metadata ring buffer (i.e. here).
2523 *
2524 * This function is invoked when there is no metadata
2525 * available in the ring-buffer. If all data was consumed
2526 * up to the size of the metadata cache, there is no metadata
2527 * to insert in the ring-buffer.
2528 *
2529 * However, the metadata version could still have changed (a
2530 * regeneration without any new data will yield the same cache
2531 * size).
2532 *
2533 * The cache's version is checked for a version change and the
2534 * consumed position is reset if one occurred.
2535 *
2536 * This check is only necessary for the user space domain as
2537 * it has to manage the cache explicitly. If this reset was not
2538 * performed, no metadata would be consumed (and no reset would
2539 * occur as part of the pre-consume) until the metadata size
2540 * exceeded the cache size.
2541 */
2542 if (stream->metadata_version != stream->chan->metadata_cache->version) {
2543 metadata_stream_reset_cache_consumed_position(stream);
2544 consumer_stream_metadata_set_version(stream,
2545 stream->chan->metadata_cache->version);
2546 } else {
2547 ret = 0;
2548 goto end;
2549 }
2550 }
2551
2552 write_len = lttng_ust_ctl_write_one_packet_to_channel(
2553 stream->chan->uchan,
2554 &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
2555 stream->chan->metadata_cache->contents.size - stream->ust_metadata_pushed);
2556 LTTNG_ASSERT(write_len != 0);
2557 if (write_len < 0) {
2558 ERR("Writing one metadata packet");
2559 ret = write_len;
2560 goto end;
2561 }
2562 stream->ust_metadata_pushed += write_len;
2563
2564 LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
2565 ret = write_len;
2566
2567 /*
2568 * Switch packet (but don't open the next one) on every commit of
2569 * a metadata packet. Since the subbuffer is fully filled (with padding,
2570 * if needed), the stream is "quiescent" after this commit.
2571 */
2572 if (lttng_ust_ctl_flush_buffer(stream->ustream, 1)) {
2573 ERR("Failed to flush buffer while committing one metadata packet");
2574 ret = -EIO;
2575 } else {
2576 stream->quiescent = true;
2577 }
2578 end:
2579 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
2580 return ret;
2581 }
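
/*
 * A caller holding the stream lock can drain the metadata cache into the ring
 * buffer by looping on this function until it returns 0 (cache fully pushed)
 * or a negative error; -ENOBUFS in particular means the ring buffer is full,
 * as handled by get_next_subbuffer_metadata() below. A minimal sketch:
 *
 *	int pushed;
 *
 *	do {
 *		pushed = commit_one_metadata_packet(stream);
 *	} while (pushed > 0);
 */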
2582
2583 /*
2584 * Sync metadata: request it from the session daemon and snapshot the stream
2585 * positions so that the metadata thread can consume it.
2586 *
2587 * The metadata stream lock is held here, but we need to release it when
2588 * interacting with the sessiond, else we cause a deadlock with a live
2589 * client awaiting metadata to be pushed out.
2590 *
2591 * The RCU read side lock must be held by the caller.
2592 */
2593 enum sync_metadata_status
2594 lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
2595 struct lttng_consumer_stream *metadata_stream)
2596 {
2597 int ret;
2598 enum sync_metadata_status status;
2599 struct lttng_consumer_channel *metadata_channel;
2600
2601 LTTNG_ASSERT(ctx);
2602 LTTNG_ASSERT(metadata_stream);
2603 ASSERT_RCU_READ_LOCKED();
2604
2605 metadata_channel = metadata_stream->chan;
2606 pthread_mutex_unlock(&metadata_stream->lock);
2607 /*
2608 * Request metadata from the sessiond, but don't wait for the flush
2609 * because we locked the metadata thread.
2610 */
2611 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
2612 pthread_mutex_lock(&metadata_stream->lock);
2613 if (ret < 0) {
2614 status = SYNC_METADATA_STATUS_ERROR;
2615 goto end;
2616 }
2617
2618 /*
2619 * The metadata stream and channel can be deleted while the
2620 * metadata stream lock was released. The stream is checked
2621 * for deletion before we use it further.
2622 *
2623 * Note that it is safe to access a logically-deleted stream since its
2624 * existence is still guaranteed by the RCU read side lock. However,
2625 * it should no longer be used. The close/deletion of the metadata
2626 * channel and stream already guarantees that all metadata has been
2627 * consumed. Therefore, there is nothing left to do in this function.
2628 */
2629 if (consumer_stream_is_deleted(metadata_stream)) {
2630 DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
2631 metadata_stream->key);
2632 status = SYNC_METADATA_STATUS_NO_DATA;
2633 goto end;
2634 }
2635
2636 ret = commit_one_metadata_packet(metadata_stream);
2637 if (ret < 0) {
2638 status = SYNC_METADATA_STATUS_ERROR;
2639 goto end;
2640 } else if (ret > 0) {
2641 status = SYNC_METADATA_STATUS_NEW_DATA;
2642 } else /* ret == 0 */ {
2643 status = SYNC_METADATA_STATUS_NO_DATA;
2644 goto end;
2645 }
2646
2647 ret = lttng_ust_ctl_snapshot(metadata_stream->ustream);
2648 if (ret < 0) {
2649 ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d",
2650 ret);
2651 status = SYNC_METADATA_STATUS_ERROR;
2652 goto end;
2653 }
2654
2655 end:
2656 return status;
2657 }
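
/*
 * Expected caller pattern (a sketch; the surrounding locking is the caller's
 * responsibility per the comment above): take the RCU read-side lock and the
 * metadata stream lock, then dispatch on the returned status. A
 * SYNC_METADATA_STATUS_NEW_DATA result means a packet was committed to the
 * ring buffer and can now be read from the metadata stream.
 *
 *	enum sync_metadata_status status;
 *
 *	rcu_read_lock();
 *	pthread_mutex_lock(&metadata_stream->lock);
 *	status = lttng_ustconsumer_sync_metadata(ctx, metadata_stream);
 *	pthread_mutex_unlock(&metadata_stream->lock);
 *	rcu_read_unlock();
 */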
2658
2659 /*
2660 * Return 0 on success else a negative value.
2661 */
2662 static int notify_if_more_data(struct lttng_consumer_stream *stream,
2663 struct lttng_consumer_local_data *ctx)
2664 {
2665 int ret;
2666 struct lttng_ust_ctl_consumer_stream *ustream;
2667
2668 LTTNG_ASSERT(stream);
2669 LTTNG_ASSERT(ctx);
2670
2671 ustream = stream->ustream;
2672
2673 /*
2674 * First, check whether a new subbuffer is available before reading
2675 * the stream wait_fd.
2676 */
2677 /* Try to get the next subbuffer. */
2678 ret = lttng_ust_ctl_get_next_subbuf(ustream);
2679 if (ret) {
2680 /* No more data found, flag the stream. */
2681 stream->has_data = 0;
2682 ret = 0;
2683 goto end;
2684 }
2685
2686 ret = lttng_ust_ctl_put_subbuf(ustream);
2687 LTTNG_ASSERT(!ret);
2688
2689 /* This stream still has data. Flag it and wake up the data thread. */
2690 stream->has_data = 1;
2691
2692 if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
2693 ssize_t writelen;
2694
2695 writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
2696 if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2697 ret = writelen;
2698 goto end;
2699 }
2700
2701 /* The wake up pipe has been notified. */
2702 ctx->has_wakeup = 1;
2703 }
2704 ret = 0;
2705
2706 end:
2707 return ret;
2708 }
2709
2710 static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream)
2711 {
2712 int ret = 0;
2713
2714 /*
2715 * We can consume the 1 byte written into the wait_fd by
2716 * UST. Don't trigger an error if we cannot read this one byte
2717 * (read returns 0), or if the error is EAGAIN or EWOULDBLOCK.
2718 *
2719 * This is only done when the stream is monitored by a thread,
2720 * before the flush is done after a hangup and if the stream
2721 * is not flagged with data since there might be nothing to
2722 * consume in the wait fd but still have data available
2723 * flagged by the consumer wake up pipe.
2724 */
2725 if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
2726 char dummy;
2727 ssize_t readlen;
2728
2729 readlen = lttng_read(stream->wait_fd, &dummy, 1);
2730 if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2731 ret = readlen;
2732 }
2733 }
2734
2735 return ret;
2736 }
2737
2738 static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
2739 struct stream_subbuffer *subbuf)
2740 {
2741 int ret;
2742
2743 ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &subbuf->info.data.subbuf_size);
2744 if (ret) {
2745 goto end;
2746 }
2747
2748 ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream,
2749 &subbuf->info.data.padded_subbuf_size);
2750 if (ret) {
2751 goto end;
2752 }
2753
2754 end:
2755 return ret;
2756 }
2757
2758 static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
2759 struct stream_subbuffer *subbuf)
2760 {
2761 int ret;
2762
2763 ret = extract_common_subbuffer_info(stream, subbuf);
2764 if (ret) {
2765 goto end;
2766 }
2767
2768 subbuf->info.metadata.version = stream->metadata_version;
2769
2770 end:
2771 return ret;
2772 }
2773
2774 static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
2775 struct stream_subbuffer *subbuf)
2776 {
2777 int ret;
2778
2779 ret = extract_common_subbuffer_info(stream, subbuf);
2780 if (ret) {
2781 goto end;
2782 }
2783
2784 ret = lttng_ust_ctl_get_packet_size(stream->ustream, &subbuf->info.data.packet_size);
2785 if (ret < 0) {
2786 PERROR("Failed to get sub-buffer packet size");
2787 goto end;
2788 }
2789
2790 ret = lttng_ust_ctl_get_content_size(stream->ustream, &subbuf->info.data.content_size);
2791 if (ret < 0) {
2792 PERROR("Failed to get sub-buffer content size");
2793 goto end;
2794 }
2795
2796 ret = lttng_ust_ctl_get_timestamp_begin(stream->ustream,
2797 &subbuf->info.data.timestamp_begin);
2798 if (ret < 0) {
2799 PERROR("Failed to get sub-buffer begin timestamp");
2800 goto end;
2801 }
2802
2803 ret = lttng_ust_ctl_get_timestamp_end(stream->ustream, &subbuf->info.data.timestamp_end);
2804 if (ret < 0) {
2805 PERROR("Failed to get sub-buffer end timestamp");
2806 goto end;
2807 }
2808
2809 ret = lttng_ust_ctl_get_events_discarded(stream->ustream,
2810 &subbuf->info.data.events_discarded);
2811 if (ret) {
2812 PERROR("Failed to get sub-buffer events discarded count");
2813 goto end;
2814 }
2815
2816 ret = lttng_ust_ctl_get_sequence_number(stream->ustream,
2817 &subbuf->info.data.sequence_number.value);
2818 if (ret) {
2819 /* May not be supported by older tracer versions. */
2820 if (ret != -ENOTTY) {
2821 PERROR("Failed to get sub-buffer sequence number");
2822 goto end;
2823 }
2824 } else {
2825 subbuf->info.data.sequence_number.is_set = true;
2826 }
2827
2828 ret = lttng_ust_ctl_get_stream_id(stream->ustream, &subbuf->info.data.stream_id);
2829 if (ret < 0) {
2830 PERROR("Failed to get stream id");
2831 goto end;
2832 }
2833
2834 ret = lttng_ust_ctl_get_instance_id(stream->ustream,
2835 &subbuf->info.data.stream_instance_id.value);
2836 if (ret) {
2837 /* May not be supported by older tracer versions. */
2838 if (ret != -ENOTTY) {
2839 PERROR("Failed to get stream instance id");
2840 goto end;
2841 }
2842 } else {
2843 subbuf->info.data.stream_instance_id.is_set = true;
2844 }
2845 end:
2846 return ret;
2847 }
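
/*
 * sequence_number and stream_instance_id are optional: older tracers answer
 * -ENOTTY for those getters and is_set then stays false. Downstream users of
 * the sub-buffer info must check the flag before reading the value, e.g.
 * (record_sequence_number() is a hypothetical consumer):
 *
 *	if (subbuf->info.data.sequence_number.is_set) {
 *		record_sequence_number(subbuf->info.data.sequence_number.value);
 *	}
 */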
2848
2849 static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
2850 struct stream_subbuffer *subbuffer)
2851 {
2852 int ret;
2853 const char *addr;
2854
2855 ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
2856 if (ret) {
2857 goto end;
2858 }
2859
2860 ret = get_current_subbuf_addr(stream, &addr);
2861 if (ret) {
2862 goto end;
2863 }
2864
2865 subbuffer->buffer.buffer =
2866 lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
2867 LTTNG_ASSERT(subbuffer->buffer.buffer.data != nullptr);
2868 end:
2869 return ret;
2870 }
2871
2872 static enum get_next_subbuffer_status get_next_subbuffer(struct lttng_consumer_stream *stream,
2873 struct stream_subbuffer *subbuffer)
2874 {
2875 int ret;
2876 enum get_next_subbuffer_status status;
2877
2878 ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
2879 switch (ret) {
2880 case 0:
2881 status = GET_NEXT_SUBBUFFER_STATUS_OK;
2882 break;
2883 case -ENODATA:
2884 case -EAGAIN:
2885 /*
2886 * The caller only expects -ENODATA when there is no data to
2887 * read, but the tracer returns -EAGAIN when there is
2888 * currently no data for a non-finalized stream, and -ENODATA
2889 * when there is no data for a finalized stream. Those can be
2890 * combined into a -ENODATA return value.
2891 */
2892 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
2893 goto end;
2894 default:
2895 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2896 goto end;
2897 }
2898
2899 ret = get_next_subbuffer_common(stream, subbuffer);
2900 if (ret) {
2901 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2902 goto end;
2903 }
2904 end:
2905 return status;
2906 }
2907
2908 static enum get_next_subbuffer_status
2909 get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
2910 struct stream_subbuffer *subbuffer)
2911 {
2912 int ret;
2913 bool cache_empty;
2914 bool got_subbuffer;
2915 bool coherent;
2916 bool buffer_empty;
2917 unsigned long consumed_pos, produced_pos;
2918 enum get_next_subbuffer_status status;
2919
2920 do {
2921 ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
2922 if (ret == 0) {
2923 got_subbuffer = true;
2924 } else {
2925 got_subbuffer = false;
2926 if (ret != -EAGAIN) {
2927 /* Fatal error. */
2928 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2929 goto end;
2930 }
2931 }
2932
2933 /*
2934 * Determine if the cache is empty and ensure that a sub-buffer
2935 * is made available if the cache is not empty.
2936 */
2937 if (!got_subbuffer) {
2938 ret = commit_one_metadata_packet(stream);
2939 if (ret < 0 && ret != -ENOBUFS) {
2940 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2941 goto end;
2942 } else if (ret == 0) {
2943 /* Not an error, the cache is empty. */
2944 cache_empty = true;
2945 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
2946 goto end;
2947 } else {
2948 cache_empty = false;
2949 }
2950 } else {
2951 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
2952 cache_empty = stream->chan->metadata_cache->contents.size ==
2953 stream->ust_metadata_pushed;
2954 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
2955 }
2956 } while (!got_subbuffer);
2957
2958 /* Populate sub-buffer infos and view. */
2959 ret = get_next_subbuffer_common(stream, subbuffer);
2960 if (ret) {
2961 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2962 goto end;
2963 }
2964
2965 ret = lttng_ustconsumer_sample_snapshot_positions(stream);
2966 if (ret < 0) {
2967 /*
2968 * -EAGAIN is not expected since we got a sub-buffer and haven't
2969 * pushed the consumption position yet (on put_next).
2970 */
2971 PERROR("Failed to take a snapshot of metadata buffer positions");
2972 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2973 goto end;
2974 }
2975
2976 ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
2977 if (ret) {
2978 PERROR("Failed to get metadata consumed position");
2979 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2980 goto end;
2981 }
2982
2983 ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
2984 if (ret) {
2985 PERROR("Failed to get metadata produced position");
2986 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
2987 goto end;
2988 }
2989
2990 /* Last sub-buffer of the ring buffer? */
2991 buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos;
2992
2993 /*
2994 * The sessiond registry lock ensures that coherent units of metadata
2995 * are pushed to the consumer daemon at once. Hence, if a sub-buffer is
2996 * acquired, the cache is empty, and it is the only sub-buffer
2997 * available, it is safe to assume that it is "coherent".
2998 */
2999 coherent = got_subbuffer && cache_empty && buffer_empty;
3000
3001 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
3002 status = GET_NEXT_SUBBUFFER_STATUS_OK;
3003 end:
3004 return status;
3005 }
3006
3007 static int put_next_subbuffer(struct lttng_consumer_stream *stream,
3008 struct stream_subbuffer *subbuffer __attribute__((unused)))
3009 {
3010 const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream);
3011
3012 LTTNG_ASSERT(ret == 0);
3013 return ret;
3014 }
3015
3016 static int signal_metadata(struct lttng_consumer_stream *stream,
3017 struct lttng_consumer_local_data *ctx __attribute__((unused)))
3018 {
3019 ASSERT_LOCKED(stream->metadata_rdv_lock);
3020 	return -pthread_cond_broadcast(&stream->metadata_rdv); /* Returns the error directly, not via errno. */
3021 }
3022
3023 static int lttng_ustconsumer_set_stream_ops(struct lttng_consumer_stream *stream)
3024 {
3025 int ret = 0;
3026
3027 stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
3028 if (stream->metadata_flag) {
3029 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_metadata;
3030 stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info;
3031 stream->read_subbuffer_ops.reset_metadata =
3032 metadata_stream_reset_cache_consumed_position;
3033 if (stream->chan->is_live) {
3034 stream->read_subbuffer_ops.on_sleep = signal_metadata;
3035 ret = consumer_stream_enable_metadata_bucketization(stream);
3036 if (ret) {
3037 goto end;
3038 }
3039 }
3040 } else {
3041 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer;
3042 stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info;
3043 stream->read_subbuffer_ops.on_sleep = notify_if_more_data;
3044 if (stream->chan->is_live) {
3045 stream->read_subbuffer_ops.send_live_beacon = consumer_flush_ust_index;
3046 }
3047 }
3048
3049 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
3050 end:
3051 return ret;
3052 }
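
/*
 * read_subbuffer_ops is effectively a per-stream strategy table; the generic
 * read loop presumably drives it along these lines (a sketch only;
 * consume_subbuffer() stands in for the actual data/relayd output path):
 *
 *	struct stream_subbuffer subbuf = {};
 *
 *	if (stream->read_subbuffer_ops.on_wake_up) {
 *		stream->read_subbuffer_ops.on_wake_up(stream);
 *	}
 *	if (stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuf) ==
 *	    GET_NEXT_SUBBUFFER_STATUS_OK) {
 *		consume_subbuffer(&subbuf);
 *		stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuf);
 *	}
 *	if (stream->read_subbuffer_ops.on_sleep) {
 *		stream->read_subbuffer_ops.on_sleep(stream, ctx);
 *	}
 */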
3053
3054 /*
3055 * Called when a stream is created.
3056 *
3057 * Return 0 on success or else a negative value.
3058 */
3059 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
3060 {
3061 int ret;
3062
3063 LTTNG_ASSERT(stream);
3064
3065 /*
3066 * Don't create anything if this is set for streaming or if there is
3067 * no current trace chunk on the parent channel.
3068 */
3069 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
3070 stream->chan->trace_chunk) {
3071 ret = consumer_stream_create_output_files(stream, true);
3072 if (ret) {
3073 goto error;
3074 }
3075 }
3076
3077 lttng_ustconsumer_set_stream_ops(stream);
3078 ret = 0;
3079
3080 error:
3081 return ret;
3082 }
3083
3084 /*
3085 * Check if data is still being extracted from the buffers for a specific
3086 * stream. Consumer data lock MUST be acquired before calling this function
3087 * and the stream lock.
3088 *
3089 * Return 1 if the traced data is still being extracted, else 0, meaning that
3090 * the data is available for trace viewer reading.
3091 */
3092 int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
3093 {
3094 int ret;
3095
3096 LTTNG_ASSERT(stream);
3097 LTTNG_ASSERT(stream->ustream);
3098 ASSERT_LOCKED(stream->lock);
3099
3100 DBG("UST consumer checking data pending");
3101
3102 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
3103 ret = 0;
3104 goto end;
3105 }
3106
3107 if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
3108 uint64_t contiguous, pushed;
3109
3110 /* Ease our life a bit. */
3111 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
3112 contiguous = stream->chan->metadata_cache->contents.size;
3113 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
3114 pushed = stream->ust_metadata_pushed;
3115
3116 /*
3117 * We can simply check whether all contiguously available data
3118 * has been pushed to the ring buffer, since the push operation
3119 * is performed within get_next_subbuf(), and because both
3120 * get_next_subbuf() and put_next_subbuf() are issued atomically
3121 * thanks to the stream lock within
3122 * lttng_ustconsumer_read_subbuffer(). This basically means that
3123 * whenever ust_metadata_pushed is incremented, the associated
3124 * metadata has been consumed from the metadata stream.
3125 */
3126 DBG("UST consumer metadata pending check: contiguous %" PRIu64
3127 " vs pushed %" PRIu64,
3128 contiguous,
3129 pushed);
3130 LTTNG_ASSERT(((int64_t) (contiguous - pushed)) >= 0);
3131 if ((contiguous != pushed) ||
3132 (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
3133 ret = 1; /* Data is pending */
3134 goto end;
3135 }
3136 } else {
3137 ret = lttng_ust_ctl_get_next_subbuf(stream->ustream);
3138 if (ret == 0) {
3139 /*
3140 * There is still data so let's put back this
3141 * subbuffer.
3142 */
3143 ret = lttng_ust_ctl_put_subbuf(stream->ustream);
3144 LTTNG_ASSERT(ret == 0);
3145 ret = 1; /* Data is pending */
3146 goto end;
3147 }
3148 }
3149
3150 /* Data is NOT pending so ready to be read. */
3151 ret = 0;
3152
3153 end:
3154 return ret;
3155 }
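
/*
 * The locking preconditions above translate to the following caller-side
 * pattern (a sketch), matching the lock order used elsewhere in this file:
 *
 *	int data_pending;
 *
 *	pthread_mutex_lock(&the_consumer_data.lock);
 *	pthread_mutex_lock(&stream->lock);
 *	data_pending = lttng_ustconsumer_data_pending(stream);
 *	pthread_mutex_unlock(&stream->lock);
 *	pthread_mutex_unlock(&the_consumer_data.lock);
 */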
3156
3157 /*
3158 * Stop a given metadata channel timer if enabled and close the wait fd which
3159 * is the poll pipe of the metadata stream.
3160 *
3161 * This MUST be called with the metadata channel lock acquired.
3162 */
3163 void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
3164 {
3165 int ret;
3166
3167 LTTNG_ASSERT(metadata);
3168 LTTNG_ASSERT(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
3169
3170 DBG("Closing metadata channel key %" PRIu64, metadata->key);
3171
3172 if (metadata->switch_timer_enabled == 1) {
3173 consumer_timer_switch_stop(metadata);
3174 }
3175
3176 if (!metadata->metadata_stream) {
3177 goto end;
3178 }
3179
3180 /*
3181 * Close the write side so that the thread monitoring the stream, if any,
3182 * wakes up and cleans up the metadata stream.
3183 */
3184 if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
3185 ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
3186 if (ret < 0) {
3187 PERROR("closing metadata pipe write side");
3188 }
3189 metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
3190 }
3191
3192 end:
3193 return;
3194 }
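
/*
 * Closing the write end is what wakes the metadata poll thread: its poll set
 * contains the pipe's read end, which reports POLLHUP once no writer remains.
 * A sketch of the observing side (both identifiers are hypothetical):
 *
 *	struct pollfd pfd;
 *
 *	pfd.fd = metadata_poll_pipe_read_end;
 *	pfd.events = POLLIN;
 *	if (poll(&pfd, 1, -1) > 0 && (pfd.revents & (POLLHUP | POLLERR))) {
 *		teardown_metadata_stream();
 *	}
 */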
3195
3196 /*
3197 * Close every metadata stream wait fd of the metadata hash table. This
3198 * function MUST be used very carefully so as not to run into a race between the
3199 * metadata thread handling streams and this function closing their wait fd.
3200 *
3201 * For UST, this is used when the session daemon hangs up. It is the metadata
3202 * producer, so calling this is safe because we are assured that no state change
3203 * can occur in the metadata thread for the streams in the hash table.
3204 */
3205 void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
3206 {
3207 struct lttng_ht_iter iter;
3208 struct lttng_consumer_stream *stream;
3209
3210 LTTNG_ASSERT(metadata_ht);
3211 LTTNG_ASSERT(metadata_ht->ht);
3212
3213 DBG("UST consumer closing all metadata streams");
3214
3215 rcu_read_lock();
3216 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
3217 health_code_update();
3218
3219 pthread_mutex_lock(&stream->chan->lock);
3220 lttng_ustconsumer_close_metadata(stream->chan);
3221 pthread_mutex_unlock(&stream->chan->lock);
3222 }
3223 rcu_read_unlock();
3224 }
3225
3226 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
3227 {
3228 int ret;
3229
3230 ret = lttng_ust_ctl_stream_close_wakeup_fd(stream->ustream);
3231 if (ret < 0) {
3232 ERR("Unable to close wakeup fd");
3233 }
3234 }
3235
3236 /*
3237 * Please refer to consumer-timer.c before adding any lock within this
3238 * function or any of its callees. Timers have a very strict locking
3239 * semantic with respect to teardown. Failure to respect this semantic
3240 * introduces deadlocks.
3241 *
3242 * DON'T hold the metadata lock when calling this function, else this
3243 * can cause deadlock involving consumer awaiting for metadata to be
3244 * pushed out due to concurrent interaction with the session daemon.
3245 */
3246 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
3247 struct lttng_consumer_channel *channel,
3248 int timer,
3249 int wait)
3250 {
3251 struct lttcomm_metadata_request_msg request;
3252 struct lttcomm_consumer_msg msg;
3253 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3254 uint64_t len, key, offset, version;
3255 int ret;
3256
3257 LTTNG_ASSERT(channel);
3258 LTTNG_ASSERT(channel->metadata_cache);
3259
3260 memset(&request, 0, sizeof(request));
3261
3262 /* send the metadata request to sessiond */
3263 switch (the_consumer_data.type) {
3264 case LTTNG_CONSUMER64_UST:
3265 request.bits_per_long = 64;
3266 break;
3267 case LTTNG_CONSUMER32_UST:
3268 request.bits_per_long = 32;
3269 break;
3270 default:
3271 request.bits_per_long = 0;
3272 break;
3273 }
3274
3275 request.session_id = channel->session_id;
3276 request.session_id_per_pid = channel->session_id_per_pid;
3277 /*
3278 * Request the application UID here so the metadata of that application can
3279 * be sent back. The channel UID corresponds to the user UID of the session
3280 * used for the rights on the stream file(s).
3281 */
3282 request.uid = channel->ust_app_uid;
3283 request.key = channel->key;
3284
3285 DBG("Sending metadata request to sessiond, session id %" PRIu64 ", per-pid %" PRIu64
3286 ", app UID %u and channel key %" PRIu64,
3287 request.session_id,
3288 request.session_id_per_pid,
3289 request.uid,
3290 request.key);
3291
3292 pthread_mutex_lock(&ctx->metadata_socket_lock);
3293
3294 health_code_update();
3295
3296 ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request));
3297 if (ret < 0) {
3298 ERR("Asking metadata to sessiond");
3299 goto end;
3300 }
3301
3302 health_code_update();
3303
3304 /* Receive the metadata from sessiond */
3305 ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg));
3306 if (ret != sizeof(msg)) {
3307 DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg));
3308 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3309 /*
3310 * The ret value might be 0, meaning an orderly shutdown, but this is ok
3311 * since the caller handles this.
3312 */
3313 goto end;
3314 }
3315
3316 health_code_update();
3317
3318 if (msg.cmd_type == LTTNG_ERR_UND) {
3319 /* No registry found */
3320 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
3321 ret = 0;
3322 goto end;
3323 } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
3324 ERR("Unexpected cmd_type received %d", msg.cmd_type);
3325 ret = -1;
3326 goto end;
3327 }
3328
3329 len = msg.u.push_metadata.len;
3330 key = msg.u.push_metadata.key;
3331 offset = msg.u.push_metadata.target_offset;
3332 version = msg.u.push_metadata.version;
3333
3334 LTTNG_ASSERT(key == channel->key);
3335 if (len == 0) {
3336 DBG("No new metadata to receive for key %" PRIu64, key);
3337 }
3338
3339 health_code_update();
3340
3341 /* Tell session daemon we are ready to receive the metadata. */
3342 ret = consumer_send_status_msg(ctx->consumer_metadata_socket, LTTCOMM_CONSUMERD_SUCCESS);
3343 if (ret < 0 || len == 0) {
3344 /*
3345 * Somehow, the session daemon is not responding anymore or there is
3346 * nothing to receive.
3347 */
3348 goto end;
3349 }
3350
3351 health_code_update();
3352
3353 ret = lttng_ustconsumer_recv_metadata(
3354 ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
3355 if (ret >= 0) {
3356 /*
3357 * Only send the status msg if the sessiond is alive, meaning a positive
3358 * ret code.
3359 */
3360 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
3361 }
3362 ret = 0;
3363
3364 end:
3365 health_code_update();
3366
3367 pthread_mutex_unlock(&ctx->metadata_socket_lock);
3368 return ret;
3369 }
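
/*
 * Wire exchange performed by the function above, in order, all under
 * metadata_socket_lock (consumerd <-> sessiond on the metadata socket):
 *
 *	1. send lttcomm_metadata_request_msg (bits per long, session ids,
 *	   app UID and channel key);
 *	2. recv lttcomm_consumer_msg, either LTTNG_CONSUMER_PUSH_METADATA
 *	   (carrying len, key, target offset and version) or LTTNG_ERR_UND
 *	   when no registry exists;
 *	3. send LTTCOMM_CONSUMERD_SUCCESS to signal readiness for the payload;
 *	4. recv the payload through lttng_ustconsumer_recv_metadata();
 *	5. send a final status reflecting the reception's outcome.
 *
 * Steps 4 and 5 are skipped when the announced len is 0.
 */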
3370
3371 /*
3372 * Wrapper returning the result of the ustctl call getting the stream id.
3373 */
3374 int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id)
3375 {
3376 LTTNG_ASSERT(stream);
3377 LTTNG_ASSERT(stream_id);
3378
3379 return lttng_ust_ctl_get_stream_id(stream->ustream, stream_id);
3380 }
3381
3382 void lttng_ustconsumer_sigbus_handle(void *addr)
3383 {
3384 lttng_ust_ctl_sigbus_handle(addr);
3385 }