Fix: syscall event rule: emission sites not compared in is_equal
[lttng-tools.git] / src / common / consumer / consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <common/align.hpp>
12 #include <common/common.hpp>
13 #include <common/compat/endian.hpp>
14 #include <common/compat/poll.hpp>
15 #include <common/consumer/consumer-metadata-cache.hpp>
16 #include <common/consumer/consumer-stream.hpp>
17 #include <common/consumer/consumer-testpoint.hpp>
18 #include <common/consumer/consumer-timer.hpp>
19 #include <common/consumer/consumer.hpp>
20 #include <common/dynamic-array.hpp>
21 #include <common/index/ctf-index.hpp>
22 #include <common/index/index.hpp>
23 #include <common/io-hint.hpp>
24 #include <common/kernel-consumer/kernel-consumer.hpp>
25 #include <common/kernel-ctl/kernel-ctl.hpp>
26 #include <common/relayd/relayd.hpp>
27 #include <common/sessiond-comm/relayd.hpp>
28 #include <common/sessiond-comm/sessiond-comm.hpp>
29 #include <common/string-utils/format.hpp>
30 #include <common/time.hpp>
31 #include <common/trace-chunk-registry.hpp>
32 #include <common/trace-chunk.hpp>
33 #include <common/urcu.hpp>
34 #include <common/ust-consumer/ust-consumer.hpp>
35 #include <common/utils.hpp>
36
37 #include <bin/lttng-consumerd/health-consumerd.hpp>
38 #include <fcntl.h>
39 #include <inttypes.h>
40 #include <poll.h>
41 #include <pthread.h>
42 #include <signal.h>
43 #include <stdlib.h>
44 #include <string.h>
45 #include <sys/mman.h>
46 #include <sys/socket.h>
47 #include <sys/types.h>
48 #include <type_traits>
49 #include <unistd.h>
50
51 lttng_consumer_global_data the_consumer_data;
52
/* Actions that can be posted on the channel management pipe. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD, /* Register a new channel with the channel thread. */
	CONSUMER_CHANNEL_DEL, /* Delete the channel matching the message key. */
	CONSUMER_CHANNEL_QUIT, /* Ask the channel thread to exit. */
};
58
namespace {
/*
 * Message exchanged over the channel management pipe. Which fields are
 * meaningful depends on 'action' (see the per-field notes).
 */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan; /* add */
	uint64_t key; /* del */
};

/*
 * Global hash table containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll thread
 * for the metadata and the data poll thread for the data.
 */
struct lttng_ht *metadata_ht;
struct lttng_ht *data_ht;
} /* namespace */
74
/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;

/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds have hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
int consumer_quit;
85
86 static const char *get_consumer_domain()
87 {
88 switch (the_consumer_data.type) {
89 case LTTNG_CONSUMER_KERNEL:
90 return DEFAULT_KERNEL_TRACE_DIR;
91 case LTTNG_CONSUMER64_UST:
92 /* Fall-through. */
93 case LTTNG_CONSUMER32_UST:
94 return DEFAULT_UST_TRACE_DIR;
95 default:
96 abort();
97 }
98 }
99
100 /*
101 * Notify a thread lttng pipe to poll back again. This usually means that some
102 * global state has changed so we just send back the thread in a poll wait
103 * call.
104 */
105 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
106 {
107 struct lttng_consumer_stream *null_stream = nullptr;
108
109 LTTNG_ASSERT(pipe);
110
111 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
112 pointer. */
113 }
114
115 static void notify_health_quit_pipe(int *pipe)
116 {
117 ssize_t ret;
118
119 ret = lttng_write(pipe[1], "4", 1);
120 if (ret < 1) {
121 PERROR("write consumer health quit");
122 }
123 }
124
125 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
126 struct lttng_consumer_channel *chan,
127 uint64_t key,
128 enum consumer_channel_action action)
129 {
130 struct consumer_channel_msg msg;
131 ssize_t ret;
132
133 memset(&msg, 0, sizeof(msg));
134
135 msg.action = action;
136 msg.chan = chan;
137 msg.key = key;
138 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
139 if (ret < sizeof(msg)) {
140 PERROR("notify_channel_pipe write error");
141 }
142 }
143
/* Ask the channel thread to delete the channel identified by 'key'. */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
}
148
149 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
150 struct lttng_consumer_channel **chan,
151 uint64_t *key,
152 enum consumer_channel_action *action)
153 {
154 struct consumer_channel_msg msg;
155 ssize_t ret;
156
157 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
158 if (ret < sizeof(msg)) {
159 ret = -1;
160 goto error;
161 }
162 *action = msg.action;
163 *chan = msg.chan;
164 *key = msg.key;
165 error:
166 return (int) ret;
167 }
168
/*
 * Cleanup the stream list of a channel. Those streams are not yet globally
 * visible, so no hash table bookkeeping is needed (destroy is called with a
 * null ht).
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	LTTNG_ASSERT(channel);

	/*
	 * Delete streams that might have been left in the stream list. The
	 * _safe iteration variant is required since each pass destroys the
	 * current node.
	 */
	cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
		consumer_stream_destroy(stream, nullptr);
	}
}
184
185 /*
186 * Find a stream. The consumer_data.lock must be locked during this
187 * call.
188 */
189 static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
190 {
191 struct lttng_ht_iter iter;
192 struct lttng_ht_node_u64 *node;
193 struct lttng_consumer_stream *stream = nullptr;
194
195 LTTNG_ASSERT(ht);
196
197 /* -1ULL keys are lookup failures */
198 if (key == (uint64_t) -1ULL) {
199 return nullptr;
200 }
201
202 lttng::urcu::read_lock_guard read_lock;
203
204 lttng_ht_lookup(ht, &key, &iter);
205 node = lttng_ht_iter_get_node_u64(&iter);
206 if (node != nullptr) {
207 stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
208 }
209
210 return stream;
211 }
212
213 static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
214 {
215 struct lttng_consumer_stream *stream;
216
217 lttng::urcu::read_lock_guard read_lock;
218 stream = find_stream(key, ht);
219 if (stream) {
220 stream->key = (uint64_t) -1ULL;
221 /*
222 * We don't want the lookup to match, but we still need
223 * to iterate on this stream when iterating over the hash table. Just
224 * change the node key.
225 */
226 stream->node.key = (uint64_t) -1ULL;
227 }
228 }
229
230 /*
231 * Return a channel object for the given key.
232 *
233 * RCU read side lock MUST be acquired before calling this function and
234 * protects the channel ptr.
235 */
236 struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
237 {
238 struct lttng_ht_iter iter;
239 struct lttng_ht_node_u64 *node;
240 struct lttng_consumer_channel *channel = nullptr;
241
242 ASSERT_RCU_READ_LOCKED();
243
244 /* -1ULL keys are lookup failures */
245 if (key == (uint64_t) -1ULL) {
246 return nullptr;
247 }
248
249 lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
250 node = lttng_ht_iter_get_node_u64(&iter);
251 if (node != nullptr) {
252 channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
253 }
254
255 return channel;
256 }
257
258 /*
259 * There is a possibility that the consumer does not have enough time between
260 * the close of the channel on the session daemon and the cleanup in here thus
261 * once we have a channel add with an existing key, we know for sure that this
262 * channel will eventually get cleaned up by all streams being closed.
263 *
264 * This function just nullifies the already existing channel key.
265 */
266 static void steal_channel_key(uint64_t key)
267 {
268 struct lttng_consumer_channel *channel;
269
270 lttng::urcu::read_lock_guard read_lock;
271 channel = consumer_find_channel(key);
272 if (channel) {
273 channel->key = (uint64_t) -1ULL;
274 /*
275 * We don't want the lookup to match, but we still need to iterate on
276 * this channel when iterating over the hash table. Just change the
277 * node key.
278 */
279 channel->node.key = (uint64_t) -1ULL;
280 }
281 }
282
/*
 * RCU callback releasing a channel once no RCU reader can still hold a
 * reference to it. Scheduled through call_rcu() by consumer_del_channel().
 */
static void free_channel_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct lttng_consumer_channel *channel =
		lttng::utils::container_of(node, &lttng_consumer_channel::node);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/* No domain-specific resources to release for the kernel. */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_free_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

	delete channel;
}
303
/*
 * RCU protected relayd socket pair free. Scheduled through call_rcu() by
 * consumer_destroy_relayd().
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct consumer_relayd_sock_pair *relayd =
		lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
327
/*
 * Destroy and free relayd socket pair object: unpublish it from the relayd
 * hash table and defer socket closing/freeing to an RCU callback. Safe to
 * call with nullptr or on an object already being destroyed.
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == nullptr) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
352
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures.
 *
 * Lock order: the_consumer_data.lock is taken before channel->lock; both are
 * held for the whole teardown.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	/*
	 * Send a last buffer statistics sample to the session daemon
	 * to ensure it tracks the amount of data consumed by this channel.
	 */
	sample_and_send_channel_buffer_stats(channel);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
		/* Unreachable after abort(); kept as a defensive jump. */
		goto end;
	}

	/* Release the channel's reference on its current trace chunk. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = nullptr;

	if (channel->is_published) {
		int ret;

		/* Unpublish from both lookup tables under the RCU read lock. */
		lttng::urcu::read_lock_guard read_lock;
		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
		LTTNG_ASSERT(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
		LTTNG_ASSERT(!ret);
	}

	/* Actual freeing is deferred until all RCU readers are done. */
	channel->is_deleted = true;
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
417
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table. Used on consumer teardown.
 */
static void cleanup_relayd_ht()
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	{
		/* Scope the read lock: it must be released before ht destroy. */
		lttng::urcu::read_lock_guard read_lock;

		cds_lfht_for_each_entry (
			the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
			consumer_destroy_relayd(relayd);
		}
	}

	lttng_ht_destroy(the_consumer_data.relayd_ht);
}
438
/*
 * Update the end point status of all streams having the given network sequence
 * index (relayd index).
 *
 * It's atomically set without having the stream mutex locked which is fine
 * because we handle the write/read race with a pipe wakeup for each thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
					     enum consumer_endpoint_status status)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

	lttng::urcu::read_lock_guard read_lock;

	/* Let's begin with metadata */
	cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			/*
			 * Wake waiters blocked on metadata consumption so they
			 * can observe the new endpoint status.
			 */
			stream->chan->metadata_pushed_wait_queue.wake_all();

			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
		}
	}

	/* Follow up by the data streams */
	cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to data stream %d", stream->wait_fd);
		}
	}
}
474
/*
 * Cleanup a relayd object by flagging every associated streams for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
	uint64_t netidx;

	LTTNG_ASSERT(relayd);

	DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
	notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
512
/*
 * Flag a relayd socket pair for destruction. Destroy it if the refcount
 * reaches zero; otherwise the last reference holder performs the destroy.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	LTTNG_ASSERT(relayd);
	ASSERT_RCU_READ_LOCKED();

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
532
/*
 * Completely destroy a stream from every visible data structure and the given
 * hash table, if one is provided.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
543
/*
 * Destroy a data stream, removing it from the global data stream hash table.
 * XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}
551
/* Destroy a metadata stream, removing it from the metadata hash table. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
556
/*
 * Snapshot the channel attributes a stream needs to read without taking the
 * channel lock. Currently only the tracefile size is copied.
 */
void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
					       struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
}
562
/*
 * Add a stream to the global list protected by a mutex.
 *
 * Lock order (outermost first): consumer_data.lock, channel lock, channel
 * timer lock, stream lock, then the RCU read lock.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	lttng::urcu::read_lock_guard read_lock;

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	/* Secondary index: channel id -> streams (non-unique). */
	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	the_consumer_data.stream_count++;
	the_consumer_data.need_update = 1;

	/* Release in strict reverse order of acquisition. */
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
617
618 /*
619 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
620 * be acquired before calling this.
621 */
622 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
623 {
624 int ret = 0;
625 struct lttng_ht_node_u64 *node;
626 struct lttng_ht_iter iter;
627
628 LTTNG_ASSERT(relayd);
629 ASSERT_RCU_READ_LOCKED();
630
631 lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
632 node = lttng_ht_iter_get_node_u64(&iter);
633 if (node != nullptr) {
634 goto end;
635 }
636 lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
637
638 end:
639 return ret;
640 }
641
642 /*
643 * Allocate and return a consumer relayd socket.
644 */
645 static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
646 {
647 struct consumer_relayd_sock_pair *obj = nullptr;
648
649 /* net sequence index of -1 is a failure */
650 if (net_seq_idx == (uint64_t) -1ULL) {
651 goto error;
652 }
653
654 obj = zmalloc<consumer_relayd_sock_pair>();
655 if (obj == nullptr) {
656 PERROR("zmalloc relayd sock");
657 goto error;
658 }
659
660 obj->net_seq_idx = net_seq_idx;
661 obj->refcount = 0;
662 obj->destroy_flag = 0;
663 obj->control_sock.sock.fd = -1;
664 obj->data_sock.sock.fd = -1;
665 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
666 pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
667
668 error:
669 return obj;
670 }
671
672 /*
673 * Find a relayd socket pair in the global consumer data.
674 *
675 * Return the object if found else NULL.
676 * RCU read-side lock must be held across this call and while using the
677 * returned object.
678 */
679 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
680 {
681 struct lttng_ht_iter iter;
682 struct lttng_ht_node_u64 *node;
683 struct consumer_relayd_sock_pair *relayd = nullptr;
684
685 ASSERT_RCU_READ_LOCKED();
686
687 /* Negative keys are lookup failures */
688 if (key == (uint64_t) -1ULL) {
689 goto error;
690 }
691
692 lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
693 node = lttng_ht_iter_get_node_u64(&iter);
694 if (node != nullptr) {
695 relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
696 }
697
698 error:
699 return relayd;
700 }
701
/*
 * Find a relayd and send the stream
 *
 * Returns 0 on success, < 0 on error. On success, takes a reference on the
 * relayd and marks the stream as sent.
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
	LTTNG_ASSERT(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != nullptr) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock,
					stream->name,
					get_consumer_domain(),
					path,
					&stream->relayd_stream_id,
					stream->chan->tracefile_size,
					stream->chan->tracefile_count,
					stream->trace_chunk);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/* The relayd is unusable: tear it down entirely. */
			ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}

		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
		    stream->key,
		    stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
	    stream->name,
	    stream->key,
	    stream->net_seq_idx);

end:
	return ret;
}
756
/*
 * Find a relayd and send the streams sent message
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd != nullptr) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_streams_sent(&relayd->control_sock);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/* The relayd is unusable: tear it down entirely. */
			ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}
	} else {
		ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
		ret = -1;
		goto end;
	}

	ret = 0;
	DBG("All streams sent relayd id %" PRIu64, net_seq_idx);

end:
	return ret;
}
795
/*
 * Find a relayd and close the stream on it. No-op if the stream's relayd is
 * unknown.
 */
void close_relayd_stream(struct lttng_consumer_stream *stream)
{
	struct consumer_relayd_sock_pair *relayd;

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd) {
		consumer_stream_relayd_close(stream, relayd);
	}
}
810
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Metadata goes on the control socket (caller must hold the relayd control
 * socket lock); data goes on the data socket preceded by a data header.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
				      size_t data_size,
				      unsigned long padding,
				      struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information, in network byte order. */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be substracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
871
/*
 * Write a character on the metadata poll pipe to wake the metadata thread.
 * Returns 0 on success, -1 on error.
 */
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
{
	int ret = 0;

	DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
	/* Only monitored channels with an existing metadata stream need a wakeup. */
	if (channel->monitor && channel->metadata_stream) {
		const char dummy = 'c';
		const ssize_t write_ret =
			lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);

		if (write_ret < 1) {
			if (errno == EWOULDBLOCK) {
				/*
				 * This is fine, the metadata poll thread
				 * is having a hard time keeping-up, but
				 * it will eventually wake-up and consume
				 * the available data.
				 */
				ret = 0;
			} else {
				PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
				ret = -1;
				goto end;
			}
		}
	}

end:
	return ret;
}
906
/*
 * Trigger a dump of the metadata content. Following/during the succesful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 */
static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	LTTNG_ASSERT(stream->metadata_flag);
	LTTNG_ASSERT(stream->chan->trace_chunk);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
949
/*
 * Swap the channel's current trace chunk for 'new_trace_chunk' (which may be
 * null), taking a reference on the new chunk and releasing the old one.
 * A logically-deleted channel is left untouched. Always returns 0.
 */
static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
						  struct lttng_trace_chunk *new_trace_chunk)
{
	pthread_mutex_lock(&channel->lock);
	if (channel->is_deleted) {
		/*
		 * The channel has been logically deleted and should no longer
		 * be used. It has released its reference to its current trace
		 * chunk and should not acquire a new one.
		 *
		 * Return success as there is nothing for the caller to do.
		 */
		goto end;
	}

	/*
	 * The acquisition of the reference cannot fail (barring
	 * a severe internal error) since a reference to the published
	 * chunk is already held by the caller.
	 */
	if (new_trace_chunk) {
		const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);

		LTTNG_ASSERT(acquired_reference);
	}

	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = new_trace_chunk;
end:
	pthread_mutex_unlock(&channel->lock);
	return 0;
}
982
983 /*
984 * Allocate and return a new lttng_consumer_channel object using the given key
985 * to initialize the hash table node.
986 *
987 * On error, return NULL.
988 */
989 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
990 uint64_t session_id,
991 const uint64_t *chunk_id,
992 const char *pathname,
993 const char *name,
994 uint64_t relayd_id,
995 enum lttng_event_output output,
996 uint64_t tracefile_size,
997 uint64_t tracefile_count,
998 uint64_t session_id_per_pid,
999 unsigned int monitor,
1000 unsigned int live_timer_interval,
1001 bool is_in_live_session,
1002 const char *root_shm_path,
1003 const char *shm_path)
1004 {
1005 struct lttng_consumer_channel *channel = nullptr;
1006 struct lttng_trace_chunk *trace_chunk = nullptr;
1007
1008 if (chunk_id) {
1009 trace_chunk = lttng_trace_chunk_registry_find_chunk(
1010 the_consumer_data.chunk_registry, session_id, *chunk_id);
1011 if (!trace_chunk) {
1012 ERR("Failed to find trace chunk reference during creation of channel");
1013 goto end;
1014 }
1015 }
1016
1017 try {
1018 channel = new lttng_consumer_channel;
1019 } catch (const std::bad_alloc& e) {
1020 ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
1021 channel = nullptr;
1022 goto end;
1023 }
1024
1025 channel->key = key;
1026 channel->refcount = 0;
1027 channel->session_id = session_id;
1028 channel->session_id_per_pid = session_id_per_pid;
1029 channel->relayd_id = relayd_id;
1030 channel->tracefile_size = tracefile_size;
1031 channel->tracefile_count = tracefile_count;
1032 channel->monitor = monitor;
1033 channel->live_timer_interval = live_timer_interval;
1034 channel->is_live = is_in_live_session;
1035 pthread_mutex_init(&channel->lock, NULL);
1036 pthread_mutex_init(&channel->timer_lock, NULL);
1037
1038 switch (output) {
1039 case LTTNG_EVENT_SPLICE:
1040 channel->output = CONSUMER_CHANNEL_SPLICE;
1041 break;
1042 case LTTNG_EVENT_MMAP:
1043 channel->output = CONSUMER_CHANNEL_MMAP;
1044 break;
1045 default:
1046 abort();
1047 delete channel;
1048 channel = nullptr;
1049 goto end;
1050 }
1051
1052 /*
1053 * In monitor mode, the streams associated with the channel will be put in
1054 * a special list ONLY owned by this channel. So, the refcount is set to 1
1055 * here meaning that the channel itself has streams that are referenced.
1056 *
1057 * On a channel deletion, once the channel is no longer visible, the
1058 * refcount is decremented and checked for a zero value to delete it. With
1059 * streams in no monitor mode, it will now be safe to destroy the channel.
1060 */
1061 if (!channel->monitor) {
1062 channel->refcount = 1;
1063 }
1064
1065 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
1066 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
1067
1068 strncpy(channel->name, name, sizeof(channel->name));
1069 channel->name[sizeof(channel->name) - 1] = '\0';
1070
1071 if (root_shm_path) {
1072 strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
1073 channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
1074 }
1075 if (shm_path) {
1076 strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
1077 channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
1078 }
1079
1080 lttng_ht_node_init_u64(&channel->node, channel->key);
1081 lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);
1082
1083 channel->wait_fd = -1;
1084 CDS_INIT_LIST_HEAD(&channel->streams.head);
1085
1086 if (trace_chunk) {
1087 int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
1088 if (ret) {
1089 goto error;
1090 }
1091 }
1092
1093 DBG("Allocated channel (key %" PRIu64 ")", channel->key);
1094
1095 end:
1096 lttng_trace_chunk_put(trace_chunk);
1097 return channel;
1098 error:
1099 consumer_del_channel(channel);
1100 channel = nullptr;
1101 goto end;
1102 }
1103
1104 /*
1105 * Add a channel to the global list protected by a mutex.
1106 *
1107 * Always return 0 indicating success.
1108 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
			 struct lttng_consumer_local_data *ctx)
{
	/*
	 * Lock ordering: global consumer data lock, then the channel lock,
	 * then the channel's timer lock.
	 */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);

	/*
	 * This gives us a guarantee that the channel we are about to add to the
	 * channel hash table will be unique. See this function comment on the why
	 * we need to steal the channel key at this stage.
	 */
	steal_channel_key(channel->key);

	/* RCU read-side lock is required for the hash table insertions. */
	lttng::urcu::read_lock_guard read_lock;
	lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
	lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
			 &channel->channels_by_session_id_ht_node);
	channel->is_published = true;

	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/*
	 * Notify the data poll thread (outside of the locks) so it picks up the
	 * new data channel.
	 */
	if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}

	/* Always returns 0; kept for call-site compatibility. */
	return 0;
}
1139
1140 /*
1141 * Allocate the pollfd structure and the local view of the out fds to avoid
1142 * doing a lookup in the linked list and concurrency issues when writing is
1143 * needed. Called with consumer_data.lock held.
1144 *
1145 * Returns the number of fds in the structures.
1146 */
static int update_poll_array(struct lttng_consumer_local_data *ctx,
			     struct pollfd **pollfd,
			     struct lttng_consumer_stream **local_stream,
			     struct lttng_ht *ht,
			     int *nb_inactive_fd)
{
	/* Index of the next free slot in both output arrays. */
	int i = 0;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(ht);
	LTTNG_ASSERT(pollfd);
	LTTNG_ASSERT(local_stream);

	DBG("Updating poll fd array");
	*nb_inactive_fd = 0;

	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
			/*
			 * Only active streams with an active end point can be added to the
			 * poll set and local stream storage of the thread.
			 *
			 * There is a potential race here for endpoint_status to be updated
			 * just after the check. However, this is OK since the stream(s) will
			 * be deleted once the thread is notified that the end point state has
			 * changed where this function will be called back again.
			 *
			 * We track the number of inactive FDs because they still need to be
			 * closed by the polling thread after a wakeup on the data_pipe or
			 * metadata_pipe.
			 */
			if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
				(*nb_inactive_fd)++;
				continue;
			}

			(*pollfd)[i].fd = stream->wait_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_stream[i] = stream;
			i++;
		}
	}

	/*
	 * Insert the consumer_data_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD.
	 */
	(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
	(*pollfd)[i].events = POLLIN | POLLPRI;

	/* The wakeup pipe follows; also excluded from the returned count. */
	(*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
	(*pollfd)[i + 1].events = POLLIN | POLLPRI;
	return i;
}
1204
1205 /*
1206 * Poll on the should_quit pipe and the command socket return -1 on
1207 * error, 1 if should exit, 0 if data is available on the command socket
1208 */
1209 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1210 {
1211 int num_rdy;
1212
1213 restart:
1214 num_rdy = poll(consumer_sockpoll, 2, -1);
1215 if (num_rdy == -1) {
1216 /*
1217 * Restart interrupted system call.
1218 */
1219 if (errno == EINTR) {
1220 goto restart;
1221 }
1222 PERROR("Poll error");
1223 return -1;
1224 }
1225 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
1226 DBG("consumer_should_quit wake up");
1227 return 1;
1228 }
1229 return 0;
1230 }
1231
1232 /*
1233 * Set the error socket.
1234 */
void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
{
	/* Store the fd used by lttng_consumer_send_error() to reach the sessiond. */
	ctx->consumer_error_socket = sock;
}
1239
1240 /*
1241 * Set the command socket path.
1242 */
void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
{
	/*
	 * Keeps a pointer to the caller's string (no copy); the path is later
	 * unlinked in lttng_consumer_destroy(). Caller must keep it alive.
	 */
	ctx->consumer_command_sock_path = sock;
}
1247
1248 /*
1249 * Send return code to the session daemon.
1250 * If the socket is not defined, we return 0, it is not a fatal error
1251 */
1252 int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
1253 enum lttcomm_return_code error_code)
1254 {
1255 if (ctx->consumer_error_socket > 0) {
1256 const std::int32_t comm_code = std::int32_t(error_code);
1257
1258 static_assert(
1259 sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
1260 "Fixed-size communication type too small to accomodate lttcomm_return_code");
1261 return lttcomm_send_unix_sock(
1262 ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
1263 }
1264
1265 return 0;
1266 }
1267
1268 /*
1269 * Close all the tracefiles and stream fds and MUST be called when all
1270 * instances are destroyed i.e. when all threads were joined and are ended.
1271 */
void lttng_consumer_cleanup()
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	unsigned int trace_chunks_left;

	{
		/*
		 * Delete every remaining channel under the RCU read-side lock
		 * before the hash tables themselves are destroyed.
		 */
		lttng::urcu::read_lock_guard read_lock;

		cds_lfht_for_each_entry (
			the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
			consumer_del_channel(channel);
		}
	}

	lttng_ht_destroy(the_consumer_data.channel_ht);
	lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);

	cleanup_relayd_ht();

	lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);

	/*
	 * This HT contains streams that are freed by either the metadata thread or
	 * the data thread so we do *nothing* on the hash table and simply destroy
	 * it.
	 */
	lttng_ht_destroy(the_consumer_data.stream_list_ht);

	/*
	 * Trace chunks in the registry may still exist if the session
	 * daemon has encountered an internal error and could not
	 * tear down its sessions and/or trace chunks properly.
	 *
	 * Release the session daemon's implicit reference to any remaining
	 * trace chunk and print an error if any trace chunk was found. Note
	 * that there are _no_ legitimate cases for trace chunks to be left,
	 * it is a leak. However, it can happen following a crash of the
	 * session daemon and not emptying the registry would cause an assertion
	 * to hit.
	 */
	trace_chunks_left =
		lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
	if (trace_chunks_left) {
		ERR("%u trace chunks are leaked by lttng-consumerd. "
		    "This can be caused by an internal error of the session daemon.",
		    trace_chunks_left);
	}
	/* Run all callbacks freeing each chunk. */
	rcu_barrier();
	lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
}
1324
1325 /*
1326 * Called from signal handler.
1327 */
1328 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1329 {
1330 ssize_t ret;
1331
1332 CMM_STORE_SHARED(consumer_quit, 1);
1333 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1334 if (ret < 1) {
1335 PERROR("write consumer quit");
1336 }
1337
1338 DBG("Consumer flag that it should quit");
1339 }
1340
1341 /*
1342 * Flush pending writes to trace output disk file.
1343 */
1344 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
1345 {
1346 int outfd = stream->out_fd;
1347
1348 /*
1349 * This does a blocking write-and-wait on any page that belongs to the
1350 * subbuffer prior to the one we just wrote.
1351 * Don't care about error values, as these are just hints and ways to
1352 * limit the amount of page cache used.
1353 */
1354 if (orig_offset < stream->max_sb_size) {
1355 return;
1356 }
1357 lttng::io::hint_flush_range_dont_need_sync(
1358 outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
1359 }
1360
1361 /*
1362 * Initialise the necessary environnement :
1363 * - create a new context
1364 * - create the poll_pipe
1365 * - create the should_quit pipe (for signal handler)
1366 * - create the thread pipe (for splice)
1367 *
1368 * Takes a function pointer as argument, this function is called when data is
1369 * available on a buffer. This function is responsible to do the
1370 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1371 * buffer configuration and then kernctl_put_next_subbuf at the end.
1372 *
1373 * Returns a pointer to the new context or NULL on error.
1374 */
struct lttng_consumer_local_data *
lttng_consumer_create(enum lttng_consumer_type type,
		      ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
					      struct lttng_consumer_local_data *ctx,
					      bool locked_by_caller),
		      int (*recv_channel)(struct lttng_consumer_channel *channel),
		      int (*recv_stream)(struct lttng_consumer_stream *stream),
		      int (*update_stream)(uint64_t stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	/* The global consumer type can only be set once. */
	LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
		     the_consumer_data.type == type);
	the_consumer_data.type = type;

	ctx = zmalloc<lttng_consumer_local_data>();
	if (ctx == nullptr) {
		PERROR("allocating context");
		goto error;
	}

	/* Sockets are marked as "unset"; see lttng_consumer_destroy(). */
	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_wakeup_pipe) {
		goto error_wakeup_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	ctx->channel_monitor_pipe = -1;

	return ctx;

	/*
	 * Error unwinding: each label releases the resources acquired before
	 * the failing step, in reverse order of acquisition.
	 */
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return nullptr;
}
1450
1451 /*
1452 * Iterate over all streams of the hashtable and free them properly.
1453 */
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	if (ht == nullptr) {
		return;
	}

	{
		/* RCU read-side lock protects the hash table traversal. */
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
			/*
			 * Ignore return value since we are currently cleaning up so any error
			 * can't be handled.
			 */
			(void) consumer_del_stream(stream, ht);
		}
	}

	lttng_ht_destroy(ht);
}
1476
1477 /*
1478 * Iterate over all streams of the metadata hashtable and free them
1479 * properly.
1480 */
static void destroy_metadata_stream_ht(struct lttng_ht *ht)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	if (ht == nullptr) {
		return;
	}

	{
		/* RCU read-side lock protects the hash table traversal. */
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
			/*
			 * Ignore return value since we are currently cleaning up so any error
			 * can't be handled.
			 */
			(void) consumer_del_metadata_stream(stream, ht);
		}
	}

	lttng_ht_destroy(ht);
}
1503
1504 /*
1505 * Close all fds associated with the instance and free the context.
1506 */
1507 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1508 {
1509 int ret;
1510
1511 DBG("Consumer destroying it. Closing everything.");
1512
1513 if (!ctx) {
1514 return;
1515 }
1516
1517 destroy_data_stream_ht(data_ht);
1518 destroy_metadata_stream_ht(metadata_ht);
1519
1520 ret = close(ctx->consumer_error_socket);
1521 if (ret) {
1522 PERROR("close");
1523 }
1524 ret = close(ctx->consumer_metadata_socket);
1525 if (ret) {
1526 PERROR("close");
1527 }
1528 utils_close_pipe(ctx->consumer_channel_pipe);
1529 lttng_pipe_destroy(ctx->consumer_data_pipe);
1530 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1531 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1532 utils_close_pipe(ctx->consumer_should_quit);
1533
1534 unlink(ctx->consumer_command_sock_path);
1535 free(ctx);
1536 }
1537
1538 /*
1539 * Write the metadata stream id on the specified file descriptor.
1540 */
1541 static int
1542 write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
1543 {
1544 ssize_t ret;
1545 struct lttcomm_relayd_metadata_payload hdr;
1546
1547 hdr.stream_id = htobe64(stream->relayd_stream_id);
1548 hdr.padding_size = htobe32(padding);
1549 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1550 if (ret < sizeof(hdr)) {
1551 /*
1552 * This error means that the fd's end is closed so ignore the PERROR
1553 * not to clubber the error output since this can happen in a normal
1554 * code path.
1555 */
1556 if (errno != EPIPE) {
1557 PERROR("write metadata stream id");
1558 }
1559 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
1560 /*
1561 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1562 * handle writting the missing part so report that as an error and
1563 * don't lie to the caller.
1564 */
1565 ret = -1;
1566 goto end;
1567 }
1568 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1569 stream->relayd_stream_id,
1570 padding);
1571
1572 end:
1573 return (int) ret;
1574 }
1575
1576 /*
1577 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1578 * core function for writing trace buffers to either the local filesystem or
1579 * the network.
1580 *
1581 * It must be called with the stream and the channel lock held.
1582 *
1583 * Careful review MUST be put if any changes occur!
1584 *
1585 * Returns the number of bytes written
1586 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
					      const struct lttng_buffer_view *buffer,
					      unsigned long padding)
{
	ssize_t ret = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	unsigned int relayd_hang_up = 0;
	/* Payload size without the trailing padding bytes. */
	const size_t subbuf_content_size = buffer->size - padding;
	size_t write_len;

	/* RCU lock for the relayd pointer */
	lttng::urcu::read_lock_guard read_lock;
	LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == nullptr) {
			ret = -EPIPE;
			goto end;
		}
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = subbuf_content_size;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
							    stream->relayd_stream_id,
							    stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			/* Account for the metadata payload header sent below. */
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret < 0) {
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;

		/* Write metadata stream id before payload */
		if (stream->metadata_flag) {
			ret = write_relayd_metadata_id(outfd, stream, padding);
			if (ret < 0) {
				relayd_hang_up = 1;
				goto write_error;
			}
		}

		/* The relayd re-creates the padding; only send the content. */
		write_len = subbuf_content_size;
	} else {
		/* No streaming; we have to write the full padding. */
		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
		    (stream->tracefile_size_current + buffer->size) >
			    stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret) {
				goto end;
			}
			/* Rotation replaced the output fd; restart accounting. */
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += buffer->size;
		write_len = buffer->size;
	}

	/*
	 * This call guarantee that len or less is returned. It's impossible to
	 * receive a ret value that is bigger than len.
	 */
	ret = lttng_write(outfd, buffer->data, write_len);
	DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
	if (ret < 0 || ((size_t) ret != write_len)) {
		/*
		 * Report error to caller if nothing was written else at least send the
		 * amount written.
		 */
		if (ret < 0) {
			ret = -errno;
		}
		relayd_hang_up = 1;

		/* Socket operation failed. We consider the relayd dead */
		if (errno == EPIPE) {
			/*
			 * This is possible if the fd is closed on the other side
			 * (outfd) or any write problem. It can be verbose a bit for a
			 * normal execution if for instance the relayd is stopped
			 * abruptly. This can happen so set this to a DBG statement.
			 */
			DBG("Consumer mmap write detected relayd hang up");
		} else {
			/* Unhandled error, print it and stop function right now. */
			PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
		}
		goto write_error;
	}
	stream->output_written += ret;

	/* This call is useless on a socket so better save a syscall. */
	if (!relayd) {
		/* This won't block, but will start writeout asynchronously */
		lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
		stream->out_fd_offset += write_len;
		lttng_consumer_sync_trace_file(stream, orig_offset);
	}

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	return ret;
}
1742
1743 /*
1744 * Splice the data from the ring buffer to the tracefile.
1745 *
1746 * It must be called with the stream lock held.
1747 *
1748 * Returns the number of bytes spliced.
1749 */
ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
						struct lttng_consumer_stream *stream,
						unsigned long len,
						unsigned long padding)
{
	ssize_t ret = 0, written = 0, ret_splice = 0;
	loff_t offset = 0;
	off_t orig_offset = stream->out_fd_offset;
	int fd = stream->wait_fd;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	int *splice_pipe;
	unsigned int relayd_hang_up = 0;

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/* Not supported for user space tracing */
		return -ENOSYS;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

	/* RCU lock for the relayd pointer */
	lttng::urcu::read_lock_guard read_lock;

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == nullptr) {
			/*
			 * NOTE(review): ret is still 0 here, so this returns 0
			 * rather than a negative error; looks like it was meant
			 * to propagate an error (e.g. -EPIPE as in the mmap
			 * path) — confirm intent before changing.
			 */
			written = -ret;
			goto end;
		}
	}
	splice_pipe = stream->splice_pipe;

	/* Write metadata stream id before payload */
	if (relayd) {
		unsigned long total_len = len;

		if (stream->metadata_flag) {
			/*
			 * Lock the control socket for the complete duration of the function
			 * since from this point on we will use the socket.
			 */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);

			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
							    stream->relayd_stream_id,
							    stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			/* The id header goes through the splice pipe as well. */
			ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
			if (ret < 0) {
				written = ret;
				relayd_hang_up = 1;
				goto write_error;
			}

			total_len += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, total_len, padding, relayd);
		if (ret < 0) {
			written = ret;
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}
		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
		    (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret < 0) {
				written = ret;
				goto end;
			}
			/* Rotation replaced the output fd; restart accounting. */
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += len;
	}

	/* Move the data in two hops: ring buffer -> pipe -> output fd. */
	while (len > 0) {
		DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
		    (unsigned long) offset,
		    len,
		    fd,
		    splice_pipe[1]);
		ret_splice = splice(
			fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe, ret %zd", ret_splice);
		if (ret_splice < 0) {
			ret = errno;
			written = -ret;
			PERROR("Error in relay splice");
			goto splice_error;
		}

		/* Handle stream on the relayd if the output is on the network */
		if (relayd && stream->metadata_flag) {
			size_t metadata_payload_size =
				sizeof(struct lttcomm_relayd_metadata_payload);

			/* Update counter to fit the spliced data */
			ret_splice += metadata_payload_size;
			len += metadata_payload_size;
			/*
			 * We do this so the return value can match the len passed as
			 * argument to this function.
			 */
			written -= metadata_payload_size;
		}

		/* Splice data out */
		ret_splice = splice(splice_pipe[0],
				    nullptr,
				    outfd,
				    nullptr,
				    ret_splice,
				    SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
		if (ret_splice < 0) {
			ret = errno;
			written = -ret;
			relayd_hang_up = 1;
			goto write_error;
		} else if (ret_splice > len) {
			/*
			 * We don't expect this code path to be executed but you never know
			 * so this is an extra protection against a buggy splice().
			 */
			ret = errno;
			written += ret_splice;
			PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
			goto splice_error;
		} else {
			/* All good, update current len and continue. */
			len -= ret_splice;
		}

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
			stream->out_fd_offset += ret_splice;
		}
		stream->output_written += ret_splice;
		written += ret_splice;
	}
	if (!relayd) {
		lttng_consumer_sync_trace_file(stream, orig_offset);
	}
	goto end;

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
		/* Skip splice error so the consumer does not fail */
		goto end;
	}

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EINVAL:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	/* Unlock only if the metadata control socket was taken above. */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	return written;
}
1962
1963 /*
1964 * Sample the snapshot positions for a specific fd
1965 *
1966 * Returns 0 on success, < 0 on error
1967 */
1968 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1969 {
1970 switch (the_consumer_data.type) {
1971 case LTTNG_CONSUMER_KERNEL:
1972 return lttng_kconsumer_sample_snapshot_positions(stream);
1973 case LTTNG_CONSUMER32_UST:
1974 case LTTNG_CONSUMER64_UST:
1975 return lttng_ustconsumer_sample_snapshot_positions(stream);
1976 default:
1977 ERR("Unknown consumer_data type");
1978 abort();
1979 return -ENOSYS;
1980 }
1981 }
1982 /*
1983 * Take a snapshot for a specific fd
1984 *
1985 * Returns 0 on success, < 0 on error
1986 */
1987 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
1988 {
1989 switch (the_consumer_data.type) {
1990 case LTTNG_CONSUMER_KERNEL:
1991 return lttng_kconsumer_take_snapshot(stream);
1992 case LTTNG_CONSUMER32_UST:
1993 case LTTNG_CONSUMER64_UST:
1994 return lttng_ustconsumer_take_snapshot(stream);
1995 default:
1996 ERR("Unknown consumer_data type");
1997 abort();
1998 return -ENOSYS;
1999 }
2000 }
2001
2002 /*
2003 * Get the produced position
2004 *
2005 * Returns 0 on success, < 0 on error
2006 */
2007 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
2008 {
2009 switch (the_consumer_data.type) {
2010 case LTTNG_CONSUMER_KERNEL:
2011 return lttng_kconsumer_get_produced_snapshot(stream, pos);
2012 case LTTNG_CONSUMER32_UST:
2013 case LTTNG_CONSUMER64_UST:
2014 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
2015 default:
2016 ERR("Unknown consumer_data type");
2017 abort();
2018 return -ENOSYS;
2019 }
2020 }
2021
2022 /*
2023 * Get the consumed position (free-running counter position in bytes).
2024 *
2025 * Returns 0 on success, < 0 on error
2026 */
2027 int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
2028 {
2029 switch (the_consumer_data.type) {
2030 case LTTNG_CONSUMER_KERNEL:
2031 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2032 case LTTNG_CONSUMER32_UST:
2033 case LTTNG_CONSUMER64_UST:
2034 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2035 default:
2036 ERR("Unknown consumer_data type");
2037 abort();
2038 return -ENOSYS;
2039 }
2040 }
2041
2042 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
2043 int sock,
2044 struct pollfd *consumer_sockpoll)
2045 {
2046 switch (the_consumer_data.type) {
2047 case LTTNG_CONSUMER_KERNEL:
2048 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2049 case LTTNG_CONSUMER32_UST:
2050 case LTTNG_CONSUMER64_UST:
2051 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2052 default:
2053 ERR("Unknown consumer_data type");
2054 abort();
2055 return -ENOSYS;
2056 }
2057 }
2058
2059 static void lttng_consumer_close_all_metadata()
2060 {
2061 switch (the_consumer_data.type) {
2062 case LTTNG_CONSUMER_KERNEL:
2063 /*
2064 * The Kernel consumer has a different metadata scheme so we don't
2065 * close anything because the stream will be closed by the session
2066 * daemon.
2067 */
2068 break;
2069 case LTTNG_CONSUMER32_UST:
2070 case LTTNG_CONSUMER64_UST:
2071 /*
2072 * Close all metadata streams. The metadata hash table is passed and
2073 * this call iterates over it by closing all wakeup fd. This is safe
2074 * because at this point we are sure that the metadata producer is
2075 * either dead or blocked.
2076 */
2077 lttng_ustconsumer_close_all_metadata(metadata_ht);
2078 break;
2079 default:
2080 ERR("Unknown consumer_data type");
2081 abort();
2082 }
2083 }
2084
2085 /*
2086 * Clean up a metadata stream and free its memory.
2087 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	struct lttng_consumer_channel *channel = nullptr;
	bool free_channel = false;

	LTTNG_ASSERT(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	LTTNG_ASSERT(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	/*
	 * Lock ordering: global consumer data lock, channel lock, stream lock,
	 * then (userspace only) the metadata cache lock.
	 */
	pthread_mutex_lock(&the_consumer_data.lock);
	/*
	 * Note that this assumes that a stream's channel is never changed and
	 * that the stream's lock doesn't need to be taken to sample its
	 * channel.
	 */
	channel = stream->chan;
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&stream->lock);
	if (channel->metadata_cache) {
		/* Only applicable to userspace consumers. */
		pthread_mutex_lock(&channel->metadata_cache->lock);
	}

	/* Remove any reference to that stream. */
	consumer_stream_delete(stream, ht);

	/* Close down everything including the relayd if one. */
	consumer_stream_close_output(stream);
	/* Destroy tracer buffers of the stream. */
	consumer_stream_destroy_buffers(stream);

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&channel->refcount, 1) &&
	    !uatomic_read(&channel->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_channel = true;
	}
	stream->chan = nullptr;

	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for a NULL
	 * pointer value.
	 */
	channel->metadata_stream = nullptr;
	/* Wake any waiter blocked on metadata being pushed for this channel. */
	channel->metadata_pushed_wait_queue.wake_all();

	if (channel->metadata_cache) {
		pthread_mutex_unlock(&channel->metadata_cache->lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/* Channel deletion is deferred until all locks above are released. */
	if (free_channel) {
		consumer_del_channel(channel);
	}

	lttng_trace_chunk_put(stream->trace_chunk);
	stream->trace_chunk = nullptr;
	consumer_stream_free(stream);
}
2155
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Registers the stream in the metadata hash table as well as in the
 * per-channel-id and per-session-id tables. Lock acquisition order:
 * consumer data lock -> channel lock -> channel timer lock -> stream lock.
 */
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	lttng::urcu::read_lock_guard read_lock;

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	LTTNG_ASSERT(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Key is the stream key; uniqueness was checked by the lookup above. */
	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
2220
2221 /*
2222 * Delete data stream that are flagged for deletion (endpoint_status).
2223 */
2224 static void validate_endpoint_status_data_stream()
2225 {
2226 struct lttng_ht_iter iter;
2227 struct lttng_consumer_stream *stream;
2228
2229 DBG("Consumer delete flagged data stream");
2230
2231 {
2232 lttng::urcu::read_lock_guard read_lock;
2233
2234 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
2235 /* Validate delete flag of the stream */
2236 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2237 continue;
2238 }
2239 /* Delete it right now */
2240 consumer_del_stream(stream, data_ht);
2241 }
2242 }
2243 }
2244
2245 /*
2246 * Delete metadata stream that are flagged for deletion (endpoint_status).
2247 */
2248 static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
2249 {
2250 struct lttng_ht_iter iter;
2251 struct lttng_consumer_stream *stream;
2252
2253 DBG("Consumer delete flagged metadata stream");
2254
2255 LTTNG_ASSERT(pollset);
2256
2257 {
2258 lttng::urcu::read_lock_guard read_lock;
2259 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
2260 /* Validate delete flag of the stream */
2261 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2262 continue;
2263 }
2264 /*
2265 * Remove from pollset so the metadata thread can continue without
2266 * blocking on a deleted stream.
2267 */
2268 lttng_poll_del(pollset, stream->wait_fd);
2269
2270 /* Delete it right now */
2271 consumer_del_metadata_stream(stream, metadata_ht);
2272 }
2273 }
2274 }
2275
/*
 * Thread polls on metadata file descriptor and write them on disk or on the
 * network.
 *
 * The poll set contains the consumer_metadata pipe (through which new
 * metadata streams are announced) plus one fd per registered metadata
 * stream. Returns nullptr; reports status through the health subsystem.
 */
void *consumer_thread_metadata_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_stream *stream = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);

	if (testpoint(consumerd_thread_metadata)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Thread metadata poll started");

	/* Initial size hint of 2: the consumer_metadata pipe plus one stream fd. */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Metadata main loop started");

	while (true) {
	restart:
		health_code_update();
		health_poll_entry();
		DBG("Metadata poll wait");
		ret = lttng_poll_wait(&events, -1);
		DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Metadata event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				/* Empty poll set: clean shutdown, not an error. */
				err = 0; /* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a metadata wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
				if (revents & LPOLLIN) {
					ssize_t pipe_len;

					/* A stream pointer is transferred over the pipe. */
					pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
								   &stream,
								   sizeof(stream)); /* NOLINT sizeof
										       used on a
										       pointer. */
					if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
									    pointer. */
						if (pipe_len < 0) {
							PERROR("read metadata stream");
						}
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since their might be data to consume.
						 */
						lttng_poll_del(
							&events,
							lttng_pipe_get_readfd(
								ctx->consumer_metadata_pipe));
						lttng_pipe_read_close(ctx->consumer_metadata_pipe);
						continue;
					}

					/* A NULL stream means that the state has changed. */
					if (stream == nullptr) {
						/* Check for deleted streams. */
						validate_endpoint_status_metadata_stream(&events);
						goto restart;
					}

					DBG("Adding metadata stream %d to poll set",
					    stream->wait_fd);

					/* Add metadata stream to the global poll events list */
					lttng_poll_add(
						&events, stream->wait_fd, LPOLLIN | LPOLLPRI);
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Metadata thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(
						&events,
						lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
					lttng_pipe_read_close(ctx->consumer_metadata_pipe);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* Not the pipe: look up the stream owning this fd under RCU. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			stream = caa_container_of(node, struct lttng_consumer_stream, node);

			if (revents & (LPOLLIN | LPOLLPRI)) {
				/* Get the data out of the metadata file descriptor */
				DBG("Metadata available on fd %d", pollfd);
				LTTNG_ASSERT(stream->wait_fd == pollfd);

				do {
					health_code_update();

					len = ctx->on_buffer_ready(stream, ctx, false);
					/*
					 * We don't check the return value here since if we get
					 * a negative len, it means an error occurred thus we
					 * simply remove it from the poll set and free the
					 * stream.
					 */
				} while (len > 0);

				/* It's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean up stream from consumer and free it. */
					lttng_poll_del(&events, stream->wait_fd);
					consumer_del_metadata_stream(stream, metadata_ht);
				}
			} else if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Metadata fd %d is hup|err.", pollfd);
				if (!stream->hangup_flush_done &&
				    (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
				     the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
					DBG("Attempting to flush and consume the UST buffers");
					lttng_ustconsumer_on_stream_hangup(stream);

					/* We just flushed the stream now read it. */
					do {
						health_code_update();

						len = ctx->on_buffer_ready(stream, ctx, false);
						/*
						 * We don't check the return value here since if we
						 * get a negative len, it means an error occurred
						 * thus we simply remove it from the poll set and
						 * free the stream.
						 */
					} while (len > 0);
				}

				lttng_poll_del(&events, stream->wait_fd);
				/*
				 * This call update the channel states, closes file descriptors
				 * and securely free the stream.
				 */
				consumer_del_metadata_stream(stream, metadata_ht);
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}
			/* Release RCU lock for the stream looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	DBG("Metadata poll thread exiting");

	lttng_poll_clean(&events);
end_poll:
error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
2494
/*
 * This thread polls the fds in the set to consume the data and write
 * it to tracefile if necessary.
 *
 * The pollfd array layout is: nb_fd stream fds, then the consumer_data_pipe
 * at index nb_fd and the wakeup pipe at index nb_fd + 1 (nb_pipes_fd == 2).
 * High-priority (POLLPRI) streams are always serviced before low-priority
 * ones; hangup/error handling runs last so a final flush/read can drain
 * remaining data before a stream is destroyed.
 */
void *consumer_thread_data_poll(void *data)
{
	int num_rdy, high_prio, ret, i, err = -1;
	struct pollfd *pollfd = nullptr;
	/* local view of the streams */
	struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
	/* local view of consumer_data.fds_count */
	int nb_fd = 0;
	/* 2 for the consumer_data_pipe and wake up pipe */
	const int nb_pipes_fd = 2;
	/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
	int nb_inactive_fd = 0;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);

	if (testpoint(consumerd_thread_data)) {
		goto error_testpoint;
	}

	health_code_update();

	local_stream = zmalloc<lttng_consumer_stream *>();
	if (local_stream == nullptr) {
		PERROR("local_stream malloc");
		goto end;
	}

	while (true) {
		health_code_update();

		high_prio = 0;

		/*
		 * the fds set has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&the_consumer_data.lock);
		if (the_consumer_data.need_update) {
			/* Rebuild both arrays from scratch under the consumer lock. */
			free(pollfd);
			pollfd = nullptr;

			free(local_stream);
			local_stream = nullptr;

			/* Allocate for all fds */
			pollfd =
				calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
			if (pollfd == nullptr) {
				PERROR("pollfd malloc");
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}

			local_stream = calloc<lttng_consumer_stream *>(
				the_consumer_data.stream_count + nb_pipes_fd);
			if (local_stream == nullptr) {
				PERROR("local_stream malloc");
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}
			ret = update_poll_array(
				ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}
			nb_fd = ret;
			the_consumer_data.need_update = 0;
		}
		pthread_mutex_unlock(&the_consumer_data.lock);

		/* No FDs and consumer_quit, consumer_cleanup the thread */
		if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
			err = 0; /* All is OK */
			goto end;
		}
		/* poll on the array of fds */
	restart:
		DBG("polling on %d fd", nb_fd + nb_pipes_fd);
		if (testpoint(consumerd_thread_data_poll)) {
			goto end;
		}
		health_poll_entry();
		num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
		health_poll_exit();
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			/*
			 * Restart interrupted system call.
			 */
			if (errno == EINTR) {
				goto restart;
			}
			PERROR("Poll error");
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		if (caa_unlikely(data_consumption_paused)) {
			DBG("Data consumption paused, sleeping...");
			sleep(1);
			goto restart;
		}

		/*
		 * If the consumer_data_pipe triggered poll go directly to the
		 * beginning of the loop to update the array. We want to prioritize
		 * array update over low-priority reads.
		 */
		if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
			ssize_t pipe_readlen;

			DBG("consumer_data_pipe wake up");
			pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
						       &new_stream,
						       sizeof(new_stream)); /* NOLINT sizeof used on
									       a pointer. */
			if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
								  */
				PERROR("Consumer data pipe");
				/* Continue so we can at least handle the current stream(s). */
				continue;
			}

			/*
			 * If the stream is NULL, just ignore it. It's also possible that
			 * the sessiond poll thread changed the consumer_quit state and is
			 * waking us up to test it.
			 */
			if (new_stream == nullptr) {
				validate_endpoint_status_data_stream();
				continue;
			}

			/* Continue to update the local streams and handle prio ones */
			continue;
		}

		/* Handle wakeup pipe. */
		if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
			char dummy;
			ssize_t pipe_readlen;

			/* Drain the single wakeup byte; content is irrelevant. */
			pipe_readlen =
				lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
			if (pipe_readlen < 0) {
				PERROR("Consumer data wakeup pipe");
			}
			/* We've been awakened to handle stream(s). */
			ctx->has_wakeup = 0;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if (pollfd[i].revents & POLLPRI) {
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				len = ctx->on_buffer_ready(local_stream[i], ctx, false);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				} else if (len > 0) {
					local_stream[i]->has_data_left_to_be_read_before_teardown =
						1;
				}
			}
		}

		/*
		 * If we read high prio channel in this loop, try again
		 * for more high prio data.
		 */
		if (high_prio) {
			continue;
		}

		/* Take care of low priority channels. */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
			    local_stream[i]->has_data) {
				DBG("Normal read on fd %d", pollfd[i].fd);
				len = ctx->on_buffer_ready(local_stream[i], ctx, false);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				} else if (len > 0) {
					local_stream[i]->has_data_left_to_be_read_before_teardown =
						1;
				}
			}
		}

		/* Handle hangup and errors */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if (!local_stream[i]->hangup_flush_done &&
			    (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
			    (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
			     the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
				DBG("fd %d is hup|err|nval. Attempting flush and read.",
				    pollfd[i].fd);
				lttng_ustconsumer_on_stream_hangup(local_stream[i]);
				/* Attempt read again, for the data we just flushed. */
				local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
			}
			/*
			 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
			 * performed. This type of flush ensures that a new packet is produced no
			 * matter the consumed/produced positions are.
			 *
			 * This, in turn, causes the next pass to see that data available for the
			 * stream. When we come back here, we can be assured that all available
			 * data has been consumed and we can finally destroy the stream.
			 *
			 * If the poll flag is HUP/ERR/NVAL and we have
			 * read no data in this pass, we can remove the
			 * stream from its hash table.
			 */
			if ((pollfd[i].revents & POLLHUP)) {
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			} else if (pollfd[i].revents & POLLERR) {
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			} else if (pollfd[i].revents & POLLNVAL) {
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			}
			/* Reset the flag so the next pass re-evaluates pending data. */
			if (local_stream[i] != nullptr) {
				local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
			}
		}
	}
	/* All is OK */
	err = 0;
end:
	DBG("polling thread exiting");
	free(pollfd);
	free(local_stream);

	/*
	 * Close the write side of the pipe so epoll_wait() in
	 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
	 * read side of the pipe. If we close them both, epoll_wait strangely does
	 * not return and could create a endless wait period if the pipe is the
	 * only tracked fd in the poll set. The thread will take care of closing
	 * the read side.
	 */
	(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
2795
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 *
 * Streams are found through the per-channel-id hash table, iterated under
 * the RCU read lock; each stream's lock is taken to protect against
 * concurrent teardown.
 */
static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	ht = the_consumer_data.stream_per_chan_id_ht;

	lttng::urcu::read_lock_guard read_lock;
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Stream already being removed from its hash table: skip it. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		switch (the_consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			/* Nothing to do for the kernel domain. */
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			if (stream->metadata_flag) {
				/* Safe and protected by the stream lock. */
				lttng_ustconsumer_close_metadata(stream->chan);
			} else {
				/*
				 * Note: a mutex is taken internally within
				 * liblttng-ust-ctl to protect timer wakeup_fd
				 * use from concurrent close.
				 */
				lttng_ustconsumer_close_stream_wakeup(stream);
			}
			break;
		default:
			ERR("Unknown consumer_data type");
			abort();
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
}
2850
2851 static void destroy_channel_ht(struct lttng_ht *ht)
2852 {
2853 struct lttng_ht_iter iter;
2854 struct lttng_consumer_channel *channel;
2855 int ret;
2856
2857 if (ht == nullptr) {
2858 return;
2859 }
2860
2861 {
2862 lttng::urcu::read_lock_guard read_lock;
2863
2864 cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
2865 ret = lttng_ht_del(ht, &iter);
2866 LTTNG_ASSERT(ret != 0);
2867 }
2868 }
2869
2870 lttng_ht_destroy(ht);
2871 }
2872
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 *
 * Channels are announced over consumer_channel_pipe[0] as
 * (channel, key, action) triplets and tracked in a thread-local hash
 * table keyed by wait fd. Returns nullptr; reports status through the
 * health subsystem.
 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	if (testpoint(consumerd_thread_channel)) {
		goto error_testpoint;
	}

	health_code_update();

	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Initial size hint of 2: the consumer_channel pipe plus one channel fd. */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (true) {
	restart:
		health_code_update();
		DBG("Channel poll wait");
		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Channel event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				/* Empty poll set: clean shutdown, not an error. */
				err = 0; /* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						if (ret < 0) {
							ERR("Error reading channel pipe");
						}
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
					{
						DBG("Adding channel %d to poll set", chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
								       chan->wait_fd);
						lttng::urcu::read_lock_guard read_lock;
						lttng_ht_add_unique_u64(channel_ht,
									&chan->wait_fd_node);
						/* Add channel to the global poll events list */
						// FIXME: Empty flag on a pipe pollset, this might
						// hang on FreeBSD.
						lttng_poll_add(&events, chan->wait_fd, 0);
						break;
					}
					case CONSUMER_CHANNEL_DEL:
					{
						/*
						 * This command should never be called if the
						 * channel has streams monitored by either the data
						 * or metadata thread. The consumer only notify this
						 * thread with a channel del. command if it receives
						 * a destroy channel command from the session daemon
						 * that send it if a command prior to the
						 * GET_CHANNEL failed.
						 */

						lttng::urcu::read_lock_guard read_lock;
						chan = consumer_find_channel(key);
						if (!chan) {
							ERR("UST consumer get channel key %" PRIu64
							    " not found for del channel",
							    key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						LTTNG_ASSERT(ret == 0);

						switch (the_consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							health_code_update();
							/* Destroy streams that might have been left
							 * in the stream list. */
							clean_channel_stream_list(chan);
							break;
						default:
							ERR("Unknown consumer_data type");
							abort();
						}

						/*
						 * Release our own refcount. Force channel deletion
						 * even if streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since their might be data to consume.
						 */
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* Not the pipe: look up the channel owning this fd under RCU. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				LTTNG_ASSERT(ret == 0);

				/*
				 * This will close the wait fd for each stream associated to
				 * this channel AND monitored by the data/metadata thread thus
				 * will be clean by the right thread.
				 */
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1) &&
				    !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}

			/* Release RCU lock for the channel looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
error_testpoint:
	DBG("Channel poll thread exiting");
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
3121
3122 static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
3123 struct pollfd *sockpoll,
3124 int client_socket)
3125 {
3126 int ret;
3127
3128 LTTNG_ASSERT(ctx);
3129 LTTNG_ASSERT(sockpoll);
3130
3131 ret = lttng_consumer_poll_socket(sockpoll);
3132 if (ret) {
3133 goto error;
3134 }
3135 DBG("Metadata connection on client_socket");
3136
3137 /* Blocking call, waiting for transmission */
3138 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3139 if (ctx->consumer_metadata_socket < 0) {
3140 WARN("On accept metadata");
3141 ret = -1;
3142 goto error;
3143 }
3144 ret = 0;
3145
3146 error:
3147 return ret;
3148 }
3149
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 */
void *consumer_thread_sessiond_poll(void *data)
{
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	if (testpoint(consumerd_thread_sessiond)) {
		goto error_testpoint;
	}

	health_code_update();

	/* Create the listening command socket, removing any stale socket file first. */
	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* Wait for the session daemon's first connection (or a quit request). */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* Command loop: runs until the sessiond hangs up or quit is requested. */
	while (true) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			if (ret > 0) {
				/* should exit */
				err = 0;
			}
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (CMM_LOAD_SHARED(consumer_quit)) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0; /* All is OK */
			goto end;
		}
		DBG("Received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_all_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	CMM_STORE_SHARED(consumer_quit, 1);

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	/* Wake the channel management thread so it observes the quit state too. */
	notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);

	notify_health_quit_pipe(health_quit_pipe);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
3327
3328 static int post_consume(struct lttng_consumer_stream *stream,
3329 const struct stream_subbuffer *subbuffer,
3330 struct lttng_consumer_local_data *ctx)
3331 {
3332 size_t i;
3333 int ret = 0;
3334 const size_t count =
3335 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
3336
3337 for (i = 0; i < count; i++) {
3338 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
3339 &stream->read_subbuffer_ops.post_consume_cbs, i);
3340
3341 ret = op(stream, subbuffer, ctx);
3342 if (ret) {
3343 goto end;
3344 }
3345 }
3346 end:
3347 return ret;
3348 }
3349
/*
 * Consume one sub-buffer from `stream`: extract the next sub-buffer, run the
 * pre-consume/consume/post-consume operations registered on the stream, and
 * perform any pending or newly-ready stream rotation.
 *
 * Returns the number of bytes written on success, 0 when no data was
 * available, and a negative value on error.
 */
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
				      struct lttng_consumer_local_data *ctx,
				      bool locked_by_caller)
{
	ssize_t ret, written_bytes = 0;
	int rotation_ret;
	struct stream_subbuffer subbuffer = {};
	enum get_next_subbuffer_status get_next_status;

	/* Take the stream lock unless the caller already holds it. */
	if (!locked_by_caller) {
		stream->read_subbuffer_ops.lock(stream);
	} else {
		stream->read_subbuffer_ops.assert_locked(stream);
	}

	if (stream->read_subbuffer_ops.on_wake_up) {
		ret = stream->read_subbuffer_ops.on_wake_up(stream);
		if (ret) {
			goto end;
		}
	}

	/*
	 * If the stream was flagged to be ready for rotation before we extract
	 * the next packet, rotate it now.
	 */
	if (stream->rotate_ready) {
		DBG("Rotate stream before consuming data");
		ret = lttng_consumer_rotate_stream(stream);
		if (ret < 0) {
			ERR("Stream rotation error before consuming data");
			goto end;
		}
	}

	get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
	switch (get_next_status) {
	case GET_NEXT_SUBBUFFER_STATUS_OK:
		break;
	case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
		/* Not an error. */
		ret = 0;
		goto sleep_stream;
	case GET_NEXT_SUBBUFFER_STATUS_ERROR:
		ret = -1;
		goto end;
	default:
		abort();
	}

	ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
	if (ret) {
		goto error_put_subbuf;
	}

	written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
	if (written_bytes <= 0) {
		ERR("Error consuming subbuffer: (%zd)", written_bytes);
		ret = (int) written_bytes;
		goto error_put_subbuf;
	}

	/* Release the sub-buffer back to the ring buffer once consumed. */
	ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	if (ret) {
		goto end;
	}

	ret = post_consume(stream, &subbuffer, ctx);
	if (ret) {
		goto end;
	}

	/*
	 * After extracting the packet, we check if the stream is now ready to
	 * be rotated and perform the action immediately.
	 *
	 * Don't overwrite `ret` as callers expect the number of bytes
	 * consumed to be returned on success.
	 */
	rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
	if (rotation_ret == 1) {
		rotation_ret = lttng_consumer_rotate_stream(stream);
		if (rotation_ret < 0) {
			ret = rotation_ret;
			ERR("Stream rotation error after consuming data");
			goto end;
		}

	} else if (rotation_ret < 0) {
		ret = rotation_ret;
		ERR("Failed to check if stream was ready to rotate after consuming data");
		goto end;
	}

sleep_stream:
	if (stream->read_subbuffer_ops.on_sleep) {
		stream->read_subbuffer_ops.on_sleep(stream, ctx);
	}

	ret = written_bytes;
end:
	if (!locked_by_caller) {
		stream->read_subbuffer_ops.unlock(stream);
	}

	return ret;
error_put_subbuf:
	/* Error path: release the sub-buffer before falling through to `end`. */
	(void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	goto end;
}
3460
3461 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3462 {
3463 switch (the_consumer_data.type) {
3464 case LTTNG_CONSUMER_KERNEL:
3465 return lttng_kconsumer_on_recv_stream(stream);
3466 case LTTNG_CONSUMER32_UST:
3467 case LTTNG_CONSUMER64_UST:
3468 return lttng_ustconsumer_on_recv_stream(stream);
3469 default:
3470 ERR("Unknown consumer_data type");
3471 abort();
3472 return -ENOSYS;
3473 }
3474 }
3475
3476 /*
3477 * Allocate and set consumer data hash tables.
3478 */
3479 int lttng_consumer_init()
3480 {
3481 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3482 if (!the_consumer_data.channel_ht) {
3483 goto error;
3484 }
3485
3486 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3487 if (!the_consumer_data.channels_by_session_id_ht) {
3488 goto error;
3489 }
3490
3491 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3492 if (!the_consumer_data.relayd_ht) {
3493 goto error;
3494 }
3495
3496 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3497 if (!the_consumer_data.stream_list_ht) {
3498 goto error;
3499 }
3500
3501 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3502 if (!the_consumer_data.stream_per_chan_id_ht) {
3503 goto error;
3504 }
3505
3506 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3507 if (!data_ht) {
3508 goto error;
3509 }
3510
3511 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3512 if (!metadata_ht) {
3513 goto error;
3514 }
3515
3516 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3517 if (!the_consumer_data.chunk_registry) {
3518 goto error;
3519 }
3520
3521 return 0;
3522
3523 error:
3524 return -1;
3525 }
3526
/*
 * Process the ADD_RELAYD command receive by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 *
 * On success the received fd's ownership is transferred to the relayd pair
 * and a success status is reported back on `sock`; on failure the error code
 * is reported (when possible) and any locally-allocated resources are freed.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx,
				int sock_type,
				struct lttng_consumer_local_data *ctx,
				int sock,
				struct pollfd *consumer_sockpoll,
				uint64_t sessiond_id,
				uint64_t relayd_session_id,
				uint32_t relayd_version_major,
				uint32_t relayd_version_minor,
				enum lttcomm_sock_proto relayd_socket_protocol)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = nullptr;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(sock >= 0);
	ASSERT_RCU_READ_LOCKED();

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == nullptr) {
		LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == nullptr) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status message to
		 * we can notify the session daemon and continue our work without
		 * killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		/* Needing to exit in the middle of a command: error. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		fd = -1; /* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->control_sock.sock, fd, relayd_socket_protocol);

		/* Assign version values. */
		relayd->control_sock.major = relayd_version_major;
		relayd->control_sock.minor = relayd_version_minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->data_sock.sock, fd, relayd_socket_protocol);
		/* Assign version values. */
		relayd->data_sock.major = relayd_version_major;
		relayd->data_sock.minor = relayd_version_minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	/* Covers a lttcomm_populate_sock_from_open_socket() failure above. */
	if (ret < 0) {
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
	    sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
	    relayd->net_seq_idx,
	    fd);
	/*
	 * We gave the ownership of the fd to the relayd structure. Set the
	 * fd to -1 so we don't call close() on it in the error path below.
	 */
	fd = -1;

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	relayd->ctx = ctx;
	add_relayd(relayd);

	/* All good! */
	return;

error:
	/* Report the failure code to the session daemon before cleaning up. */
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	/* Only free a relayd pair that was allocated here and never published. */
	if (relayd_created) {
		free(relayd);
	}
}
3693
3694 /*
3695 * Search for a relayd associated to the session id and return the reference.
3696 *
3697 * A rcu read side lock MUST be acquire before calling this function and locked
3698 * until the relayd object is no longer necessary.
3699 */
3700 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3701 {
3702 struct lttng_ht_iter iter;
3703 struct consumer_relayd_sock_pair *relayd = nullptr;
3704
3705 ASSERT_RCU_READ_LOCKED();
3706
3707 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3708 cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
3709 /*
3710 * Check by sessiond id which is unique here where the relayd session
3711 * id might not be when having multiple relayd.
3712 */
3713 if (relayd->sessiond_session_id == id) {
3714 /* Found the relayd. There can be only one per id. */
3715 goto found;
3716 }
3717 }
3718
3719 return nullptr;
3720
3721 found:
3722 return relayd;
3723 }
3724
/*
 * Check if for a given session id there is still data needed to be extract
 * from the buffers.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	lttng::urcu::read_lock_guard read_lock;
	pthread_mutex_lock(&the_consumer_data.lock);

	/* Select the tracer-specific "data pending" check. */
	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		abort();
	}

	/* Ease our life a bit */
	ht = the_consumer_data.stream_list_ht;

	/* First pass: check the local buffers of every stream of the session. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&id, lttng_ht_seed),
					  ht->match_fct,
					  &id,
					  &iter.iter,
					  stream,
					  node_session_id.node)
	{
		pthread_mutex_lock(&stream->lock);

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}

	/* Second pass: ask the relay daemon whether data is still in flight. */
	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
		if (ret < 0) {
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			/* Communication error thus the relayd so no data pending. */
			goto data_not_pending;
		}

		cds_lfht_for_each_entry_duplicate(ht->ht,
						  ht->hash_fct(&id, lttng_ht_seed),
						  ht->match_fct,
						  &id,
						  &iter.iter,
						  stream,
						  node_session_id.node)
		{
			if (stream->metadata_flag) {
				ret = relayd_quiescent_control(&relayd->control_sock,
							       stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
							  stream->relayd_stream_id,
							  stream->next_net_seq_num - 1);
			}

			if (ret == 1) {
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_pending;
			} else if (ret < 0) {
				ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
				    relayd->net_seq_idx);
				lttng_consumer_cleanup_relayd(relayd);
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_not_pending;
			}
		}

		/* Send end command for data pending. */
		ret = relayd_end_data_pending(
			&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 1;
}
3863
3864 /*
3865 * Send a ret code status message to the sessiond daemon.
3866 *
3867 * Return the sendmsg() return value.
3868 */
3869 int consumer_send_status_msg(int sock, int ret_code)
3870 {
3871 struct lttcomm_consumer_status_msg msg;
3872
3873 memset(&msg, 0, sizeof(msg));
3874 msg.ret_code = (lttcomm_return_code) ret_code;
3875
3876 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3877 }
3878
3879 /*
3880 * Send a channel status message to the sessiond daemon.
3881 *
3882 * Return the sendmsg() return value.
3883 */
3884 int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
3885 {
3886 struct lttcomm_consumer_status_channel msg;
3887
3888 LTTNG_ASSERT(sock >= 0);
3889
3890 memset(&msg, 0, sizeof(msg));
3891 if (!channel) {
3892 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
3893 } else {
3894 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3895 msg.key = channel->key;
3896 msg.stream_count = channel->streams.count;
3897 }
3898
3899 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3900 }
3901
3902 unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
3903 unsigned long produced_pos,
3904 uint64_t nb_packets_per_stream,
3905 uint64_t max_sb_size)
3906 {
3907 unsigned long start_pos;
3908
3909 if (!nb_packets_per_stream) {
3910 return consumed_pos; /* Grab everything */
3911 }
3912 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
3913 start_pos -= max_sb_size * nb_packets_per_stream;
3914 if ((long) (start_pos - consumed_pos) < 0) {
3915 return consumed_pos; /* Grab everything */
3916 }
3917 return start_pos;
3918 }
3919
3920 /* Stream lock must be held by the caller. */
3921 static int sample_stream_positions(struct lttng_consumer_stream *stream,
3922 unsigned long *produced,
3923 unsigned long *consumed)
3924 {
3925 int ret;
3926
3927 ASSERT_LOCKED(stream->lock);
3928
3929 ret = lttng_consumer_sample_snapshot_positions(stream);
3930 if (ret < 0) {
3931 ERR("Failed to sample snapshot positions");
3932 goto end;
3933 }
3934
3935 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3936 if (ret < 0) {
3937 ERR("Failed to sample produced position");
3938 goto end;
3939 }
3940
3941 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3942 if (ret < 0) {
3943 ERR("Failed to sample consumed position");
3944 goto end;
3945 }
3946
3947 end:
3948 return ret;
3949 }
3950
3951 /*
3952 * Sample the rotate position for all the streams of a channel. If a stream
3953 * is already at the rotate position (produced == consumed), we flag it as
3954 * ready for rotation. The rotation of ready streams occurs after we have
3955 * replied to the session daemon that we have finished sampling the positions.
3956 * Must be called with RCU read-side lock held to ensure existence of channel.
3957 *
3958 * Returns 0 on success, < 0 on error
3959 */
3960 int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
3961 uint64_t key,
3962 uint64_t relayd_id)
3963 {
3964 int ret;
3965 struct lttng_consumer_stream *stream;
3966 struct lttng_ht_iter iter;
3967 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
3968 struct lttng_dynamic_array stream_rotation_positions;
3969 uint64_t next_chunk_id, stream_count = 0;
3970 enum lttng_trace_chunk_status chunk_status;
3971 const bool is_local_trace = relayd_id == -1ULL;
3972 struct consumer_relayd_sock_pair *relayd = nullptr;
3973 bool rotating_to_new_chunk = true;
3974 /* Array of `struct lttng_consumer_stream *` */
3975 struct lttng_dynamic_pointer_array streams_packet_to_open;
3976 size_t stream_idx;
3977
3978 ASSERT_RCU_READ_LOCKED();
3979
3980 DBG("Consumer sample rotate position for channel %" PRIu64, key);
3981
3982 lttng_dynamic_array_init(&stream_rotation_positions,
3983 sizeof(struct relayd_stream_rotation_position),
3984 nullptr);
3985 lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
3986
3987 lttng::urcu::read_lock_guard read_lock;
3988
3989 pthread_mutex_lock(&channel->lock);
3990 LTTNG_ASSERT(channel->trace_chunk);
3991 chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
3992 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
3993 ret = -1;
3994 goto end_unlock_channel;
3995 }
3996
3997 cds_lfht_for_each_entry_duplicate(ht->ht,
3998 ht->hash_fct(&channel->key, lttng_ht_seed),
3999 ht->match_fct,
4000 &channel->key,
4001 &iter.iter,
4002 stream,
4003 node_channel_id.node)
4004 {
4005 unsigned long produced_pos = 0, consumed_pos = 0;
4006
4007 health_code_update();
4008
4009 /*
4010 * Lock stream because we are about to change its state.
4011 */
4012 pthread_mutex_lock(&stream->lock);
4013
4014 if (stream->trace_chunk == stream->chan->trace_chunk) {
4015 rotating_to_new_chunk = false;
4016 }
4017
4018 /*
4019 * Do not flush a packet when rotating from a NULL trace
4020 * chunk. The stream has no means to output data, and the prior
4021 * rotation which rotated to NULL performed that side-effect
4022 * already. No new data can be produced when a stream has no
4023 * associated trace chunk (e.g. a stop followed by a rotate).
4024 */
4025 if (stream->trace_chunk) {
4026 bool flush_active;
4027
4028 if (stream->metadata_flag) {
4029 /*
4030 * Don't produce an empty metadata packet,
4031 * simply close the current one.
4032 *
4033 * Metadata is regenerated on every trace chunk
4034 * switch; there is no concern that no data was
4035 * produced.
4036 */
4037 flush_active = true;
4038 } else {
4039 /*
4040 * Only flush an empty packet if the "packet
4041 * open" could not be performed on transition
4042 * to a new trace chunk and no packets were
4043 * consumed within the chunk's lifetime.
4044 */
4045 if (stream->opened_packet_in_current_trace_chunk) {
4046 flush_active = true;
4047 } else {
4048 /*
4049 * Stream could have been full at the
4050 * time of rotation, but then have had
4051 * no activity at all.
4052 *
4053 * It is important to flush a packet
4054 * to prevent 0-length files from being
4055 * produced as most viewers choke on
4056 * them.
4057 *
4058 * Unfortunately viewers will not be
4059 * able to know that tracing was active
4060 * for this stream during this trace
4061 * chunk's lifetime.
4062 */
4063 ret = sample_stream_positions(
4064 stream, &produced_pos, &consumed_pos);
4065 if (ret) {
4066 goto end_unlock_stream;
4067 }
4068
4069 /*
4070 * Don't flush an empty packet if data
4071 * was produced; it will be consumed
4072 * before the rotation completes.
4073 */
4074 flush_active = produced_pos != consumed_pos;
4075 if (!flush_active) {
4076 const char *trace_chunk_name;
4077 uint64_t trace_chunk_id;
4078
4079 chunk_status = lttng_trace_chunk_get_name(
4080 stream->trace_chunk,
4081 &trace_chunk_name,
4082 nullptr);
4083 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
4084 trace_chunk_name = "none";
4085 }
4086
4087 /*
4088 * Consumer trace chunks are
4089 * never anonymous.
4090 */
4091 chunk_status = lttng_trace_chunk_get_id(
4092 stream->trace_chunk, &trace_chunk_id);
4093 LTTNG_ASSERT(chunk_status ==
4094 LTTNG_TRACE_CHUNK_STATUS_OK);
4095
4096 DBG("Unable to open packet for stream during trace chunk's lifetime. "
4097 "Flushing an empty packet to prevent an empty file from being created: "
4098 "stream id = %" PRIu64
4099 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
4100 stream->key,
4101 trace_chunk_name,
4102 trace_chunk_id);
4103 }
4104 }
4105 }
4106
4107 /*
4108 * Close the current packet before sampling the
4109 * ring buffer positions.
4110 */
4111 ret = consumer_stream_flush_buffer(stream, flush_active);
4112 if (ret < 0) {
4113 ERR("Failed to flush stream %" PRIu64 " during channel rotation",
4114 stream->key);
4115 goto end_unlock_stream;
4116 }
4117 }
4118
4119 ret = lttng_consumer_take_snapshot(stream);
4120 if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
4121 ERR("Failed to sample snapshot position during channel rotation");
4122 goto end_unlock_stream;
4123 }
4124 if (!ret) {
4125 ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
4126 if (ret < 0) {
4127 ERR("Failed to sample produced position during channel rotation");
4128 goto end_unlock_stream;
4129 }
4130
4131 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
4132 if (ret < 0) {
4133 ERR("Failed to sample consumed position during channel rotation");
4134 goto end_unlock_stream;
4135 }
4136 }
4137 /*
4138 * Align produced position on the start-of-packet boundary of the first
4139 * packet going into the next trace chunk.
4140 */
4141 produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
4142 if (consumed_pos == produced_pos) {
4143 DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
4144 stream->key,
4145 produced_pos,
4146 consumed_pos);
4147 stream->rotate_ready = true;
4148 } else {
4149 DBG("Different consumed and produced positions "
4150 "for stream %" PRIu64 " produced = %lu consumed = %lu",
4151 stream->key,
4152 produced_pos,
4153 consumed_pos);
4154 }
4155 /*
4156 * The rotation position is based on the packet_seq_num of the
4157 * packet following the last packet that was consumed for this
4158 * stream, incremented by the offset between produced and
4159 * consumed positions. This rotation position is a lower bound
4160 * (inclusive) at which the next trace chunk starts. Since it
4161 * is a lower bound, it is OK if the packet_seq_num does not
4162 * correspond exactly to the same packet identified by the
4163 * consumed_pos, which can happen in overwrite mode.
4164 */
4165 if (stream->sequence_number_unavailable) {
4166 /*
4167 * Rotation should never be performed on a session which
4168 * interacts with a pre-2.8 lttng-modules, which does
4169 * not implement packet sequence number.
4170 */
4171 ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
4172 stream->key);
4173 ret = -1;
4174 goto end_unlock_stream;
4175 }
4176 stream->rotate_position = stream->last_sequence_number + 1 +
4177 ((produced_pos - consumed_pos) / stream->max_sb_size);
4178 DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
4179 stream->key,
4180 stream->rotate_position);
4181
4182 if (!is_local_trace) {
4183 /*
4184 * The relay daemon control protocol expects a rotation
4185 * position as "the sequence number of the first packet
4186 * _after_ the current trace chunk".
4187 */
4188 const struct relayd_stream_rotation_position position = {
4189 .stream_id = stream->relayd_stream_id,
4190 .rotate_at_seq_num = stream->rotate_position,
4191 };
4192
4193 ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
4194 &position);
4195 if (ret) {
4196 ERR("Failed to allocate stream rotation position");
4197 goto end_unlock_stream;
4198 }
4199 stream_count++;
4200 }
4201
4202 stream->opened_packet_in_current_trace_chunk = false;
4203
4204 if (rotating_to_new_chunk && !stream->metadata_flag) {
4205 /*
4206 * Attempt to flush an empty packet as close to the
4207 * rotation point as possible. In the event where a
4208 * stream remains inactive after the rotation point,
4209 * this ensures that the new trace chunk has a
4210 * beginning timestamp set at the begining of the
4211 * trace chunk instead of only creating an empty
4212 * packet when the trace chunk is stopped.
4213 *
4214 * This indicates to the viewers that the stream
4215 * was being recorded, but more importantly it
4216 * allows viewers to determine a useable trace
4217 * intersection.
4218 *
4219 * This presents a problem in the case where the
4220 * ring-buffer is completely full.
4221 *
4222 * Consider the following scenario:
4223 * - The consumption of data is slow (slow network,
4224 * for instance),
4225 * - The ring buffer is full,
4226 * - A rotation is initiated,
4227 * - The flush below does nothing (no space left to
4228 * open a new packet),
4229 * - The other streams rotate very soon, and new
4230 * data is produced in the new chunk,
4231 * - This stream completes its rotation long after the
4232 * rotation was initiated
4233 * - The session is stopped before any event can be
4234 * produced in this stream's buffers.
4235 *
4236 * The resulting trace chunk will have a single packet
4237 * temporaly at the end of the trace chunk for this
4238 * stream making the stream intersection more narrow
4239 * than it should be.
4240 *
4241 * To work-around this, an empty flush is performed
4242 * after the first consumption of a packet during a
4243 * rotation if open_packet fails. The idea is that
4244 * consuming a packet frees enough space to switch
4245 * packets in this scenario and allows the tracer to
4246 * "stamp" the beginning of the new trace chunk at the
4247 * earliest possible point.
4248 *
4249 * The packet open is performed after the channel
4250 * rotation to ensure that no attempt to open a packet
4251 * is performed in a stream that has no active trace
4252 * chunk.
4253 */
4254 ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
4255 stream);
4256 if (ret) {
4257 PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
4258 ret = -1;
4259 goto end_unlock_stream;
4260 }
4261 }
4262
4263 pthread_mutex_unlock(&stream->lock);
4264 }
4265 stream = nullptr;
4266
4267 if (!is_local_trace) {
4268 relayd = consumer_find_relayd(relayd_id);
4269 if (!relayd) {
4270 ERR("Failed to find relayd %" PRIu64, relayd_id);
4271 ret = -1;
4272 goto end_unlock_channel;
4273 }
4274
4275 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
4276 ret = relayd_rotate_streams(&relayd->control_sock,
4277 stream_count,
4278 rotating_to_new_chunk ? &next_chunk_id : nullptr,
4279 (const struct relayd_stream_rotation_position *)
4280 stream_rotation_positions.buffer.data);
4281 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4282 if (ret < 0) {
4283 ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
4284 relayd->net_seq_idx);
4285 lttng_consumer_cleanup_relayd(relayd);
4286 goto end_unlock_channel;
4287 }
4288 }
4289
4290 for (stream_idx = 0;
4291 stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
4292 stream_idx++) {
4293 enum consumer_stream_open_packet_status status;
4294
4295 stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
4296 &streams_packet_to_open, stream_idx);
4297
4298 pthread_mutex_lock(&stream->lock);
4299 status = consumer_stream_open_packet(stream);
4300 pthread_mutex_unlock(&stream->lock);
4301 switch (status) {
4302 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
4303 DBG("Opened a packet after a rotation: stream id = %" PRIu64
4304 ", channel name = %s, session id = %" PRIu64,
4305 stream->key,
4306 stream->chan->name,
4307 stream->chan->session_id);
4308 break;
4309 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
4310 /*
4311 * Can't open a packet as there is no space left
4312 * in the buffer. A new packet will be opened
4313 * once one has been consumed.
4314 */
4315 DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
4316 ", channel name = %s, session id = %" PRIu64,
4317 stream->key,
4318 stream->chan->name,
4319 stream->chan->session_id);
4320 break;
4321 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
4322 /* Logged by callee. */
4323 ret = -1;
4324 goto end_unlock_channel;
4325 default:
4326 abort();
4327 }
4328 }
4329
4330 pthread_mutex_unlock(&channel->lock);
4331 ret = 0;
4332 goto end;
4333
4334 end_unlock_stream:
4335 pthread_mutex_unlock(&stream->lock);
4336 end_unlock_channel:
4337 pthread_mutex_unlock(&channel->lock);
4338 end:
4339 lttng_dynamic_array_reset(&stream_rotation_positions);
4340 lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
4341 return ret;
4342 }
4343
4344 static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
4345 {
4346 int ret = 0;
4347 unsigned long consumed_pos_before, consumed_pos_after;
4348
4349 ret = lttng_consumer_sample_snapshot_positions(stream);
4350 if (ret < 0) {
4351 ERR("Taking snapshot positions");
4352 goto end;
4353 }
4354
4355 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4356 if (ret < 0) {
4357 ERR("Consumed snapshot position");
4358 goto end;
4359 }
4360
4361 switch (the_consumer_data.type) {
4362 case LTTNG_CONSUMER_KERNEL:
4363 ret = kernctl_buffer_clear(stream->wait_fd);
4364 if (ret < 0) {
4365 ERR("Failed to clear kernel stream (ret = %d)", ret);
4366 goto end;
4367 }
4368 break;
4369 case LTTNG_CONSUMER32_UST:
4370 case LTTNG_CONSUMER64_UST:
4371 ret = lttng_ustconsumer_clear_buffer(stream);
4372 if (ret < 0) {
4373 ERR("Failed to clear ust stream (ret = %d)", ret);
4374 goto end;
4375 }
4376 break;
4377 default:
4378 ERR("Unknown consumer_data type");
4379 abort();
4380 }
4381
4382 ret = lttng_consumer_sample_snapshot_positions(stream);
4383 if (ret < 0) {
4384 ERR("Taking snapshot positions");
4385 goto end;
4386 }
4387 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4388 if (ret < 0) {
4389 ERR("Consumed snapshot position");
4390 goto end;
4391 }
4392 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4393 end:
4394 return ret;
4395 }
4396
4397 static int consumer_clear_stream(struct lttng_consumer_stream *stream)
4398 {
4399 int ret;
4400
4401 ret = consumer_stream_flush_buffer(stream, true);
4402 if (ret < 0) {
4403 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
4404 ret = LTTCOMM_CONSUMERD_FATAL;
4405 goto error;
4406 }
4407
4408 ret = consumer_clear_buffer(stream);
4409 if (ret < 0) {
4410 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
4411 ret = LTTCOMM_CONSUMERD_FATAL;
4412 goto error;
4413 }
4414
4415 ret = LTTCOMM_CONSUMERD_SUCCESS;
4416 error:
4417 return ret;
4418 }
4419
/*
 * Clear the ring-buffers of every stream of an unmonitored channel.
 *
 * The streams are reached through the channel's stream list. The channel
 * lock is held for the whole operation and each stream's lock is held while
 * that stream is being cleared.
 *
 * Returns 0 on success, the non-zero consumer_clear_stream() error code
 * otherwise.
 */
static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
{
	int ret;
	struct lttng_consumer_stream *stream;

	lttng::urcu::read_lock_guard read_lock;
	pthread_mutex_lock(&channel->lock);
	cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
		health_code_update();
		pthread_mutex_lock(&stream->lock);
		ret = consumer_clear_stream(stream);
		if (ret) {
			goto error_unlock;
		}
		pthread_mutex_unlock(&stream->lock);
	}
	pthread_mutex_unlock(&channel->lock);
	return 0;

error_unlock:
	/* Release the locks in the reverse order they were taken. */
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	return ret;
}
4444
4445 /*
4446 * Check if a stream is ready to be rotated after extracting it.
4447 *
4448 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4449 * error. Stream lock must be held.
4450 */
4451 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4452 {
4453 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4454 " last_sequence_number %" PRIu64,
4455 stream->key,
4456 stream->rotate_ready,
4457 stream->rotate_position,
4458 stream->last_sequence_number);
4459 if (stream->rotate_ready) {
4460 return 1;
4461 }
4462
4463 /*
4464 * If packet seq num is unavailable, it means we are interacting
4465 * with a pre-2.8 lttng-modules which does not implement the
4466 * sequence number. Rotation should never be used by sessiond in this
4467 * scenario.
4468 */
4469 if (stream->sequence_number_unavailable) {
4470 ERR("Internal error: rotation used on stream %" PRIu64
4471 " with unavailable sequence number",
4472 stream->key);
4473 return -1;
4474 }
4475
4476 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
4477 return 0;
4478 }
4479
4480 /*
4481 * Rotate position not reached yet. The stream rotate position is
4482 * the position of the next packet belonging to the next trace chunk,
4483 * but consumerd considers rotation ready when reaching the last
4484 * packet of the current chunk, hence the "rotate_position - 1".
4485 */
4486
4487 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4488 " rotate_position %" PRIu64,
4489 stream->key,
4490 stream->last_sequence_number,
4491 stream->rotate_position);
4492 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4493 return 1;
4494 }
4495
4496 return 0;
4497 }
4498
4499 /*
4500 * Reset the state for a stream after a rotation occurred.
4501 */
4502 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4503 {
4504 DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
4505 stream->rotate_position = -1ULL;
4506 stream->rotate_ready = false;
4507 }
4508
4509 /*
4510 * Perform the rotation a local stream file.
4511 */
4512 static int rotate_local_stream(struct lttng_consumer_stream *stream)
4513 {
4514 int ret = 0;
4515
4516 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
4517 stream->key,
4518 stream->chan->key);
4519 stream->tracefile_size_current = 0;
4520 stream->tracefile_count_current = 0;
4521
4522 if (stream->out_fd >= 0) {
4523 ret = close(stream->out_fd);
4524 if (ret) {
4525 PERROR("Failed to close stream out_fd of channel \"%s\"",
4526 stream->chan->name);
4527 }
4528 stream->out_fd = -1;
4529 }
4530
4531 if (stream->index_file) {
4532 lttng_index_file_put(stream->index_file);
4533 stream->index_file = nullptr;
4534 }
4535
4536 if (!stream->trace_chunk) {
4537 goto end;
4538 }
4539
4540 ret = consumer_stream_create_output_files(stream, true);
4541 end:
4542 return ret;
4543 }
4544
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * Return 0 on success, a negative number on error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	/*
	 * Note: after the put above, stream->trace_chunk is only compared by
	 * address; it is never dereferenced again before being reassigned.
	 */
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = nullptr;
	} else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	/* A stream not bound to a relay daemon rotates its local files. */
	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}
4616
/*
 * Rotate all the ready streams now.
 *
 * This is especially important for low throughput streams that have already
 * been consumed, we cannot wait for their next packet to perform the
 * rotation.
 * Need to be called with RCU read-side lock held to ensure existence of
 * channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	/* Streams are indexed by their parent channel's key in this table. */
	struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;

	ASSERT_RCU_READ_LOCKED();

	lttng::urcu::read_lock_guard read_lock;

	DBG("Consumer rotate ready streams in channel %" PRIu64, key);

	/* Visit every stream sharing this channel's key. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		health_code_update();

		/* Lock ordering: channel lock is taken before the stream lock. */
		pthread_mutex_lock(&stream->chan->lock);
		pthread_mutex_lock(&stream->lock);

		if (!stream->rotate_ready) {
			pthread_mutex_unlock(&stream->lock);
			pthread_mutex_unlock(&stream->chan->lock);
			continue;
		}
		DBG("Consumer rotate ready stream %" PRIu64, stream->key);

		ret = lttng_consumer_rotate_stream(stream);
		pthread_mutex_unlock(&stream->lock);
		pthread_mutex_unlock(&stream->chan->lock);
		if (ret) {
			goto end;
		}
	}

	ret = 0;

end:
	return ret;
}
4674
4675 enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4676 const lttng_uuid& sessiond_uuid)
4677 {
4678 enum lttcomm_return_code ret;
4679 char uuid_str[LTTNG_UUID_STR_LEN];
4680
4681 if (ctx->sessiond_uuid.is_set) {
4682 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
4683 goto end;
4684 }
4685
4686 ctx->sessiond_uuid.is_set = true;
4687 ctx->sessiond_uuid.value = sessiond_uuid;
4688 ret = LTTCOMM_CONSUMERD_SUCCESS;
4689 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4690 DBG("Received session daemon UUID: %s", uuid_str);
4691 end:
4692 return ret;
4693 }
4694
/*
 * Create a new trace chunk, publish it in the consumer's chunk registry, and
 * set it as the current chunk of every channel belonging to the session.
 *
 * If setting the chunk on the channels or announcing it to the relay daemon
 * fails, the creation is rolled back by closing the chunk, since the session
 * daemon will consider the creation failed and never ask for it to be closed.
 *
 * `relayd_id` is NULL for a local trace. `credentials` and
 * `chunk_directory_handle` are optional; ownership of the directory handle is
 * NOT taken by the consumer.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success,
 * LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED on error.
 */
enum lttcomm_return_code
lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
				  uint64_t session_id,
				  uint64_t chunk_id,
				  time_t chunk_creation_timestamp,
				  const char *chunk_override_name,
				  const struct lttng_credentials *credentials,
				  struct lttng_directory_handle *chunk_directory_handle)
{
	int ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
	enum lttng_trace_chunk_status chunk_status;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	char creation_timestamp_buffer[ISO8601_STR_LEN];
	const char *relayd_id_str = "(none)";
	const char *creation_timestamp_str;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	/* Local protocol error. */
	LTTNG_ASSERT(chunk_creation_timestamp);
	ret = time_to_iso8601_str(chunk_creation_timestamp,
				  creation_timestamp_buffer,
				  sizeof(creation_timestamp_buffer));
	creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";

	DBG("Consumer create trace chunk command: relay_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
	    ", chunk_creation_timestamp = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    chunk_override_name ?: "(none)",
	    creation_timestamp_str);

	/*
	 * The trace chunk registry, as used by the consumer daemon, implicitly
	 * owns the trace chunks. This is only needed in the consumer since
	 * the consumer has no notion of a session beyond session IDs being
	 * used to identify other objects.
	 *
	 * The lttng_trace_chunk_registry_publish() call below provides a
	 * reference which is not released; it implicitly becomes the session
	 * daemon's reference to the chunk in the consumer daemon.
	 *
	 * The lifetime of trace chunks in the consumer daemon is managed by
	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
	 */
	created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
	if (!created_chunk) {
		ERR("Failed to create trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	if (chunk_override_name) {
		chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	if (chunk_directory_handle) {
		chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk credentials");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
		/*
		 * The consumer daemon has no ownership of the chunk output
		 * directory.
		 */
		chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
		chunk_directory_handle = nullptr;
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk's directory handle");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	published_chunk = lttng_trace_chunk_registry_publish_chunk(
		the_consumer_data.chunk_registry, session_id, created_chunk);
	lttng_trace_chunk_put(created_chunk);
	created_chunk = nullptr;
	if (!published_chunk) {
		ERR("Failed to publish trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	/* Set the published chunk as the current chunk of the session's channels. */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry_duplicate(
			the_consumer_data.channels_by_session_id_ht->ht,
			the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
									      lttng_ht_seed),
			the_consumer_data.channels_by_session_id_ht->match_fct,
			&session_id,
			&iter.iter,
			channel,
			channels_by_session_id_ht_node.node)
		{
			ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
			if (ret) {
				/*
				 * Roll-back the creation of this chunk.
				 *
				 * This is important since the session daemon will
				 * assume that the creation of this chunk failed and
				 * will never ask for it to be closed, resulting
				 * in a leak and an inconsistent state for some
				 * channels.
				 */
				enum lttcomm_return_code close_ret;
				char path[LTTNG_PATH_MAX];

				DBG("Failed to set new trace chunk on existing channels, rolling back");
				close_ret =
					lttng_consumer_close_trace_chunk(relayd_id,
									 session_id,
									 chunk_id,
									 chunk_creation_timestamp,
									 nullptr,
									 path);
				if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
					ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
					    ", chunk_id = %" PRIu64,
					    session_id,
					    chunk_id);
				}

				ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
				break;
			}
		}
	}

	/* Announce the new chunk to the relay daemon, if any. */
	if (relayd_id) {
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		if (!relayd || ret) {
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
								     session_id,
								     chunk_id,
								     chunk_creation_timestamp,
								     nullptr,
								     path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
				    ", chunk_id = %" PRIu64,
				    session_id,
				    chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
error:
	/* Release the reference returned by the "publish" operation. */
	lttng_trace_chunk_put(published_chunk);
	lttng_trace_chunk_put(created_chunk);
	return ret_code;
}
4887
/*
 * Close a trace chunk on behalf of the session daemon.
 *
 * The chunk's close timestamp (and optional close command) is recorded,
 * every channel still referencing the chunk is detached from it, and the
 * closure is forwarded to the relay daemon when one is used.
 *
 * `relayd_id` is NULL for a local trace. `path` is an output buffer passed
 * through to relayd_close_trace_chunk().
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, an error code otherwise.
 */
enum lttcomm_return_code
lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
				 uint64_t session_id,
				 uint64_t chunk_id,
				 time_t chunk_close_timestamp,
				 const enum lttng_trace_chunk_command_type *close_command,
				 char *path)
{
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *chunk;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const char *close_command_name = "none";
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	enum lttng_trace_chunk_status chunk_status;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}
	if (close_command) {
		close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
	}

	DBG("Consumer close trace chunk command: relayd_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    close_command_name);

	chunk = lttng_trace_chunk_registry_find_chunk(
		the_consumer_data.chunk_registry, session_id, chunk_id);
	if (!chunk) {
		ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
		    session_id,
		    chunk_id);
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		goto end;
	}

	if (close_command) {
		chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto end;
		}
	}

	/*
	 * chunk is now invalid to access as we no longer hold a reference to
	 * it; it is only kept around to compare it (by address) to the
	 * current chunk found in the session's channels.
	 */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (
			the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
			int ret;

			/*
			 * Only change the channel's chunk to NULL if it still
			 * references the chunk being closed. The channel may
			 * reference a newer channel in the case of a session
			 * rotation. When a session rotation occurs, the "next"
			 * chunk is created before the "current" chunk is closed.
			 */
			if (channel->trace_chunk != chunk) {
				continue;
			}
			ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
			if (ret) {
				/*
				 * Attempt to close the chunk on as many channels as
				 * possible.
				 */
				ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			}
		}
	}
	/* Forward the closure to the relay daemon, if any. */
	if (relayd_id) {
		int ret;
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		if (!relayd || ret) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
end:
	/*
	 * Release the reference returned by the "find" operation and
	 * the session daemon's implicit reference to the chunk.
	 * The double put is intentional (both puts are no-ops when the
	 * chunk was not found).
	 */
	lttng_trace_chunk_put(chunk);
	lttng_trace_chunk_put(chunk);

	return ret_code;
}
5011
5012 enum lttcomm_return_code
5013 lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
5014 {
5015 int ret;
5016 enum lttcomm_return_code ret_code;
5017 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
5018 const char *relayd_id_str = "(none)";
5019 const bool is_local_trace = !relayd_id;
5020 struct consumer_relayd_sock_pair *relayd = nullptr;
5021 bool chunk_exists_local, chunk_exists_remote;
5022 lttng::urcu::read_lock_guard read_lock;
5023
5024 if (relayd_id) {
5025 /* Only used for logging purposes. */
5026 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
5027 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
5028 relayd_id_str = relayd_id_buffer;
5029 } else {
5030 relayd_id_str = "(formatting error)";
5031 }
5032 }
5033
5034 DBG("Consumer trace chunk exists command: relayd_id = %s"
5035 ", chunk_id = %" PRIu64,
5036 relayd_id_str,
5037 chunk_id);
5038 ret = lttng_trace_chunk_registry_chunk_exists(
5039 the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
5040 if (ret) {
5041 /* Internal error. */
5042 ERR("Failed to query the existence of a trace chunk");
5043 ret_code = LTTCOMM_CONSUMERD_FATAL;
5044 goto end;
5045 }
5046 DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
5047 if (chunk_exists_local) {
5048 ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
5049 goto end;
5050 } else if (is_local_trace) {
5051 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5052 goto end;
5053 }
5054
5055 relayd = consumer_find_relayd(*relayd_id);
5056 if (!relayd) {
5057 ERR("Failed to find relayd %" PRIu64, *relayd_id);
5058 ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5059 goto end_rcu_unlock;
5060 }
5061 DBG("Looking up existence of trace chunk on relay daemon");
5062 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
5063 ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
5064 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
5065 if (ret < 0) {
5066 ERR("Failed to look-up the existence of trace chunk on relay daemon");
5067 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
5068 goto end_rcu_unlock;
5069 }
5070
5071 ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
5072 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5073 DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
5074
5075 end_rcu_unlock:
5076 end:
5077 return ret_code;
5078 }
5079
/*
 * Clear the ring-buffers of all monitored streams of a channel.
 *
 * Streams are looked up by channel key in the stream-per-channel-id hash
 * table. Streams whose hash-table node was already removed (concurrent
 * teardown) are skipped.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, the non-zero
 * consumer_clear_stream() error code otherwise.
 */
static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	int ret;

	ht = the_consumer_data.stream_per_chan_id_ht;

	lttng::urcu::read_lock_guard read_lock;
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		ret = consumer_clear_stream(stream);
		if (ret) {
			goto error_unlock;
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	return LTTCOMM_CONSUMERD_SUCCESS;

error_unlock:
	pthread_mutex_unlock(&stream->lock);
	return ret;
}
5118
5119 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5120 {
5121 int ret;
5122
5123 DBG("Consumer clear channel %" PRIu64, channel->key);
5124
5125 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5126 /*
5127 * Nothing to do for the metadata channel/stream.
5128 * Snapshot mechanism already take care of the metadata
5129 * handling/generation, and monitored channels only need to
5130 * have their data stream cleared..
5131 */
5132 ret = LTTCOMM_CONSUMERD_SUCCESS;
5133 goto end;
5134 }
5135
5136 if (!channel->monitor) {
5137 ret = consumer_clear_unmonitored_channel(channel);
5138 } else {
5139 ret = consumer_clear_monitored_channel(channel);
5140 }
5141 end:
5142 return ret;
5143 }
5144
/*
 * Open a packet in every data stream of a channel.
 *
 * Rejected for metadata channels. Streams whose hash-table node was already
 * removed (concurrent teardown) are skipped. A stream that has no space left
 * in its buffer is not an error: a packet will be opened once data has been
 * consumed.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, an error code otherwise.
 */
enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream;
	enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;

	if (channel->metadata_stream) {
		ERR("Open channel packets command attempted on a metadata channel");
		ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
		goto end;
	}

	{
		lttng::urcu::read_lock_guard read_lock;
		cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
			enum consumer_stream_open_packet_status status;

			pthread_mutex_lock(&stream->lock);
			/* Skip streams being torn down concurrently. */
			if (cds_lfht_is_node_deleted(&stream->node.node)) {
				goto next;
			}

			status = consumer_stream_open_packet(stream);
			switch (status) {
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
				DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				stream->opened_packet_in_current_trace_chunk = true;
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
				DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
				/*
				 * Only unexpected internal errors can lead to this
				 * failing. Report an unknown error.
				 */
				ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel id = %" PRIu64 ", channel name = %s"
				    ", session id = %" PRIu64,
				    stream->key,
				    channel->key,
				    channel->name,
				    channel->session_id);
				ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
				goto error_unlock;
			default:
				abort();
			}

		next:
			pthread_mutex_unlock(&stream->lock);
		}
	}
end_rcu_unlock:
end:
	return ret;

error_unlock:
	/* Release the stream lock before leaving the RCU-guarded scope. */
	pthread_mutex_unlock(&stream->lock);
	goto end_rcu_unlock;
}
5213
/*
 * SIGBUS handler entry point for the consumer daemon: forwards the faulting
 * address to the user space tracing consumer's SIGBUS handler.
 */
void lttng_consumer_sigbus_handle(void *addr)
{
	lttng_ustconsumer_sigbus_handle(addr);
}
This page took 0.177833 seconds and 5 git commands to generate.