Remove fcntl wrapper
[lttng-tools.git] / src / common / consumer / consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <common/align.hpp>
12 #include <common/common.hpp>
13 #include <common/compat/endian.hpp>
14 #include <common/compat/poll.hpp>
15 #include <common/consumer/consumer-metadata-cache.hpp>
16 #include <common/consumer/consumer-stream.hpp>
17 #include <common/consumer/consumer-testpoint.hpp>
18 #include <common/consumer/consumer-timer.hpp>
19 #include <common/consumer/consumer.hpp>
20 #include <common/dynamic-array.hpp>
21 #include <common/index/ctf-index.hpp>
22 #include <common/index/index.hpp>
23 #include <common/io-hint.hpp>
24 #include <common/kernel-consumer/kernel-consumer.hpp>
25 #include <common/kernel-ctl/kernel-ctl.hpp>
26 #include <common/relayd/relayd.hpp>
27 #include <common/sessiond-comm/relayd.hpp>
28 #include <common/sessiond-comm/sessiond-comm.hpp>
29 #include <common/string-utils/format.hpp>
30 #include <common/time.hpp>
31 #include <common/trace-chunk-registry.hpp>
32 #include <common/trace-chunk.hpp>
33 #include <common/urcu.hpp>
34 #include <common/ust-consumer/ust-consumer.hpp>
35 #include <common/utils.hpp>
36
37 #include <bin/lttng-consumerd/health-consumerd.hpp>
38 #include <fcntl.h>
39 #include <inttypes.h>
40 #include <poll.h>
41 #include <pthread.h>
42 #include <signal.h>
43 #include <stdlib.h>
44 #include <string.h>
45 #include <sys/mman.h>
46 #include <sys/socket.h>
47 #include <sys/types.h>
48 #include <unistd.h>
49
/* Global consumer state shared by all consumer threads. */
lttng_consumer_global_data the_consumer_data;

/* Actions that can be posted on the channel management pipe. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};

namespace {
/* Message exchanged over the channel management pipe. */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan; /* add */
	uint64_t key; /* del */
};

/*
 * Global hash table containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll thread
 * for the metadata and the data poll thread for the data.
 */
struct lttng_ht *metadata_ht;
struct lttng_ht *data_ht;
} /* namespace */

/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;

/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
int consumer_quit;
84
85 static const char *get_consumer_domain()
86 {
87 switch (the_consumer_data.type) {
88 case LTTNG_CONSUMER_KERNEL:
89 return DEFAULT_KERNEL_TRACE_DIR;
90 case LTTNG_CONSUMER64_UST:
91 /* Fall-through. */
92 case LTTNG_CONSUMER32_UST:
93 return DEFAULT_UST_TRACE_DIR;
94 default:
95 abort();
96 }
97 }
98
99 /*
100 * Notify a thread lttng pipe to poll back again. This usually means that some
101 * global state has changed so we just send back the thread in a poll wait
102 * call.
103 */
104 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
105 {
106 struct lttng_consumer_stream *null_stream = nullptr;
107
108 LTTNG_ASSERT(pipe);
109
110 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
111 pointer. */
112 }
113
114 static void notify_health_quit_pipe(int *pipe)
115 {
116 ssize_t ret;
117
118 ret = lttng_write(pipe[1], "4", 1);
119 if (ret < 1) {
120 PERROR("write consumer health quit");
121 }
122 }
123
124 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
125 struct lttng_consumer_channel *chan,
126 uint64_t key,
127 enum consumer_channel_action action)
128 {
129 struct consumer_channel_msg msg;
130 ssize_t ret;
131
132 memset(&msg, 0, sizeof(msg));
133
134 msg.action = action;
135 msg.chan = chan;
136 msg.key = key;
137 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
138 if (ret < sizeof(msg)) {
139 PERROR("notify_channel_pipe write error");
140 }
141 }
142
/* Post a CONSUMER_CHANNEL_DEL action for the channel identified by key. */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
}
147
148 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
149 struct lttng_consumer_channel **chan,
150 uint64_t *key,
151 enum consumer_channel_action *action)
152 {
153 struct consumer_channel_msg msg;
154 ssize_t ret;
155
156 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
157 if (ret < sizeof(msg)) {
158 ret = -1;
159 goto error;
160 }
161 *action = msg.action;
162 *chan = msg.chan;
163 *key = msg.key;
164 error:
165 return (int) ret;
166 }
167
/*
 * Cleanup the stream list of a channel. Those streams are not yet globally
 * visible, so no global hash table needs to be updated.
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	LTTNG_ASSERT(channel);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
		/*
		 * Once a stream is added to this list, the buffers were created so we
		 * have a guarantee that this call will succeed. Setting the monitor
		 * mode to 0 so we don't lock nor try to delete the stream from the
		 * global hash table.
		 */
		stream->monitor = 0;
		consumer_stream_destroy(stream, nullptr);
	}
}
190
191 /*
192 * Find a stream. The consumer_data.lock must be locked during this
193 * call.
194 */
195 static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
196 {
197 struct lttng_ht_iter iter;
198 struct lttng_ht_node_u64 *node;
199 struct lttng_consumer_stream *stream = nullptr;
200
201 LTTNG_ASSERT(ht);
202
203 /* -1ULL keys are lookup failures */
204 if (key == (uint64_t) -1ULL) {
205 return nullptr;
206 }
207
208 lttng::urcu::read_lock_guard read_lock;
209
210 lttng_ht_lookup(ht, &key, &iter);
211 node = lttng_ht_iter_get_node_u64(&iter);
212 if (node != nullptr) {
213 stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
214 }
215
216 return stream;
217 }
218
219 static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
220 {
221 struct lttng_consumer_stream *stream;
222
223 lttng::urcu::read_lock_guard read_lock;
224 stream = find_stream(key, ht);
225 if (stream) {
226 stream->key = (uint64_t) -1ULL;
227 /*
228 * We don't want the lookup to match, but we still need
229 * to iterate on this stream when iterating over the hash table. Just
230 * change the node key.
231 */
232 stream->node.key = (uint64_t) -1ULL;
233 }
234 }
235
236 /*
237 * Return a channel object for the given key.
238 *
239 * RCU read side lock MUST be acquired before calling this function and
240 * protects the channel ptr.
241 */
242 struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
243 {
244 struct lttng_ht_iter iter;
245 struct lttng_ht_node_u64 *node;
246 struct lttng_consumer_channel *channel = nullptr;
247
248 ASSERT_RCU_READ_LOCKED();
249
250 /* -1ULL keys are lookup failures */
251 if (key == (uint64_t) -1ULL) {
252 return nullptr;
253 }
254
255 lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
256 node = lttng_ht_iter_get_node_u64(&iter);
257 if (node != nullptr) {
258 channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
259 }
260
261 return channel;
262 }
263
264 /*
265 * There is a possibility that the consumer does not have enough time between
266 * the close of the channel on the session daemon and the cleanup in here thus
267 * once we have a channel add with an existing key, we know for sure that this
268 * channel will eventually get cleaned up by all streams being closed.
269 *
270 * This function just nullifies the already existing channel key.
271 */
272 static void steal_channel_key(uint64_t key)
273 {
274 struct lttng_consumer_channel *channel;
275
276 lttng::urcu::read_lock_guard read_lock;
277 channel = consumer_find_channel(key);
278 if (channel) {
279 channel->key = (uint64_t) -1ULL;
280 /*
281 * We don't want the lookup to match, but we still need to iterate on
282 * this channel when iterating over the hash table. Just change the
283 * node key.
284 */
285 channel->node.key = (uint64_t) -1ULL;
286 }
287 }
288
289 static void free_channel_rcu(struct rcu_head *head)
290 {
291 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
292 struct lttng_consumer_channel *channel =
293 lttng::utils::container_of(node, &lttng_consumer_channel::node);
294
295 switch (the_consumer_data.type) {
296 case LTTNG_CONSUMER_KERNEL:
297 break;
298 case LTTNG_CONSUMER32_UST:
299 case LTTNG_CONSUMER64_UST:
300 lttng_ustconsumer_free_channel(channel);
301 break;
302 default:
303 ERR("Unknown consumer_data type");
304 abort();
305 }
306 free(channel);
307 }
308
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct consumer_relayd_sock_pair *relayd =
		lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	/* Sockets are closed; the mutex can now be torn down safely. */
	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
332
/*
 * Destroy and free relayd socket pair object.
 *
 * Removes the pair from the global relayd hash table; the sockets are closed
 * and the memory released later from an RCU callback (free_relayd_rcu).
 * Safe to call with a nullptr relayd, and a no-op if the pair was already
 * removed by a concurrent destroy.
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == nullptr) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
357
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures.
 *
 * Lock ordering: the_consumer_data.lock is taken before channel->lock; callers
 * must not hold either. The channel memory itself is released later via RCU
 * (free_channel_rcu).
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	/*
	 * Send a last buffer statistics sample to the session daemon
	 * to ensure it tracks the amount of data consumed by this channel.
	 */
	sample_and_send_channel_buffer_stats(channel);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
		/* NOTE(review): unreachable after abort(); kept as written. */
		goto end;
	}

	/* Release the channel's reference on its current trace chunk. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = nullptr;

	if (channel->is_published) {
		int ret;

		/* Unpublish from both lookup tables under the RCU read lock. */
		lttng::urcu::read_lock_guard read_lock;
		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
		LTTNG_ASSERT(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
		LTTNG_ASSERT(!ret);
	}

	channel->is_deleted = true;
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
422
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht()
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	{
		/*
		 * Deletion while iterating is safe here: cds_lfht supports
		 * concurrent removal, and consumer_destroy_relayd() defers the
		 * actual free to an RCU callback.
		 */
		lttng::urcu::read_lock_guard read_lock;

		cds_lfht_for_each_entry (
			the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
			consumer_destroy_relayd(relayd);
		}
	}

	lttng_ht_destroy(the_consumer_data.relayd_ht);
}
443
444 /*
445 * Update the end point status of all streams having the given network sequence
446 * index (relayd index).
447 *
448 * It's atomically set without having the stream mutex locked which is fine
449 * because we handle the write/read race with a pipe wakeup for each thread.
450 */
451 static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
452 enum consumer_endpoint_status status)
453 {
454 struct lttng_ht_iter iter;
455 struct lttng_consumer_stream *stream;
456
457 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
458
459 lttng::urcu::read_lock_guard read_lock;
460
461 /* Let's begin with metadata */
462 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
463 if (stream->net_seq_idx == net_seq_idx) {
464 uatomic_set(&stream->endpoint_status, status);
465 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
466 }
467 }
468
469 /* Follow up by the data streams */
470 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
471 if (stream->net_seq_idx == net_seq_idx) {
472 uatomic_set(&stream->endpoint_status, status);
473 DBG("Delete flag set to data stream %d", stream->wait_fd);
474 }
475 }
476 }
477
/*
 * Cleanup a relayd object by flagging every associated streams for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
	uint64_t netidx;

	LTTNG_ASSERT(relayd);

	DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
	notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
515
/*
 * Flag a relayd socket pair for destruction. Destroy it if the refcount
 * reaches zero.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	LTTNG_ASSERT(relayd);
	ASSERT_RCU_READ_LOCKED();

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
535
/*
 * Completely destroy a stream from every visible data structure and the given
 * hash table, if one is provided.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
546
/*
 * Destroy a data stream, removing it from the global data stream hash table.
 * XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}
554
/* Destroy a metadata stream, removing it from the metadata stream hash table. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
559
/*
 * Snapshot the channel attributes the stream needs to read without taking the
 * channel lock; currently only the tracefile size.
 */
void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
					       struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
}
565
/*
 * Add a stream to the global list protected by a mutex.
 *
 * Lock ordering (must be respected everywhere): the_consumer_data.lock,
 * then channel lock, then channel timer lock, then stream lock, then the
 * RCU read lock.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	lttng::urcu::read_lock_guard read_lock;

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	/* Index the stream by its channel id as well. */
	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	the_consumer_data.stream_count++;
	the_consumer_data.need_update = 1;

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
620
621 /*
622 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
623 * be acquired before calling this.
624 */
625 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
626 {
627 int ret = 0;
628 struct lttng_ht_node_u64 *node;
629 struct lttng_ht_iter iter;
630
631 LTTNG_ASSERT(relayd);
632 ASSERT_RCU_READ_LOCKED();
633
634 lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
635 node = lttng_ht_iter_get_node_u64(&iter);
636 if (node != nullptr) {
637 goto end;
638 }
639 lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
640
641 end:
642 return ret;
643 }
644
645 /*
646 * Allocate and return a consumer relayd socket.
647 */
648 static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
649 {
650 struct consumer_relayd_sock_pair *obj = nullptr;
651
652 /* net sequence index of -1 is a failure */
653 if (net_seq_idx == (uint64_t) -1ULL) {
654 goto error;
655 }
656
657 obj = zmalloc<consumer_relayd_sock_pair>();
658 if (obj == nullptr) {
659 PERROR("zmalloc relayd sock");
660 goto error;
661 }
662
663 obj->net_seq_idx = net_seq_idx;
664 obj->refcount = 0;
665 obj->destroy_flag = 0;
666 obj->control_sock.sock.fd = -1;
667 obj->data_sock.sock.fd = -1;
668 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
669 pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
670
671 error:
672 return obj;
673 }
674
675 /*
676 * Find a relayd socket pair in the global consumer data.
677 *
678 * Return the object if found else NULL.
679 * RCU read-side lock must be held across this call and while using the
680 * returned object.
681 */
682 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
683 {
684 struct lttng_ht_iter iter;
685 struct lttng_ht_node_u64 *node;
686 struct consumer_relayd_sock_pair *relayd = nullptr;
687
688 ASSERT_RCU_READ_LOCKED();
689
690 /* Negative keys are lookup failures */
691 if (key == (uint64_t) -1ULL) {
692 goto error;
693 }
694
695 lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
696 node = lttng_ht_iter_get_node_u64(&iter);
697 if (node != nullptr) {
698 relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
699 }
700
701 error:
702 return relayd;
703 }
704
/*
 * Find a relayd and send the stream
 *
 * Returns 0 on success, < 0 on error. On relayd communication failure the
 * whole relayd object is cleaned up (lttng_consumer_cleanup_relayd).
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
	LTTNG_ASSERT(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != nullptr) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock,
					stream->name,
					get_consumer_domain(),
					path,
					&stream->relayd_stream_id,
					stream->chan->tracefile_size,
					stream->chan->tracefile_count,
					stream->trace_chunk);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}

		/* Keep the relayd alive as long as this stream references it. */
		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
		    stream->key,
		    stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
	    stream->name,
	    stream->key,
	    stream->net_seq_idx);

end:
	return ret;
}
759
/*
 * Find a relayd and send the streams sent message
 *
 * Returns 0 on success, < 0 on error. On relayd communication failure the
 * whole relayd object is cleaned up (lttng_consumer_cleanup_relayd).
 */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd != nullptr) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_streams_sent(&relayd->control_sock);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}
	} else {
		ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
		ret = -1;
		goto end;
	}

	ret = 0;
	DBG("All streams sent relayd id %" PRIu64, net_seq_idx);

end:
	return ret;
}
798
799 /*
800 * Find a relayd and close the stream
801 */
802 void close_relayd_stream(struct lttng_consumer_stream *stream)
803 {
804 struct consumer_relayd_sock_pair *relayd;
805
806 /* The stream is not metadata. Get relayd reference if exists. */
807 lttng::urcu::read_lock_guard read_lock;
808 relayd = consumer_find_relayd(stream->net_seq_idx);
809 if (relayd) {
810 consumer_stream_relayd_close(stream, relayd);
811 }
812 }
813
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * For metadata streams, a metadata header is sent on the control socket; for
 * data streams, a data header is sent on the data socket.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
				      size_t data_size,
				      unsigned long padding,
				      struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information; values are big-endian on the wire. */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be substracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
874
/*
 * Write a character on the metadata poll pipe to wake the metadata thread.
 * Returns 0 on success, -1 on error.
 *
 * A blocked pipe (EWOULDBLOCK) is treated as success: the metadata poll
 * thread is merely behind and will catch up on its own.
 */
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
{
	int ret = 0;

	DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
	if (channel->monitor && channel->metadata_stream) {
		const char dummy = 'c';
		const ssize_t write_ret =
			lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);

		if (write_ret < 1) {
			if (errno == EWOULDBLOCK) {
				/*
				 * This is fine, the metadata poll thread
				 * is having a hard time keeping-up, but
				 * it will eventually wake-up and consume
				 * the available data.
				 */
				ret = 0;
			} else {
				PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
				ret = -1;
				goto end;
			}
		}
	}

end:
	return ret;
}
909
/*
 * Trigger a dump of the metadata content. Following/during the successful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 *
 * Returns 0 on success, a negative value on error.
 */
static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	LTTNG_ASSERT(stream->metadata_flag);
	LTTNG_ASSERT(stream->chan->trace_chunk);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
952
953 static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
954 struct lttng_trace_chunk *new_trace_chunk)
955 {
956 pthread_mutex_lock(&channel->lock);
957 if (channel->is_deleted) {
958 /*
959 * The channel has been logically deleted and should no longer
960 * be used. It has released its reference to its current trace
961 * chunk and should not acquire a new one.
962 *
963 * Return success as there is nothing for the caller to do.
964 */
965 goto end;
966 }
967
968 /*
969 * The acquisition of the reference cannot fail (barring
970 * a severe internal error) since a reference to the published
971 * chunk is already held by the caller.
972 */
973 if (new_trace_chunk) {
974 const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
975
976 LTTNG_ASSERT(acquired_reference);
977 }
978
979 lttng_trace_chunk_put(channel->trace_chunk);
980 channel->trace_chunk = new_trace_chunk;
981 end:
982 pthread_mutex_unlock(&channel->lock);
983 return 0;
984 }
985
/*
 * Allocate and return a new lttng_consumer_channel object using the given key
 * to initialize the hash table node.
 *
 * If chunk_id is non-null, the corresponding trace chunk is looked up in the
 * chunk registry and a reference to it is attached to the channel.
 *
 * On error, return NULL.
 */
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
							 uint64_t session_id,
							 const uint64_t *chunk_id,
							 const char *pathname,
							 const char *name,
							 uint64_t relayd_id,
							 enum lttng_event_output output,
							 uint64_t tracefile_size,
							 uint64_t tracefile_count,
							 uint64_t session_id_per_pid,
							 unsigned int monitor,
							 unsigned int live_timer_interval,
							 bool is_in_live_session,
							 const char *root_shm_path,
							 const char *shm_path)
{
	struct lttng_consumer_channel *channel = nullptr;
	struct lttng_trace_chunk *trace_chunk = nullptr;

	if (chunk_id) {
		trace_chunk = lttng_trace_chunk_registry_find_chunk(
			the_consumer_data.chunk_registry, session_id, *chunk_id);
		if (!trace_chunk) {
			ERR("Failed to find trace chunk reference during creation of channel");
			goto end;
		}
	}

	channel = zmalloc<lttng_consumer_channel>();
	if (channel == nullptr) {
		PERROR("malloc struct lttng_consumer_channel");
		goto end;
	}

	channel->key = key;
	channel->refcount = 0;
	channel->session_id = session_id;
	channel->session_id_per_pid = session_id_per_pid;
	channel->relayd_id = relayd_id;
	channel->tracefile_size = tracefile_size;
	channel->tracefile_count = tracefile_count;
	channel->monitor = monitor;
	channel->live_timer_interval = live_timer_interval;
	channel->is_live = is_in_live_session;
	pthread_mutex_init(&channel->lock, nullptr);
	pthread_mutex_init(&channel->timer_lock, nullptr);

	switch (output) {
	case LTTNG_EVENT_SPLICE:
		channel->output = CONSUMER_CHANNEL_SPLICE;
		break;
	case LTTNG_EVENT_MMAP:
		channel->output = CONSUMER_CHANNEL_MMAP;
		break;
	default:
		/* NOTE(review): code after abort() is unreachable; kept as written. */
		abort();
		free(channel);
		channel = nullptr;
		goto end;
	}

	/*
	 * In monitor mode, the streams associated with the channel will be put in
	 * a special list ONLY owned by this channel. So, the refcount is set to 1
	 * here meaning that the channel itself has streams that are referenced.
	 *
	 * On a channel deletion, once the channel is no longer visible, the
	 * refcount is decremented and checked for a zero value to delete it. With
	 * streams in no monitor mode, it will now be safe to destroy the channel.
	 */
	if (!channel->monitor) {
		channel->refcount = 1;
	}

	/* Copies are manually NUL-terminated since strncpy does not guarantee it. */
	strncpy(channel->pathname, pathname, sizeof(channel->pathname));
	channel->pathname[sizeof(channel->pathname) - 1] = '\0';

	strncpy(channel->name, name, sizeof(channel->name));
	channel->name[sizeof(channel->name) - 1] = '\0';

	if (root_shm_path) {
		strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
		channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
	}
	if (shm_path) {
		strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
		channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
	}

	lttng_ht_node_init_u64(&channel->node, channel->key);
	lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);

	channel->wait_fd = -1;
	CDS_INIT_LIST_HEAD(&channel->streams.head);

	if (trace_chunk) {
		int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
		if (ret) {
			goto error;
		}
	}

	DBG("Allocated channel (key %" PRIu64 ")", channel->key);

end:
	/* Drop the lookup reference; the channel holds its own if one was set. */
	lttng_trace_chunk_put(trace_chunk);
	return channel;
error:
	consumer_del_channel(channel);
	channel = nullptr;
	goto end;
}
1104
1105 /*
1106 * Add a channel to the global list protected by a mutex.
1107 *
1108 * Always return 0 indicating success.
1109 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
			 struct lttng_consumer_local_data *ctx)
{
	/*
	 * Lock ordering: the global consumer_data lock first, then the
	 * channel lock, then the channel's timer lock. Unlock is performed
	 * in the reverse order below.
	 */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);

	/*
	 * This gives us a guarantee that the channel we are about to add to the
	 * channel hash table will be unique. See this function comment on the why
	 * we need to steal the channel key at this stage.
	 */
	steal_channel_key(channel->key);

	lttng::urcu::read_lock_guard read_lock;
	lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
	lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
			 &channel->channels_by_session_id_ht_node);
	channel->is_published = true;

	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/* Wake the data thread so it picks up the newly published channel. */
	if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}

	return 0;
}
1140
1141 /*
1142 * Allocate the pollfd structure and the local view of the out fds to avoid
1143 * doing a lookup in the linked list and concurrency issues when writing is
1144 * needed. Called with consumer_data.lock held.
1145 *
1146 * Returns the number of fds in the structures.
1147 */
1148 static int update_poll_array(struct lttng_consumer_local_data *ctx,
1149 struct pollfd **pollfd,
1150 struct lttng_consumer_stream **local_stream,
1151 struct lttng_ht *ht,
1152 int *nb_inactive_fd)
1153 {
1154 int i = 0;
1155 struct lttng_ht_iter iter;
1156 struct lttng_consumer_stream *stream;
1157
1158 LTTNG_ASSERT(ctx);
1159 LTTNG_ASSERT(ht);
1160 LTTNG_ASSERT(pollfd);
1161 LTTNG_ASSERT(local_stream);
1162
1163 DBG("Updating poll fd array");
1164 *nb_inactive_fd = 0;
1165
1166 {
1167 lttng::urcu::read_lock_guard read_lock;
1168 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1169 /*
1170 * Only active streams with an active end point can be added to the
1171 * poll set and local stream storage of the thread.
1172 *
1173 * There is a potential race here for endpoint_status to be updated
1174 * just after the check. However, this is OK since the stream(s) will
1175 * be deleted once the thread is notified that the end point state has
1176 * changed where this function will be called back again.
1177 *
1178 * We track the number of inactive FDs because they still need to be
1179 * closed by the polling thread after a wakeup on the data_pipe or
1180 * metadata_pipe.
1181 */
1182 if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
1183 (*nb_inactive_fd)++;
1184 continue;
1185 }
1186
1187 (*pollfd)[i].fd = stream->wait_fd;
1188 (*pollfd)[i].events = POLLIN | POLLPRI;
1189 local_stream[i] = stream;
1190 i++;
1191 }
1192 }
1193
1194 /*
1195 * Insert the consumer_data_pipe at the end of the array and don't
1196 * increment i so nb_fd is the number of real FD.
1197 */
1198 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
1199 (*pollfd)[i].events = POLLIN | POLLPRI;
1200
1201 (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
1202 (*pollfd)[i + 1].events = POLLIN | POLLPRI;
1203 return i;
1204 }
1205
1206 /*
1207 * Poll on the should_quit pipe and the command socket return -1 on
1208 * error, 1 if should exit, 0 if data is available on the command socket
1209 */
1210 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1211 {
1212 int num_rdy;
1213
1214 restart:
1215 num_rdy = poll(consumer_sockpoll, 2, -1);
1216 if (num_rdy == -1) {
1217 /*
1218 * Restart interrupted system call.
1219 */
1220 if (errno == EINTR) {
1221 goto restart;
1222 }
1223 PERROR("Poll error");
1224 return -1;
1225 }
1226 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
1227 DBG("consumer_should_quit wake up");
1228 return 1;
1229 }
1230 return 0;
1231 }
1232
1233 /*
1234 * Set the error socket.
1235 */
void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
{
	/* FD later used by lttng_consumer_send_error() to reach the sessiond. */
	ctx->consumer_error_socket = sock;
}
1240
1241 /*
1242 * Set the command socket path.
1243 */
void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
{
	/* Stores the pointer only; the string is not copied. Unlinked on destroy. */
	ctx->consumer_command_sock_path = sock;
}
1248
1249 /*
1250 * Send return code to the session daemon.
1251 * If the socket is not defined, we return 0, it is not a fatal error
1252 */
1253 int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
1254 {
1255 if (ctx->consumer_error_socket > 0) {
1256 return lttcomm_send_unix_sock(
1257 ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
1258 }
1259
1260 return 0;
1261 }
1262
1263 /*
1264 * Close all the tracefiles and stream fds and MUST be called when all
1265 * instances are destroyed i.e. when all threads were joined and are ended.
1266 */
1267 void lttng_consumer_cleanup()
1268 {
1269 struct lttng_ht_iter iter;
1270 struct lttng_consumer_channel *channel;
1271 unsigned int trace_chunks_left;
1272
1273 {
1274 lttng::urcu::read_lock_guard read_lock;
1275
1276 cds_lfht_for_each_entry (
1277 the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
1278 consumer_del_channel(channel);
1279 }
1280 }
1281
1282 lttng_ht_destroy(the_consumer_data.channel_ht);
1283 lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
1284
1285 cleanup_relayd_ht();
1286
1287 lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);
1288
1289 /*
1290 * This HT contains streams that are freed by either the metadata thread or
1291 * the data thread so we do *nothing* on the hash table and simply destroy
1292 * it.
1293 */
1294 lttng_ht_destroy(the_consumer_data.stream_list_ht);
1295
1296 /*
1297 * Trace chunks in the registry may still exist if the session
1298 * daemon has encountered an internal error and could not
1299 * tear down its sessions and/or trace chunks properly.
1300 *
1301 * Release the session daemon's implicit reference to any remaining
1302 * trace chunk and print an error if any trace chunk was found. Note
1303 * that there are _no_ legitimate cases for trace chunks to be left,
1304 * it is a leak. However, it can happen following a crash of the
1305 * session daemon and not emptying the registry would cause an assertion
1306 * to hit.
1307 */
1308 trace_chunks_left =
1309 lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
1310 if (trace_chunks_left) {
1311 ERR("%u trace chunks are leaked by lttng-consumerd. "
1312 "This can be caused by an internal error of the session daemon.",
1313 trace_chunks_left);
1314 }
1315 /* Run all callbacks freeing each chunk. */
1316 rcu_barrier();
1317 lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
1318 }
1319
1320 /*
1321 * Called from signal handler.
1322 */
1323 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1324 {
1325 ssize_t ret;
1326
1327 CMM_STORE_SHARED(consumer_quit, 1);
1328 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1329 if (ret < 1) {
1330 PERROR("write consumer quit");
1331 }
1332
1333 DBG("Consumer flag that it should quit");
1334 }
1335
1336 /*
1337 * Flush pending writes to trace output disk file.
1338 */
1339 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
1340 {
1341 int outfd = stream->out_fd;
1342
1343 /*
1344 * This does a blocking write-and-wait on any page that belongs to the
1345 * subbuffer prior to the one we just wrote.
1346 * Don't care about error values, as these are just hints and ways to
1347 * limit the amount of page cache used.
1348 */
1349 if (orig_offset < stream->max_sb_size) {
1350 return;
1351 }
1352 lttng::io::hint_flush_range_dont_need_sync(
1353 outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
1354 }
1355
1356 /*
1357 * Initialise the necessary environnement :
1358 * - create a new context
1359 * - create the poll_pipe
1360 * - create the should_quit pipe (for signal handler)
1361 * - create the thread pipe (for splice)
1362 *
1363 * Takes a function pointer as argument, this function is called when data is
1364 * available on a buffer. This function is responsible to do the
1365 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1366 * buffer configuration and then kernctl_put_next_subbuf at the end.
1367 *
1368 * Returns a pointer to the new context or NULL on error.
1369 */
struct lttng_consumer_local_data *
lttng_consumer_create(enum lttng_consumer_type type,
		      ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
					      struct lttng_consumer_local_data *ctx,
					      bool locked_by_caller),
		      int (*recv_channel)(struct lttng_consumer_channel *channel),
		      int (*recv_stream)(struct lttng_consumer_stream *stream),
		      int (*update_stream)(uint64_t stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	/* The global consumer type may only be set once (or re-set to itself). */
	LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
		     the_consumer_data.type == type);
	the_consumer_data.type = type;

	ctx = zmalloc<lttng_consumer_local_data>();
	if (ctx == nullptr) {
		PERROR("allocating context");
		goto error;
	}

	/* Sockets are connected later; mark them invalid for now. */
	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_wakeup_pipe) {
		goto error_wakeup_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	ctx->channel_monitor_pipe = -1;

	return ctx;

	/*
	 * Error unwinding: each label releases the resources acquired before
	 * the failing step, in reverse acquisition order.
	 */
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return nullptr;
}
1445
1446 /*
1447 * Iterate over all streams of the hashtable and free them properly.
1448 */
1449 static void destroy_data_stream_ht(struct lttng_ht *ht)
1450 {
1451 struct lttng_ht_iter iter;
1452 struct lttng_consumer_stream *stream;
1453
1454 if (ht == nullptr) {
1455 return;
1456 }
1457
1458 {
1459 lttng::urcu::read_lock_guard read_lock;
1460 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1461 /*
1462 * Ignore return value since we are currently cleaning up so any error
1463 * can't be handled.
1464 */
1465 (void) consumer_del_stream(stream, ht);
1466 }
1467 }
1468
1469 lttng_ht_destroy(ht);
1470 }
1471
1472 /*
1473 * Iterate over all streams of the metadata hashtable and free them
1474 * properly.
1475 */
1476 static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1477 {
1478 struct lttng_ht_iter iter;
1479 struct lttng_consumer_stream *stream;
1480
1481 if (ht == nullptr) {
1482 return;
1483 }
1484
1485 {
1486 lttng::urcu::read_lock_guard read_lock;
1487 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1488 /*
1489 * Ignore return value since we are currently cleaning up so any error
1490 * can't be handled.
1491 */
1492 (void) consumer_del_metadata_stream(stream, ht);
1493 }
1494 }
1495
1496 lttng_ht_destroy(ht);
1497 }
1498
1499 /*
1500 * Close all fds associated with the instance and free the context.
1501 */
1502 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1503 {
1504 int ret;
1505
1506 DBG("Consumer destroying it. Closing everything.");
1507
1508 if (!ctx) {
1509 return;
1510 }
1511
1512 destroy_data_stream_ht(data_ht);
1513 destroy_metadata_stream_ht(metadata_ht);
1514
1515 ret = close(ctx->consumer_error_socket);
1516 if (ret) {
1517 PERROR("close");
1518 }
1519 ret = close(ctx->consumer_metadata_socket);
1520 if (ret) {
1521 PERROR("close");
1522 }
1523 utils_close_pipe(ctx->consumer_channel_pipe);
1524 lttng_pipe_destroy(ctx->consumer_data_pipe);
1525 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1526 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1527 utils_close_pipe(ctx->consumer_should_quit);
1528
1529 unlink(ctx->consumer_command_sock_path);
1530 free(ctx);
1531 }
1532
1533 /*
1534 * Write the metadata stream id on the specified file descriptor.
1535 */
1536 static int
1537 write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
1538 {
1539 ssize_t ret;
1540 struct lttcomm_relayd_metadata_payload hdr;
1541
1542 hdr.stream_id = htobe64(stream->relayd_stream_id);
1543 hdr.padding_size = htobe32(padding);
1544 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1545 if (ret < sizeof(hdr)) {
1546 /*
1547 * This error means that the fd's end is closed so ignore the PERROR
1548 * not to clubber the error output since this can happen in a normal
1549 * code path.
1550 */
1551 if (errno != EPIPE) {
1552 PERROR("write metadata stream id");
1553 }
1554 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
1555 /*
1556 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1557 * handle writting the missing part so report that as an error and
1558 * don't lie to the caller.
1559 */
1560 ret = -1;
1561 goto end;
1562 }
1563 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1564 stream->relayd_stream_id,
1565 padding);
1566
1567 end:
1568 return (int) ret;
1569 }
1570
1571 /*
1572 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1573 * core function for writing trace buffers to either the local filesystem or
1574 * the network.
1575 *
1576 * It must be called with the stream and the channel lock held.
1577 *
1578 * Careful review MUST be put if any changes occur!
1579 *
1580 * Returns the number of bytes written
1581 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
					      const struct lttng_buffer_view *buffer,
					      unsigned long padding)
{
	ssize_t ret = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	unsigned int relayd_hang_up = 0;
	/* Payload without the subbuffer's trailing padding. */
	const size_t subbuf_content_size = buffer->size - padding;
	size_t write_len;

	/* RCU lock for the relayd pointer */
	lttng::urcu::read_lock_guard read_lock;
	LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == nullptr) {
			ret = -EPIPE;
			goto end;
		}
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = subbuf_content_size;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
							    stream->relayd_stream_id,
							    stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret < 0) {
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;

		/* Write metadata stream id before payload */
		if (stream->metadata_flag) {
			ret = write_relayd_metadata_id(outfd, stream, padding);
			if (ret < 0) {
				relayd_hang_up = 1;
				goto write_error;
			}
		}

		/* Over the network, padding is stripped from the payload. */
		write_len = subbuf_content_size;
	} else {
		/* No streaming; we have to write the full padding. */
		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
		    (stream->tracefile_size_current + buffer->size) >
			    stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret) {
				goto end;
			}
			/* Rotation replaced the output fd; restart at offset 0. */
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += buffer->size;
		write_len = buffer->size;
	}

	/*
	 * This call guarantee that len or less is returned. It's impossible to
	 * receive a ret value that is bigger than len.
	 */
	ret = lttng_write(outfd, buffer->data, write_len);
	DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
	if (ret < 0 || ((size_t) ret != write_len)) {
		/*
		 * Report error to caller if nothing was written else at least send the
		 * amount written.
		 */
		if (ret < 0) {
			ret = -errno;
		}
		relayd_hang_up = 1;

		/* Socket operation failed. We consider the relayd dead */
		if (errno == EPIPE) {
			/*
			 * This is possible if the fd is closed on the other side
			 * (outfd) or any write problem. It can be verbose a bit for a
			 * normal execution if for instance the relayd is stopped
			 * abruptly. This can happen so set this to a DBG statement.
			 */
			DBG("Consumer mmap write detected relayd hang up");
		} else {
			/* Unhandled error, print it and stop function right now. */
			PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
		}
		goto write_error;
	}
	stream->output_written += ret;

	/* This call is useless on a socket so better save a syscall. */
	if (!relayd) {
		/* This won't block, but will start writeout asynchronously */
		lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
		stream->out_fd_offset += write_len;
		lttng_consumer_sync_trace_file(stream, orig_offset);
	}

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	return ret;
}
1737
1738 /*
1739 * Splice the data from the ring buffer to the tracefile.
1740 *
1741 * It must be called with the stream lock held.
1742 *
1743 * Returns the number of bytes spliced.
1744 */
ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
						struct lttng_consumer_stream *stream,
						unsigned long len,
						unsigned long padding)
{
	ssize_t ret = 0, written = 0, ret_splice = 0;
	loff_t offset = 0;
	off_t orig_offset = stream->out_fd_offset;
	int fd = stream->wait_fd;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	int *splice_pipe;
	unsigned int relayd_hang_up = 0;

	/* splice() is only usable with the kernel tracer's buffers. */
	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/* Not supported for user space tracing */
		return -ENOSYS;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}

	/* RCU lock for the relayd pointer */
	lttng::urcu::read_lock_guard read_lock;

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == nullptr) {
			written = -ret;
			goto end;
		}
	}
	splice_pipe = stream->splice_pipe;

	/* Write metadata stream id before payload */
	if (relayd) {
		unsigned long total_len = len;

		if (stream->metadata_flag) {
			/*
			 * Lock the control socket for the complete duration of the function
			 * since from this point on we will use the socket.
			 */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);

			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
							    stream->relayd_stream_id,
							    stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
			if (ret < 0) {
				written = ret;
				relayd_hang_up = 1;
				goto write_error;
			}

			total_len += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, total_len, padding, relayd);
		if (ret < 0) {
			written = ret;
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}
		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
		    (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret < 0) {
				written = ret;
				goto end;
			}
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += len;
	}

	/*
	 * Two-stage transfer: splice from the ring buffer fd into the
	 * stream's intermediate pipe, then from that pipe out to 'outfd'
	 * (local tracefile or relayd socket), looping until 'len' is drained.
	 */
	while (len > 0) {
		DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
		    (unsigned long) offset,
		    len,
		    fd,
		    splice_pipe[1]);
		ret_splice = splice(
			fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe, ret %zd", ret_splice);
		if (ret_splice < 0) {
			ret = errno;
			written = -ret;
			PERROR("Error in relay splice");
			goto splice_error;
		}

		/* Handle stream on the relayd if the output is on the network */
		if (relayd && stream->metadata_flag) {
			size_t metadata_payload_size =
				sizeof(struct lttcomm_relayd_metadata_payload);

			/* Update counter to fit the spliced data */
			ret_splice += metadata_payload_size;
			len += metadata_payload_size;
			/*
			 * We do this so the return value can match the len passed as
			 * argument to this function.
			 */
			written -= metadata_payload_size;
		}

		/* Splice data out */
		ret_splice = splice(splice_pipe[0],
				    nullptr,
				    outfd,
				    nullptr,
				    ret_splice,
				    SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
		if (ret_splice < 0) {
			ret = errno;
			written = -ret;
			relayd_hang_up = 1;
			goto write_error;
		} else if (ret_splice > len) {
			/*
			 * We don't expect this code path to be executed but you never know
			 * so this is an extra protection against a buggy splice().
			 */
			ret = errno;
			written += ret_splice;
			PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
			goto splice_error;
		} else {
			/* All good, update current len and continue. */
			len -= ret_splice;
		}

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
			stream->out_fd_offset += ret_splice;
		}
		stream->output_written += ret_splice;
		written += ret_splice;
	}
	if (!relayd) {
		lttng_consumer_sync_trace_file(stream, orig_offset);
	}
	goto end;

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
		/* Skip splice error so the consumer does not fail */
		goto end;
	}

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EINVAL:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	/* Unlock only if ctrl socket used (metadata over the network). */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	return written;
}
1957
1958 /*
1959 * Sample the snapshot positions for a specific fd
1960 *
1961 * Returns 0 on success, < 0 on error
1962 */
1963 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1964 {
1965 switch (the_consumer_data.type) {
1966 case LTTNG_CONSUMER_KERNEL:
1967 return lttng_kconsumer_sample_snapshot_positions(stream);
1968 case LTTNG_CONSUMER32_UST:
1969 case LTTNG_CONSUMER64_UST:
1970 return lttng_ustconsumer_sample_snapshot_positions(stream);
1971 default:
1972 ERR("Unknown consumer_data type");
1973 abort();
1974 return -ENOSYS;
1975 }
1976 }
1977 /*
1978 * Take a snapshot for a specific fd
1979 *
1980 * Returns 0 on success, < 0 on error
1981 */
1982 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
1983 {
1984 switch (the_consumer_data.type) {
1985 case LTTNG_CONSUMER_KERNEL:
1986 return lttng_kconsumer_take_snapshot(stream);
1987 case LTTNG_CONSUMER32_UST:
1988 case LTTNG_CONSUMER64_UST:
1989 return lttng_ustconsumer_take_snapshot(stream);
1990 default:
1991 ERR("Unknown consumer_data type");
1992 abort();
1993 return -ENOSYS;
1994 }
1995 }
1996
1997 /*
1998 * Get the produced position
1999 *
2000 * Returns 0 on success, < 0 on error
2001 */
2002 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
2003 {
2004 switch (the_consumer_data.type) {
2005 case LTTNG_CONSUMER_KERNEL:
2006 return lttng_kconsumer_get_produced_snapshot(stream, pos);
2007 case LTTNG_CONSUMER32_UST:
2008 case LTTNG_CONSUMER64_UST:
2009 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
2010 default:
2011 ERR("Unknown consumer_data type");
2012 abort();
2013 return -ENOSYS;
2014 }
2015 }
2016
2017 /*
2018 * Get the consumed position (free-running counter position in bytes).
2019 *
2020 * Returns 0 on success, < 0 on error
2021 */
2022 int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
2023 {
2024 switch (the_consumer_data.type) {
2025 case LTTNG_CONSUMER_KERNEL:
2026 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2027 case LTTNG_CONSUMER32_UST:
2028 case LTTNG_CONSUMER64_UST:
2029 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2030 default:
2031 ERR("Unknown consumer_data type");
2032 abort();
2033 return -ENOSYS;
2034 }
2035 }
2036
2037 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
2038 int sock,
2039 struct pollfd *consumer_sockpoll)
2040 {
2041 switch (the_consumer_data.type) {
2042 case LTTNG_CONSUMER_KERNEL:
2043 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2044 case LTTNG_CONSUMER32_UST:
2045 case LTTNG_CONSUMER64_UST:
2046 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2047 default:
2048 ERR("Unknown consumer_data type");
2049 abort();
2050 return -ENOSYS;
2051 }
2052 }
2053
2054 static void lttng_consumer_close_all_metadata()
2055 {
2056 switch (the_consumer_data.type) {
2057 case LTTNG_CONSUMER_KERNEL:
2058 /*
2059 * The Kernel consumer has a different metadata scheme so we don't
2060 * close anything because the stream will be closed by the session
2061 * daemon.
2062 */
2063 break;
2064 case LTTNG_CONSUMER32_UST:
2065 case LTTNG_CONSUMER64_UST:
2066 /*
2067 * Close all metadata streams. The metadata hash table is passed and
2068 * this call iterates over it by closing all wakeup fd. This is safe
2069 * because at this point we are sure that the metadata producer is
2070 * either dead or blocked.
2071 */
2072 lttng_ustconsumer_close_all_metadata(metadata_ht);
2073 break;
2074 default:
2075 ERR("Unknown consumer_data type");
2076 abort();
2077 }
2078 }
2079
2080 /*
2081 * Clean up a metadata stream and free its memory.
2082 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	struct lttng_consumer_channel *channel = nullptr;
	bool free_channel = false;

	LTTNG_ASSERT(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	LTTNG_ASSERT(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	/*
	 * Lock ordering: consumer_data lock, channel lock, stream lock, then
	 * (userspace only) the channel's metadata cache lock; released in
	 * reverse order below.
	 */
	pthread_mutex_lock(&the_consumer_data.lock);
	/*
	 * Note that this assumes that a stream's channel is never changed and
	 * that the stream's lock doesn't need to be taken to sample its
	 * channel.
	 */
	channel = stream->chan;
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&stream->lock);
	if (channel->metadata_cache) {
		/* Only applicable to userspace consumers. */
		pthread_mutex_lock(&channel->metadata_cache->lock);
	}

	/* Remove any reference to that stream. */
	consumer_stream_delete(stream, ht);

	/* Close down everything including the relayd if one. */
	consumer_stream_close_output(stream);
	/* Destroy tracer buffers of the stream. */
	consumer_stream_destroy_buffers(stream);

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&channel->refcount, 1) &&
	    !uatomic_read(&channel->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_channel = true;
	}
	stream->chan = nullptr;

	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for a NULL
	 * pointer value.
	 */
	channel->metadata_stream = nullptr;

	if (channel->metadata_cache) {
		pthread_mutex_unlock(&channel->metadata_cache->lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/* Channel deletion is done outside of the locks taken above. */
	if (free_channel) {
		consumer_del_channel(channel);
	}

	lttng_trace_chunk_put(stream->trace_chunk);
	stream->trace_chunk = nullptr;
	consumer_stream_free(stream);
}
2149
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Publishes the stream in the metadata hash table as well as in the global
 * per-channel-id and per-session-id tables, and consumes one unit of the
 * channel's nb_init_stream_left counter when applicable.
 *
 * Lock order: consumer data lock -> channel lock -> channel timer lock ->
 * stream lock; an RCU read-side lock covers the hash table operations.
 */
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	lttng::urcu::read_lock_guard read_lock;

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	LTTNG_ASSERT(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Publish in the metadata table; the key must be unique there. */
	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
2214
2215 /*
2216 * Delete data stream that are flagged for deletion (endpoint_status).
2217 */
2218 static void validate_endpoint_status_data_stream()
2219 {
2220 struct lttng_ht_iter iter;
2221 struct lttng_consumer_stream *stream;
2222
2223 DBG("Consumer delete flagged data stream");
2224
2225 {
2226 lttng::urcu::read_lock_guard read_lock;
2227
2228 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
2229 /* Validate delete flag of the stream */
2230 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2231 continue;
2232 }
2233 /* Delete it right now */
2234 consumer_del_stream(stream, data_ht);
2235 }
2236 }
2237 }
2238
2239 /*
2240 * Delete metadata stream that are flagged for deletion (endpoint_status).
2241 */
2242 static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
2243 {
2244 struct lttng_ht_iter iter;
2245 struct lttng_consumer_stream *stream;
2246
2247 DBG("Consumer delete flagged metadata stream");
2248
2249 LTTNG_ASSERT(pollset);
2250
2251 {
2252 lttng::urcu::read_lock_guard read_lock;
2253 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
2254 /* Validate delete flag of the stream */
2255 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2256 continue;
2257 }
2258 /*
2259 * Remove from pollset so the metadata thread can continue without
2260 * blocking on a deleted stream.
2261 */
2262 lttng_poll_del(pollset, stream->wait_fd);
2263
2264 /* Delete it right now */
2265 consumer_del_metadata_stream(stream, metadata_ht);
2266 }
2267 }
2268 }
2269
/*
 * Thread polls on metadata file descriptor and write them on disk or on the
 * network.
 *
 * The poll set starts with only the read end of the consumer_metadata_pipe;
 * metadata streams are pushed through that pipe by other threads and added
 * to the poll set here. A NULL stream on the pipe is a wakeup asking this
 * thread to re-validate stream endpoint statuses.
 */
void *consumer_thread_metadata_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_stream *stream = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);

	if (testpoint(consumerd_thread_metadata)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Thread metadata poll started");

	/* Size is set to 1 for the consumer_metadata pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Metadata main loop started");

	while (true) {
	restart:
		health_code_update();
		health_poll_entry();
		DBG("Metadata poll wait");
		ret = lttng_poll_wait(&events, -1);
		DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Metadata event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				/* Empty poll set: a clean shutdown, not an error. */
				err = 0; /* All is OK */
			}
			goto end;
		}

		/* Number of fds with activity returned by lttng_poll_wait(). */
		nb_fd = ret;

		/* From here, the event is a metadata wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
				if (revents & LPOLLIN) {
					ssize_t pipe_len;

					/* A stream pointer is transmitted over the pipe. */
					pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
								   &stream,
								   sizeof(stream)); /* NOLINT sizeof
										       used on a
										       pointer. */
					if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
									    pointer. */
						if (pipe_len < 0) {
							PERROR("read metadata stream");
						}
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since their might be data to consume.
						 */
						lttng_poll_del(
							&events,
							lttng_pipe_get_readfd(
								ctx->consumer_metadata_pipe));
						lttng_pipe_read_close(ctx->consumer_metadata_pipe);
						continue;
					}

					/* A NULL stream means that the state has changed. */
					if (stream == nullptr) {
						/* Check for deleted streams. */
						validate_endpoint_status_metadata_stream(&events);
						/* Restart the poll with the updated set. */
						goto restart;
					}

					DBG("Adding metadata stream %d to poll set",
					    stream->wait_fd);

					/* Add metadata stream to the global poll events list */
					lttng_poll_add(
						&events, stream->wait_fd, LPOLLIN | LPOLLPRI);
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Metadata thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(
						&events,
						lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
					lttng_pipe_read_close(ctx->consumer_metadata_pipe);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* Look up the stream owning this ready fd. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			stream = caa_container_of(node, struct lttng_consumer_stream, node);

			if (revents & (LPOLLIN | LPOLLPRI)) {
				/* Get the data out of the metadata file descriptor */
				DBG("Metadata available on fd %d", pollfd);
				LTTNG_ASSERT(stream->wait_fd == pollfd);

				/* Drain the stream until no sub-buffer is available. */
				do {
					health_code_update();

					len = ctx->on_buffer_ready(stream, ctx, false);
					/*
					 * We don't check the return value here since if we get
					 * a negative len, it means an error occurred thus we
					 * simply remove it from the poll set and free the
					 * stream.
					 */
				} while (len > 0);

				/* It's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean up stream from consumer and free it. */
					lttng_poll_del(&events, stream->wait_fd);
					consumer_del_metadata_stream(stream, metadata_ht);
				}
			} else if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Metadata fd %d is hup|err.", pollfd);
				if (!stream->hangup_flush_done &&
				    (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
				     the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
					DBG("Attempting to flush and consume the UST buffers");
					lttng_ustconsumer_on_stream_hangup(stream);

					/* We just flushed the stream now read it. */
					do {
						health_code_update();

						len = ctx->on_buffer_ready(stream, ctx, false);
						/*
						 * We don't check the return value here since if we
						 * get a negative len, it means an error occurred
						 * thus we simply remove it from the poll set and
						 * free the stream.
						 */
					} while (len > 0);
				}

				lttng_poll_del(&events, stream->wait_fd);
				/*
				 * This call update the channel states, closes file descriptors
				 * and securely free the stream.
				 */
				consumer_del_metadata_stream(stream, metadata_ht);
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}
			/* Release RCU lock for the stream looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	DBG("Metadata poll thread exiting");

	lttng_poll_clean(&events);
end_poll:
error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
2488
/*
 * This thread polls the fds in the set to consume the data and write
 * it to tracefile if necessary.
 *
 * The pollfd array layout is: [0, nb_fd) are stream wait fds, pollfd[nb_fd]
 * is the consumer_data_pipe (stream additions / state changes) and
 * pollfd[nb_fd + 1] is the wakeup pipe. The array is rebuilt whenever
 * the_consumer_data.need_update is set.
 */
void *consumer_thread_data_poll(void *data)
{
	int num_rdy, high_prio, ret, i, err = -1;
	struct pollfd *pollfd = nullptr;
	/* local view of the streams */
	struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
	/* local view of consumer_data.fds_count */
	int nb_fd = 0;
	/* 2 for the consumer_data_pipe and wake up pipe */
	const int nb_pipes_fd = 2;
	/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
	int nb_inactive_fd = 0;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);

	if (testpoint(consumerd_thread_data)) {
		goto error_testpoint;
	}

	health_code_update();

	local_stream = zmalloc<lttng_consumer_stream *>();
	if (local_stream == nullptr) {
		PERROR("local_stream malloc");
		goto end;
	}

	while (true) {
		health_code_update();

		high_prio = 0;

		/*
		 * the fds set has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&the_consumer_data.lock);
		if (the_consumer_data.need_update) {
			free(pollfd);
			pollfd = nullptr;

			free(local_stream);
			local_stream = nullptr;

			/* Allocate for all fds */
			pollfd =
				calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
			if (pollfd == nullptr) {
				PERROR("pollfd malloc");
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}

			local_stream = calloc<lttng_consumer_stream *>(
				the_consumer_data.stream_count + nb_pipes_fd);
			if (local_stream == nullptr) {
				PERROR("local_stream malloc");
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}
			ret = update_poll_array(
				ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}
			/* update_poll_array() returns the number of stream fds. */
			nb_fd = ret;
			the_consumer_data.need_update = 0;
		}
		pthread_mutex_unlock(&the_consumer_data.lock);

		/* No FDs and consumer_quit, consumer_cleanup the thread */
		if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
			err = 0; /* All is OK */
			goto end;
		}
		/* poll on the array of fds */
	restart:
		DBG("polling on %d fd", nb_fd + nb_pipes_fd);
		if (testpoint(consumerd_thread_data_poll)) {
			goto end;
		}
		health_poll_entry();
		num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
		health_poll_exit();
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			/*
			 * Restart interrupted system call.
			 */
			if (errno == EINTR) {
				goto restart;
			}
			PERROR("Poll error");
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		if (caa_unlikely(data_consumption_paused)) {
			DBG("Data consumption paused, sleeping...");
			sleep(1);
			goto restart;
		}

		/*
		 * If the consumer_data_pipe triggered poll go directly to the
		 * beginning of the loop to update the array. We want to prioritize
		 * array update over low-priority reads.
		 */
		if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
			ssize_t pipe_readlen;

			DBG("consumer_data_pipe wake up");
			/* A stream pointer is transmitted over the pipe. */
			pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
						       &new_stream,
						       sizeof(new_stream)); /* NOLINT sizeof used on
									       a pointer. */
			if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
								  */
				PERROR("Consumer data pipe");
				/* Continue so we can at least handle the current stream(s). */
				continue;
			}

			/*
			 * If the stream is NULL, just ignore it. It's also possible that
			 * the sessiond poll thread changed the consumer_quit state and is
			 * waking us up to test it.
			 */
			if (new_stream == nullptr) {
				validate_endpoint_status_data_stream();
				continue;
			}

			/* Continue to update the local streams and handle prio ones */
			continue;
		}

		/* Handle wakeup pipe. */
		if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
			char dummy;
			ssize_t pipe_readlen;

			pipe_readlen =
				lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
			if (pipe_readlen < 0) {
				PERROR("Consumer data wakeup pipe");
			}
			/* We've been awakened to handle stream(s). */
			ctx->has_wakeup = 0;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			/* Slot may have been cleared by a previous deletion. */
			if (local_stream[i] == nullptr) {
				continue;
			}
			if (pollfd[i].revents & POLLPRI) {
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				len = ctx->on_buffer_ready(local_stream[i], ctx, false);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				} else if (len > 0) {
					local_stream[i]->has_data_left_to_be_read_before_teardown =
						1;
				}
			}
		}

		/*
		 * If we read high prio channel in this loop, try again
		 * for more high prio data.
		 */
		if (high_prio) {
			continue;
		}

		/* Take care of low priority channels. */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
			    local_stream[i]->has_data) {
				DBG("Normal read on fd %d", pollfd[i].fd);
				len = ctx->on_buffer_ready(local_stream[i], ctx, false);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				} else if (len > 0) {
					local_stream[i]->has_data_left_to_be_read_before_teardown =
						1;
				}
			}
		}

		/* Handle hangup and errors */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if (!local_stream[i]->hangup_flush_done &&
			    (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
			    (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
			     the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
				DBG("fd %d is hup|err|nval. Attempting flush and read.",
				    pollfd[i].fd);
				lttng_ustconsumer_on_stream_hangup(local_stream[i]);
				/* Attempt read again, for the data we just flushed. */
				local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
			}
			/*
			 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
			 * performed. This type of flush ensures that a new packet is produced no
			 * matter the consumed/produced positions are.
			 *
			 * This, in turn, causes the next pass to see that data available for the
			 * stream. When we come back here, we can be assured that all available
			 * data has been consumed and we can finally destroy the stream.
			 *
			 * If the poll flag is HUP/ERR/NVAL and we have
			 * read no data in this pass, we can remove the
			 * stream from its hash table.
			 */
			if ((pollfd[i].revents & POLLHUP)) {
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			} else if (pollfd[i].revents & POLLERR) {
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			} else if (pollfd[i].revents & POLLNVAL) {
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			}
			/* Reset the per-pass "read some data" marker. */
			if (local_stream[i] != nullptr) {
				local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
			}
		}
	}
	/* All is OK */
	err = 0;
end:
	DBG("polling thread exiting");
	free(pollfd);
	free(local_stream);

	/*
	 * Close the write side of the pipe so epoll_wait() in
	 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
	 * read side of the pipe. If we close them both, epoll_wait strangely does
	 * not return and could create a endless wait period if the pipe is the
	 * only tracked fd in the poll set. The thread will take care of closing
	 * the read side.
	 */
	(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
2789
2790 /*
2791 * Close wake-up end of each stream belonging to the channel. This will
2792 * allow the poll() on the stream read-side to detect when the
2793 * write-side (application) finally closes them.
2794 */
2795 static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
2796 {
2797 struct lttng_ht *ht;
2798 struct lttng_consumer_stream *stream;
2799 struct lttng_ht_iter iter;
2800
2801 ht = the_consumer_data.stream_per_chan_id_ht;
2802
2803 lttng::urcu::read_lock_guard read_lock;
2804 cds_lfht_for_each_entry_duplicate(ht->ht,
2805 ht->hash_fct(&channel->key, lttng_ht_seed),
2806 ht->match_fct,
2807 &channel->key,
2808 &iter.iter,
2809 stream,
2810 node_channel_id.node)
2811 {
2812 /*
2813 * Protect against teardown with mutex.
2814 */
2815 pthread_mutex_lock(&stream->lock);
2816 if (cds_lfht_is_node_deleted(&stream->node.node)) {
2817 goto next;
2818 }
2819 switch (the_consumer_data.type) {
2820 case LTTNG_CONSUMER_KERNEL:
2821 break;
2822 case LTTNG_CONSUMER32_UST:
2823 case LTTNG_CONSUMER64_UST:
2824 if (stream->metadata_flag) {
2825 /* Safe and protected by the stream lock. */
2826 lttng_ustconsumer_close_metadata(stream->chan);
2827 } else {
2828 /*
2829 * Note: a mutex is taken internally within
2830 * liblttng-ust-ctl to protect timer wakeup_fd
2831 * use from concurrent close.
2832 */
2833 lttng_ustconsumer_close_stream_wakeup(stream);
2834 }
2835 break;
2836 default:
2837 ERR("Unknown consumer_data type");
2838 abort();
2839 }
2840 next:
2841 pthread_mutex_unlock(&stream->lock);
2842 }
2843 }
2844
2845 static void destroy_channel_ht(struct lttng_ht *ht)
2846 {
2847 struct lttng_ht_iter iter;
2848 struct lttng_consumer_channel *channel;
2849 int ret;
2850
2851 if (ht == nullptr) {
2852 return;
2853 }
2854
2855 {
2856 lttng::urcu::read_lock_guard read_lock;
2857
2858 cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
2859 ret = lttng_ht_del(ht, &iter);
2860 LTTNG_ASSERT(ret != 0);
2861 }
2862 }
2863
2864 lttng_ht_destroy(ht);
2865 }
2866
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 *
 * Channels are announced through ctx->consumer_channel_pipe[0] as
 * (channel, key, action) messages; known channels are tracked in a local
 * hash table (channel_ht) keyed by their wait fd.
 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	if (testpoint(consumerd_thread_channel)) {
		goto error_testpoint;
	}

	health_code_update();

	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (true) {
	restart:
		health_code_update();
		DBG("Channel poll wait");
		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Channel event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				/* Empty poll set: a clean shutdown, not an error. */
				err = 0; /* All is OK */
			}
			goto end;
		}

		/* Number of fds with activity returned by lttng_poll_wait(). */
		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						if (ret < 0) {
							ERR("Error reading channel pipe");
						}
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
					{
						DBG("Adding channel %d to poll set", chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
								       chan->wait_fd);
						lttng::urcu::read_lock_guard read_lock;
						lttng_ht_add_unique_u64(channel_ht,
									&chan->wait_fd_node);
						/* Add channel to the global poll events list */
						// FIXME: Empty flag on a pipe pollset, this might
						// hang on FreeBSD.
						lttng_poll_add(&events, chan->wait_fd, 0);
						break;
					}
					case CONSUMER_CHANNEL_DEL:
					{
						/*
						 * This command should never be called if the
						 * channel has streams monitored by either the data
						 * or metadata thread. The consumer only notify this
						 * thread with a channel del. command if it receives
						 * a destroy channel command from the session daemon
						 * that send it if a command prior to the
						 * GET_CHANNEL failed.
						 */

						lttng::urcu::read_lock_guard read_lock;
						chan = consumer_find_channel(key);
						if (!chan) {
							ERR("UST consumer get channel key %" PRIu64
							    " not found for del channel",
							    key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						LTTNG_ASSERT(ret == 0);

						switch (the_consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							health_code_update();
							/* Destroy streams that might have been left
							 * in the stream list. */
							clean_channel_stream_list(chan);
							break;
						default:
							ERR("Unknown consumer_data type");
							abort();
						}

						/*
						 * Release our own refcount. Force channel deletion
						 * even if streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						/* Poll set changed; re-enter the wait. */
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since their might be data to consume.
						 */
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* Look up the channel owning this ready fd. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				LTTNG_ASSERT(ret == 0);

				/*
				 * This will close the wait fd for each stream associated to
				 * this channel AND monitored by the data/metadata thread thus
				 * will be clean by the right thread.
				 */
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1) &&
				    !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}

			/* Release RCU lock for the channel looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
error_testpoint:
	DBG("Channel poll thread exiting");
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
3115
3116 static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
3117 struct pollfd *sockpoll,
3118 int client_socket)
3119 {
3120 int ret;
3121
3122 LTTNG_ASSERT(ctx);
3123 LTTNG_ASSERT(sockpoll);
3124
3125 ret = lttng_consumer_poll_socket(sockpoll);
3126 if (ret) {
3127 goto error;
3128 }
3129 DBG("Metadata connection on client_socket");
3130
3131 /* Blocking call, waiting for transmission */
3132 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3133 if (ctx->consumer_metadata_socket < 0) {
3134 WARN("On accept metadata");
3135 ret = -1;
3136 goto error;
3137 }
3138 ret = 0;
3139
3140 error:
3141 return ret;
3142 }
3143
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 *
 * Lifecycle: create and listen on the command unix socket, signal readiness
 * to the session daemon, accept the command connection and the secondary
 * metadata connection, then loop receiving commands until told to quit or
 * the connection drops. On exit, it propagates shutdown to the other
 * consumer threads.
 */
void *consumer_thread_sessiond_poll(void *data)
{
	/* err stays -1 (health error) unless we reach a clean-exit path. */
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	if (testpoint(consumerd_thread_sessiond)) {
		goto error_testpoint;
	}

	health_code_update();

	/* Remove any stale socket file before (re-)creating the command socket. */
	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* A positive return means the quit pipe fired: exit without error. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* Command reception loop: one session daemon command per iteration. */
	while (true) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			if (ret > 0) {
				/* should exit */
				err = 0;
			}
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (CMM_LOAD_SHARED(consumer_quit)) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0; /* All is OK */
			goto end;
		}
		DBG("Received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_all_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	CMM_STORE_SHARED(consumer_quit, 1);

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	/* Wake the channel thread so it notices the quit request as well. */
	notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);

	notify_health_quit_pipe(health_quit_pipe);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
3321
3322 static int post_consume(struct lttng_consumer_stream *stream,
3323 const struct stream_subbuffer *subbuffer,
3324 struct lttng_consumer_local_data *ctx)
3325 {
3326 size_t i;
3327 int ret = 0;
3328 const size_t count =
3329 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
3330
3331 for (i = 0; i < count; i++) {
3332 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
3333 &stream->read_subbuffer_ops.post_consume_cbs, i);
3334
3335 ret = op(stream, subbuffer, ctx);
3336 if (ret) {
3337 goto end;
3338 }
3339 }
3340 end:
3341 return ret;
3342 }
3343
/*
 * Extract and consume the next ready sub-buffer of `stream`.
 *
 * The stream lock is taken internally unless `locked_by_caller` is true, in
 * which case the caller must already hold it (asserted). Rotation is
 * performed before extraction when the stream is already flagged
 * rotate-ready, and re-checked after consumption.
 *
 * Returns the number of bytes consumed (0 when no data was available) or a
 * negative value on error.
 */
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
				      struct lttng_consumer_local_data *ctx,
				      bool locked_by_caller)
{
	ssize_t ret, written_bytes = 0;
	int rotation_ret;
	struct stream_subbuffer subbuffer = {};
	enum get_next_subbuffer_status get_next_status;

	if (!locked_by_caller) {
		stream->read_subbuffer_ops.lock(stream);
	} else {
		stream->read_subbuffer_ops.assert_locked(stream);
	}

	/* Optional per-domain hook run on every wake-up, before extraction. */
	if (stream->read_subbuffer_ops.on_wake_up) {
		ret = stream->read_subbuffer_ops.on_wake_up(stream);
		if (ret) {
			goto end;
		}
	}

	/*
	 * If the stream was flagged to be ready for rotation before we extract
	 * the next packet, rotate it now.
	 */
	if (stream->rotate_ready) {
		DBG("Rotate stream before consuming data");
		ret = lttng_consumer_rotate_stream(stream);
		if (ret < 0) {
			ERR("Stream rotation error before consuming data");
			goto end;
		}
	}

	get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
	switch (get_next_status) {
	case GET_NEXT_SUBBUFFER_STATUS_OK:
		break;
	case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
		/* Not an error. */
		ret = 0;
		goto sleep_stream;
	case GET_NEXT_SUBBUFFER_STATUS_ERROR:
		ret = -1;
		goto end;
	default:
		abort();
	}

	/* From here on, the acquired sub-buffer must be released on failure. */
	ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
	if (ret) {
		goto error_put_subbuf;
	}

	written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
	if (written_bytes <= 0) {
		ERR("Error consuming subbuffer: (%zd)", written_bytes);
		ret = (int) written_bytes;
		goto error_put_subbuf;
	}

	ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	if (ret) {
		goto end;
	}

	ret = post_consume(stream, &subbuffer, ctx);
	if (ret) {
		goto end;
	}

	/*
	 * After extracting the packet, we check if the stream is now ready to
	 * be rotated and perform the action immediately.
	 *
	 * Don't overwrite `ret` as callers expect the number of bytes
	 * consumed to be returned on success.
	 */
	rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
	if (rotation_ret == 1) {
		rotation_ret = lttng_consumer_rotate_stream(stream);
		if (rotation_ret < 0) {
			ret = rotation_ret;
			ERR("Stream rotation error after consuming data");
			goto end;
		}

	} else if (rotation_ret < 0) {
		ret = rotation_ret;
		ERR("Failed to check if stream was ready to rotate after consuming data");
		goto end;
	}

sleep_stream:
	if (stream->read_subbuffer_ops.on_sleep) {
		stream->read_subbuffer_ops.on_sleep(stream, ctx);
	}

	ret = written_bytes;
end:
	if (!locked_by_caller) {
		stream->read_subbuffer_ops.unlock(stream);
	}

	return ret;
error_put_subbuf:
	/* Best-effort release of the acquired sub-buffer on the error path. */
	(void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	goto end;
}
3454
3455 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3456 {
3457 switch (the_consumer_data.type) {
3458 case LTTNG_CONSUMER_KERNEL:
3459 return lttng_kconsumer_on_recv_stream(stream);
3460 case LTTNG_CONSUMER32_UST:
3461 case LTTNG_CONSUMER64_UST:
3462 return lttng_ustconsumer_on_recv_stream(stream);
3463 default:
3464 ERR("Unknown consumer_data type");
3465 abort();
3466 return -ENOSYS;
3467 }
3468 }
3469
3470 /*
3471 * Allocate and set consumer data hash tables.
3472 */
3473 int lttng_consumer_init()
3474 {
3475 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3476 if (!the_consumer_data.channel_ht) {
3477 goto error;
3478 }
3479
3480 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3481 if (!the_consumer_data.channels_by_session_id_ht) {
3482 goto error;
3483 }
3484
3485 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3486 if (!the_consumer_data.relayd_ht) {
3487 goto error;
3488 }
3489
3490 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3491 if (!the_consumer_data.stream_list_ht) {
3492 goto error;
3493 }
3494
3495 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3496 if (!the_consumer_data.stream_per_chan_id_ht) {
3497 goto error;
3498 }
3499
3500 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3501 if (!data_ht) {
3502 goto error;
3503 }
3504
3505 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3506 if (!metadata_ht) {
3507 goto error;
3508 }
3509
3510 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3511 if (!the_consumer_data.chunk_registry) {
3512 goto error;
3513 }
3514
3515 return 0;
3516
3517 error:
3518 return -1;
3519 }
3520
/*
 * Process the ADD_RELAYD command receive by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 *
 * A status message is always sent back to the session daemon on `sock`
 * (success or failure), except when communication itself fails, in which
 * case a FATAL error is signalled through the consumer error socket.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx,
				int sock_type,
				struct lttng_consumer_local_data *ctx,
				int sock,
				struct pollfd *consumer_sockpoll,
				uint64_t sessiond_id,
				uint64_t relayd_session_id,
				uint32_t relayd_version_major,
				uint32_t relayd_version_minor,
				enum lttcomm_sock_proto relayd_socket_protocol)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = nullptr;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(sock >= 0);
	ASSERT_RCU_READ_LOCKED();

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == nullptr) {
		/* The control socket is always added first, creating the pair. */
		LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == nullptr) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status message to
		 * we can notify the session daemon and continue our work without
		 * killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		/* Needing to exit in the middle of a command: error. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		fd = -1; /* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->control_sock.sock, fd, relayd_socket_protocol);

		/* Assign version values. */
		relayd->control_sock.major = relayd_version_major;
		relayd->control_sock.minor = relayd_version_minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->data_sock.sock, fd, relayd_socket_protocol);
		/* Assign version values. */
		relayd->data_sock.major = relayd_version_major;
		relayd->data_sock.minor = relayd_version_minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	/* Check the populate result from either branch above. */
	if (ret < 0) {
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
	    sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
	    relayd->net_seq_idx,
	    fd);
	/*
	 * We gave the ownership of the fd to the relayd structure. Set the
	 * fd to -1 so we don't call close() on it in the error path below.
	 */
	fd = -1;

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	relayd->ctx = ctx;
	add_relayd(relayd);

	/* All good! */
	return;

error:
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	/* Only free a pair allocated by this call, never a pre-existing one. */
	if (relayd_created) {
		free(relayd);
	}
}
3687
3688 /*
3689 * Search for a relayd associated to the session id and return the reference.
3690 *
3691 * A rcu read side lock MUST be acquire before calling this function and locked
3692 * until the relayd object is no longer necessary.
3693 */
3694 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3695 {
3696 struct lttng_ht_iter iter;
3697 struct consumer_relayd_sock_pair *relayd = nullptr;
3698
3699 ASSERT_RCU_READ_LOCKED();
3700
3701 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3702 cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
3703 /*
3704 * Check by sessiond id which is unique here where the relayd session
3705 * id might not be when having multiple relayd.
3706 */
3707 if (relayd->sessiond_session_id == id) {
3708 /* Found the relayd. There can be only one per id. */
3709 goto found;
3710 }
3711 }
3712
3713 return nullptr;
3714
3715 found:
3716 return relayd;
3717 }
3718
/*
 * Check if for a given session id there is still data needed to be extract
 * from the buffers.
 *
 * Both the local ring buffers and, when a relayd is associated with the
 * session, the data still in flight on the network are checked.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	lttng::urcu::read_lock_guard read_lock;
	pthread_mutex_lock(&the_consumer_data.lock);

	/* Select the per-domain data-pending check. */
	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		abort();
	}

	/* Ease our life a bit */
	ht = the_consumer_data.stream_list_ht;

	/* First pass: check local buffers of every stream of the session. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&id, lttng_ht_seed),
					  ht->match_fct,
					  &id,
					  &iter.iter,
					  stream,
					  node_session_id.node)
	{
		pthread_mutex_lock(&stream->lock);

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}

	/* Second pass: ask the relayd, if any, about in-flight network data. */
	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
		if (ret < 0) {
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			/* Communication error thus the relayd so no data pending. */
			goto data_not_pending;
		}

		cds_lfht_for_each_entry_duplicate(ht->ht,
						  ht->hash_fct(&id, lttng_ht_seed),
						  ht->match_fct,
						  &id,
						  &iter.iter,
						  stream,
						  node_session_id.node)
		{
			if (stream->metadata_flag) {
				ret = relayd_quiescent_control(&relayd->control_sock,
							       stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
							  stream->relayd_stream_id,
							  stream->next_net_seq_num - 1);
			}

			if (ret == 1) {
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_pending;
			} else if (ret < 0) {
				ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
				    relayd->net_seq_idx);
				lttng_consumer_cleanup_relayd(relayd);
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_not_pending;
			}
		}

		/* Send end command for data pending. */
		ret = relayd_end_data_pending(
			&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 1;
}
3857
3858 /*
3859 * Send a ret code status message to the sessiond daemon.
3860 *
3861 * Return the sendmsg() return value.
3862 */
3863 int consumer_send_status_msg(int sock, int ret_code)
3864 {
3865 struct lttcomm_consumer_status_msg msg;
3866
3867 memset(&msg, 0, sizeof(msg));
3868 msg.ret_code = (lttcomm_return_code) ret_code;
3869
3870 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3871 }
3872
3873 /*
3874 * Send a channel status message to the sessiond daemon.
3875 *
3876 * Return the sendmsg() return value.
3877 */
3878 int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
3879 {
3880 struct lttcomm_consumer_status_channel msg;
3881
3882 LTTNG_ASSERT(sock >= 0);
3883
3884 memset(&msg, 0, sizeof(msg));
3885 if (!channel) {
3886 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
3887 } else {
3888 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3889 msg.key = channel->key;
3890 msg.stream_count = channel->streams.count;
3891 }
3892
3893 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3894 }
3895
3896 unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
3897 unsigned long produced_pos,
3898 uint64_t nb_packets_per_stream,
3899 uint64_t max_sb_size)
3900 {
3901 unsigned long start_pos;
3902
3903 if (!nb_packets_per_stream) {
3904 return consumed_pos; /* Grab everything */
3905 }
3906 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
3907 start_pos -= max_sb_size * nb_packets_per_stream;
3908 if ((long) (start_pos - consumed_pos) < 0) {
3909 return consumed_pos; /* Grab everything */
3910 }
3911 return start_pos;
3912 }
3913
3914 /* Stream lock must be held by the caller. */
3915 static int sample_stream_positions(struct lttng_consumer_stream *stream,
3916 unsigned long *produced,
3917 unsigned long *consumed)
3918 {
3919 int ret;
3920
3921 ASSERT_LOCKED(stream->lock);
3922
3923 ret = lttng_consumer_sample_snapshot_positions(stream);
3924 if (ret < 0) {
3925 ERR("Failed to sample snapshot positions");
3926 goto end;
3927 }
3928
3929 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3930 if (ret < 0) {
3931 ERR("Failed to sample produced position");
3932 goto end;
3933 }
3934
3935 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3936 if (ret < 0) {
3937 ERR("Failed to sample consumed position");
3938 goto end;
3939 }
3940
3941 end:
3942 return ret;
3943 }
3944
3945 /*
3946 * Sample the rotate position for all the streams of a channel. If a stream
3947 * is already at the rotate position (produced == consumed), we flag it as
3948 * ready for rotation. The rotation of ready streams occurs after we have
3949 * replied to the session daemon that we have finished sampling the positions.
3950 * Must be called with RCU read-side lock held to ensure existence of channel.
3951 *
3952 * Returns 0 on success, < 0 on error
3953 */
3954 int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
3955 uint64_t key,
3956 uint64_t relayd_id)
3957 {
3958 int ret;
3959 struct lttng_consumer_stream *stream;
3960 struct lttng_ht_iter iter;
3961 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
3962 struct lttng_dynamic_array stream_rotation_positions;
3963 uint64_t next_chunk_id, stream_count = 0;
3964 enum lttng_trace_chunk_status chunk_status;
3965 const bool is_local_trace = relayd_id == -1ULL;
3966 struct consumer_relayd_sock_pair *relayd = nullptr;
3967 bool rotating_to_new_chunk = true;
3968 /* Array of `struct lttng_consumer_stream *` */
3969 struct lttng_dynamic_pointer_array streams_packet_to_open;
3970 size_t stream_idx;
3971
3972 ASSERT_RCU_READ_LOCKED();
3973
3974 DBG("Consumer sample rotate position for channel %" PRIu64, key);
3975
3976 lttng_dynamic_array_init(&stream_rotation_positions,
3977 sizeof(struct relayd_stream_rotation_position),
3978 nullptr);
3979 lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
3980
3981 lttng::urcu::read_lock_guard read_lock;
3982
3983 pthread_mutex_lock(&channel->lock);
3984 LTTNG_ASSERT(channel->trace_chunk);
3985 chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
3986 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
3987 ret = -1;
3988 goto end_unlock_channel;
3989 }
3990
3991 cds_lfht_for_each_entry_duplicate(ht->ht,
3992 ht->hash_fct(&channel->key, lttng_ht_seed),
3993 ht->match_fct,
3994 &channel->key,
3995 &iter.iter,
3996 stream,
3997 node_channel_id.node)
3998 {
3999 unsigned long produced_pos = 0, consumed_pos = 0;
4000
4001 health_code_update();
4002
4003 /*
4004 * Lock stream because we are about to change its state.
4005 */
4006 pthread_mutex_lock(&stream->lock);
4007
4008 if (stream->trace_chunk == stream->chan->trace_chunk) {
4009 rotating_to_new_chunk = false;
4010 }
4011
4012 /*
4013 * Do not flush a packet when rotating from a NULL trace
4014 * chunk. The stream has no means to output data, and the prior
4015 * rotation which rotated to NULL performed that side-effect
4016 * already. No new data can be produced when a stream has no
4017 * associated trace chunk (e.g. a stop followed by a rotate).
4018 */
4019 if (stream->trace_chunk) {
4020 bool flush_active;
4021
4022 if (stream->metadata_flag) {
4023 /*
4024 * Don't produce an empty metadata packet,
4025 * simply close the current one.
4026 *
4027 * Metadata is regenerated on every trace chunk
4028 * switch; there is no concern that no data was
4029 * produced.
4030 */
4031 flush_active = true;
4032 } else {
4033 /*
4034 * Only flush an empty packet if the "packet
4035 * open" could not be performed on transition
4036 * to a new trace chunk and no packets were
4037 * consumed within the chunk's lifetime.
4038 */
4039 if (stream->opened_packet_in_current_trace_chunk) {
4040 flush_active = true;
4041 } else {
4042 /*
4043 * Stream could have been full at the
4044 * time of rotation, but then have had
4045 * no activity at all.
4046 *
4047 * It is important to flush a packet
4048 * to prevent 0-length files from being
4049 * produced as most viewers choke on
4050 * them.
4051 *
4052 * Unfortunately viewers will not be
4053 * able to know that tracing was active
4054 * for this stream during this trace
4055 * chunk's lifetime.
4056 */
4057 ret = sample_stream_positions(
4058 stream, &produced_pos, &consumed_pos);
4059 if (ret) {
4060 goto end_unlock_stream;
4061 }
4062
4063 /*
4064 * Don't flush an empty packet if data
4065 * was produced; it will be consumed
4066 * before the rotation completes.
4067 */
4068 flush_active = produced_pos != consumed_pos;
4069 if (!flush_active) {
4070 const char *trace_chunk_name;
4071 uint64_t trace_chunk_id;
4072
4073 chunk_status = lttng_trace_chunk_get_name(
4074 stream->trace_chunk,
4075 &trace_chunk_name,
4076 nullptr);
4077 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
4078 trace_chunk_name = "none";
4079 }
4080
4081 /*
4082 * Consumer trace chunks are
4083 * never anonymous.
4084 */
4085 chunk_status = lttng_trace_chunk_get_id(
4086 stream->trace_chunk, &trace_chunk_id);
4087 LTTNG_ASSERT(chunk_status ==
4088 LTTNG_TRACE_CHUNK_STATUS_OK);
4089
4090 DBG("Unable to open packet for stream during trace chunk's lifetime. "
4091 "Flushing an empty packet to prevent an empty file from being created: "
4092 "stream id = %" PRIu64
4093 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
4094 stream->key,
4095 trace_chunk_name,
4096 trace_chunk_id);
4097 }
4098 }
4099 }
4100
4101 /*
4102 * Close the current packet before sampling the
4103 * ring buffer positions.
4104 */
4105 ret = consumer_stream_flush_buffer(stream, flush_active);
4106 if (ret < 0) {
4107 ERR("Failed to flush stream %" PRIu64 " during channel rotation",
4108 stream->key);
4109 goto end_unlock_stream;
4110 }
4111 }
4112
4113 ret = lttng_consumer_take_snapshot(stream);
4114 if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
4115 ERR("Failed to sample snapshot position during channel rotation");
4116 goto end_unlock_stream;
4117 }
4118 if (!ret) {
4119 ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
4120 if (ret < 0) {
4121 ERR("Failed to sample produced position during channel rotation");
4122 goto end_unlock_stream;
4123 }
4124
4125 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
4126 if (ret < 0) {
4127 ERR("Failed to sample consumed position during channel rotation");
4128 goto end_unlock_stream;
4129 }
4130 }
4131 /*
4132 * Align produced position on the start-of-packet boundary of the first
4133 * packet going into the next trace chunk.
4134 */
4135 produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
4136 if (consumed_pos == produced_pos) {
4137 DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
4138 stream->key,
4139 produced_pos,
4140 consumed_pos);
4141 stream->rotate_ready = true;
4142 } else {
4143 DBG("Different consumed and produced positions "
4144 "for stream %" PRIu64 " produced = %lu consumed = %lu",
4145 stream->key,
4146 produced_pos,
4147 consumed_pos);
4148 }
4149 /*
4150 * The rotation position is based on the packet_seq_num of the
4151 * packet following the last packet that was consumed for this
4152 * stream, incremented by the offset between produced and
4153 * consumed positions. This rotation position is a lower bound
4154 * (inclusive) at which the next trace chunk starts. Since it
4155 * is a lower bound, it is OK if the packet_seq_num does not
4156 * correspond exactly to the same packet identified by the
4157 * consumed_pos, which can happen in overwrite mode.
4158 */
4159 if (stream->sequence_number_unavailable) {
4160 /*
4161 * Rotation should never be performed on a session which
4162 * interacts with a pre-2.8 lttng-modules, which does
4163 * not implement packet sequence number.
4164 */
4165 ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
4166 stream->key);
4167 ret = -1;
4168 goto end_unlock_stream;
4169 }
4170 stream->rotate_position = stream->last_sequence_number + 1 +
4171 ((produced_pos - consumed_pos) / stream->max_sb_size);
4172 DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
4173 stream->key,
4174 stream->rotate_position);
4175
4176 if (!is_local_trace) {
4177 /*
4178 * The relay daemon control protocol expects a rotation
4179 * position as "the sequence number of the first packet
4180 * _after_ the current trace chunk".
4181 */
4182 const struct relayd_stream_rotation_position position = {
4183 .stream_id = stream->relayd_stream_id,
4184 .rotate_at_seq_num = stream->rotate_position,
4185 };
4186
4187 ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
4188 &position);
4189 if (ret) {
4190 ERR("Failed to allocate stream rotation position");
4191 goto end_unlock_stream;
4192 }
4193 stream_count++;
4194 }
4195
4196 stream->opened_packet_in_current_trace_chunk = false;
4197
4198 if (rotating_to_new_chunk && !stream->metadata_flag) {
4199 /*
4200 * Attempt to flush an empty packet as close to the
4201 * rotation point as possible. In the event where a
4202 * stream remains inactive after the rotation point,
4203 * this ensures that the new trace chunk has a
4204 * beginning timestamp set at the begining of the
4205 * trace chunk instead of only creating an empty
4206 * packet when the trace chunk is stopped.
4207 *
4208 * This indicates to the viewers that the stream
4209 * was being recorded, but more importantly it
4210 * allows viewers to determine a useable trace
4211 * intersection.
4212 *
4213 * This presents a problem in the case where the
4214 * ring-buffer is completely full.
4215 *
4216 * Consider the following scenario:
4217 * - The consumption of data is slow (slow network,
4218 * for instance),
4219 * - The ring buffer is full,
4220 * - A rotation is initiated,
4221 * - The flush below does nothing (no space left to
4222 * open a new packet),
4223 * - The other streams rotate very soon, and new
4224 * data is produced in the new chunk,
4225 * - This stream completes its rotation long after the
4226 * rotation was initiated
4227 * - The session is stopped before any event can be
4228 * produced in this stream's buffers.
4229 *
4230 * The resulting trace chunk will have a single packet
4231 * temporaly at the end of the trace chunk for this
4232 * stream making the stream intersection more narrow
4233 * than it should be.
4234 *
4235 * To work-around this, an empty flush is performed
4236 * after the first consumption of a packet during a
4237 * rotation if open_packet fails. The idea is that
4238 * consuming a packet frees enough space to switch
4239 * packets in this scenario and allows the tracer to
4240 * "stamp" the beginning of the new trace chunk at the
4241 * earliest possible point.
4242 *
4243 * The packet open is performed after the channel
4244 * rotation to ensure that no attempt to open a packet
4245 * is performed in a stream that has no active trace
4246 * chunk.
4247 */
4248 ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
4249 stream);
4250 if (ret) {
4251 PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
4252 ret = -1;
4253 goto end_unlock_stream;
4254 }
4255 }
4256
4257 pthread_mutex_unlock(&stream->lock);
4258 }
4259 stream = nullptr;
4260
4261 if (!is_local_trace) {
4262 relayd = consumer_find_relayd(relayd_id);
4263 if (!relayd) {
4264 ERR("Failed to find relayd %" PRIu64, relayd_id);
4265 ret = -1;
4266 goto end_unlock_channel;
4267 }
4268
4269 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
4270 ret = relayd_rotate_streams(&relayd->control_sock,
4271 stream_count,
4272 rotating_to_new_chunk ? &next_chunk_id : nullptr,
4273 (const struct relayd_stream_rotation_position *)
4274 stream_rotation_positions.buffer.data);
4275 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4276 if (ret < 0) {
4277 ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
4278 relayd->net_seq_idx);
4279 lttng_consumer_cleanup_relayd(relayd);
4280 goto end_unlock_channel;
4281 }
4282 }
4283
4284 for (stream_idx = 0;
4285 stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
4286 stream_idx++) {
4287 enum consumer_stream_open_packet_status status;
4288
4289 stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
4290 &streams_packet_to_open, stream_idx);
4291
4292 pthread_mutex_lock(&stream->lock);
4293 status = consumer_stream_open_packet(stream);
4294 pthread_mutex_unlock(&stream->lock);
4295 switch (status) {
4296 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
4297 DBG("Opened a packet after a rotation: stream id = %" PRIu64
4298 ", channel name = %s, session id = %" PRIu64,
4299 stream->key,
4300 stream->chan->name,
4301 stream->chan->session_id);
4302 break;
4303 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
4304 /*
4305 * Can't open a packet as there is no space left
4306 * in the buffer. A new packet will be opened
4307 * once one has been consumed.
4308 */
4309 DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
4310 ", channel name = %s, session id = %" PRIu64,
4311 stream->key,
4312 stream->chan->name,
4313 stream->chan->session_id);
4314 break;
4315 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
4316 /* Logged by callee. */
4317 ret = -1;
4318 goto end_unlock_channel;
4319 default:
4320 abort();
4321 }
4322 }
4323
4324 pthread_mutex_unlock(&channel->lock);
4325 ret = 0;
4326 goto end;
4327
4328 end_unlock_stream:
4329 pthread_mutex_unlock(&stream->lock);
4330 end_unlock_channel:
4331 pthread_mutex_unlock(&channel->lock);
4332 end:
4333 lttng_dynamic_array_reset(&stream_rotation_positions);
4334 lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
4335 return ret;
4336 }
4337
4338 static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
4339 {
4340 int ret = 0;
4341 unsigned long consumed_pos_before, consumed_pos_after;
4342
4343 ret = lttng_consumer_sample_snapshot_positions(stream);
4344 if (ret < 0) {
4345 ERR("Taking snapshot positions");
4346 goto end;
4347 }
4348
4349 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4350 if (ret < 0) {
4351 ERR("Consumed snapshot position");
4352 goto end;
4353 }
4354
4355 switch (the_consumer_data.type) {
4356 case LTTNG_CONSUMER_KERNEL:
4357 ret = kernctl_buffer_clear(stream->wait_fd);
4358 if (ret < 0) {
4359 ERR("Failed to clear kernel stream (ret = %d)", ret);
4360 goto end;
4361 }
4362 break;
4363 case LTTNG_CONSUMER32_UST:
4364 case LTTNG_CONSUMER64_UST:
4365 ret = lttng_ustconsumer_clear_buffer(stream);
4366 if (ret < 0) {
4367 ERR("Failed to clear ust stream (ret = %d)", ret);
4368 goto end;
4369 }
4370 break;
4371 default:
4372 ERR("Unknown consumer_data type");
4373 abort();
4374 }
4375
4376 ret = lttng_consumer_sample_snapshot_positions(stream);
4377 if (ret < 0) {
4378 ERR("Taking snapshot positions");
4379 goto end;
4380 }
4381 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4382 if (ret < 0) {
4383 ERR("Consumed snapshot position");
4384 goto end;
4385 }
4386 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4387 end:
4388 return ret;
4389 }
4390
4391 static int consumer_clear_stream(struct lttng_consumer_stream *stream)
4392 {
4393 int ret;
4394
4395 ret = consumer_stream_flush_buffer(stream, true);
4396 if (ret < 0) {
4397 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
4398 ret = LTTCOMM_CONSUMERD_FATAL;
4399 goto error;
4400 }
4401
4402 ret = consumer_clear_buffer(stream);
4403 if (ret < 0) {
4404 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
4405 ret = LTTCOMM_CONSUMERD_FATAL;
4406 goto error;
4407 }
4408
4409 ret = LTTCOMM_CONSUMERD_SUCCESS;
4410 error:
4411 return ret;
4412 }
4413
4414 static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
4415 {
4416 int ret;
4417 struct lttng_consumer_stream *stream;
4418
4419 lttng::urcu::read_lock_guard read_lock;
4420 pthread_mutex_lock(&channel->lock);
4421 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
4422 health_code_update();
4423 pthread_mutex_lock(&stream->lock);
4424 ret = consumer_clear_stream(stream);
4425 if (ret) {
4426 goto error_unlock;
4427 }
4428 pthread_mutex_unlock(&stream->lock);
4429 }
4430 pthread_mutex_unlock(&channel->lock);
4431 return 0;
4432
4433 error_unlock:
4434 pthread_mutex_unlock(&stream->lock);
4435 pthread_mutex_unlock(&channel->lock);
4436 return ret;
4437 }
4438
4439 /*
4440 * Check if a stream is ready to be rotated after extracting it.
4441 *
4442 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4443 * error. Stream lock must be held.
4444 */
4445 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4446 {
4447 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4448 " last_sequence_number %" PRIu64,
4449 stream->key,
4450 stream->rotate_ready,
4451 stream->rotate_position,
4452 stream->last_sequence_number);
4453 if (stream->rotate_ready) {
4454 return 1;
4455 }
4456
4457 /*
4458 * If packet seq num is unavailable, it means we are interacting
4459 * with a pre-2.8 lttng-modules which does not implement the
4460 * sequence number. Rotation should never be used by sessiond in this
4461 * scenario.
4462 */
4463 if (stream->sequence_number_unavailable) {
4464 ERR("Internal error: rotation used on stream %" PRIu64
4465 " with unavailable sequence number",
4466 stream->key);
4467 return -1;
4468 }
4469
4470 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
4471 return 0;
4472 }
4473
4474 /*
4475 * Rotate position not reached yet. The stream rotate position is
4476 * the position of the next packet belonging to the next trace chunk,
4477 * but consumerd considers rotation ready when reaching the last
4478 * packet of the current chunk, hence the "rotate_position - 1".
4479 */
4480
4481 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4482 " rotate_position %" PRIu64,
4483 stream->key,
4484 stream->last_sequence_number,
4485 stream->rotate_position);
4486 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4487 return 1;
4488 }
4489
4490 return 0;
4491 }
4492
4493 /*
4494 * Reset the state for a stream after a rotation occurred.
4495 */
4496 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4497 {
4498 DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
4499 stream->rotate_position = -1ULL;
4500 stream->rotate_ready = false;
4501 }
4502
4503 /*
4504 * Perform the rotation a local stream file.
4505 */
4506 static int rotate_local_stream(struct lttng_consumer_stream *stream)
4507 {
4508 int ret = 0;
4509
4510 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
4511 stream->key,
4512 stream->chan->key);
4513 stream->tracefile_size_current = 0;
4514 stream->tracefile_count_current = 0;
4515
4516 if (stream->out_fd >= 0) {
4517 ret = close(stream->out_fd);
4518 if (ret) {
4519 PERROR("Failed to close stream out_fd of channel \"%s\"",
4520 stream->chan->name);
4521 }
4522 stream->out_fd = -1;
4523 }
4524
4525 if (stream->index_file) {
4526 lttng_index_file_put(stream->index_file);
4527 stream->index_file = nullptr;
4528 }
4529
4530 if (!stream->trace_chunk) {
4531 goto end;
4532 }
4533
4534 ret = consumer_stream_create_output_files(stream, true);
4535 end:
4536 return ret;
4537 }
4538
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * Return 0 on success, a negative number of error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 *
	 * The stream's reference to its current chunk is released here; the
	 * (now potentially stale) pointer value is only used below for an
	 * address comparison against the channel's chunk.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = nullptr;
	} else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	/*
	 * A net_seq_idx of -1ULL indicates that the stream is not sent to a
	 * relay daemon; rotate its local output files.
	 */
	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	/* The rotation is complete; clear the stream's pending-rotation state. */
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}
4610
4611 /*
4612 * Rotate all the ready streams now.
4613 *
4614 * This is especially important for low throughput streams that have already
4615 * been consumed, we cannot wait for their next packet to perform the
4616 * rotation.
4617 * Need to be called with RCU read-side lock held to ensure existence of
4618 * channel.
4619 *
4620 * Returns 0 on success, < 0 on error
4621 */
4622 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
4623 {
4624 int ret;
4625 struct lttng_consumer_stream *stream;
4626 struct lttng_ht_iter iter;
4627 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
4628
4629 ASSERT_RCU_READ_LOCKED();
4630
4631 lttng::urcu::read_lock_guard read_lock;
4632
4633 DBG("Consumer rotate ready streams in channel %" PRIu64, key);
4634
4635 cds_lfht_for_each_entry_duplicate(ht->ht,
4636 ht->hash_fct(&channel->key, lttng_ht_seed),
4637 ht->match_fct,
4638 &channel->key,
4639 &iter.iter,
4640 stream,
4641 node_channel_id.node)
4642 {
4643 health_code_update();
4644
4645 pthread_mutex_lock(&stream->chan->lock);
4646 pthread_mutex_lock(&stream->lock);
4647
4648 if (!stream->rotate_ready) {
4649 pthread_mutex_unlock(&stream->lock);
4650 pthread_mutex_unlock(&stream->chan->lock);
4651 continue;
4652 }
4653 DBG("Consumer rotate ready stream %" PRIu64, stream->key);
4654
4655 ret = lttng_consumer_rotate_stream(stream);
4656 pthread_mutex_unlock(&stream->lock);
4657 pthread_mutex_unlock(&stream->chan->lock);
4658 if (ret) {
4659 goto end;
4660 }
4661 }
4662
4663 ret = 0;
4664
4665 end:
4666 return ret;
4667 }
4668
4669 enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4670 const lttng_uuid& sessiond_uuid)
4671 {
4672 enum lttcomm_return_code ret;
4673 char uuid_str[LTTNG_UUID_STR_LEN];
4674
4675 if (ctx->sessiond_uuid.is_set) {
4676 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
4677 goto end;
4678 }
4679
4680 ctx->sessiond_uuid.is_set = true;
4681 ctx->sessiond_uuid.value = sessiond_uuid;
4682 ret = LTTCOMM_CONSUMERD_SUCCESS;
4683 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4684 DBG("Received session daemon UUID: %s", uuid_str);
4685 end:
4686 return ret;
4687 }
4688
/*
 * Create a trace chunk identified by (session_id, chunk_id), publish it in
 * the consumer daemon's per-session chunk registry, associate it with every
 * channel of the session and, when relayd_id is provided, create it on the
 * relay daemon as well.
 *
 * relayd_id may be NULL for a local trace. credentials and
 * chunk_directory_handle are optional; when a directory handle is provided,
 * ownership of the output directory remains with the caller.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success; every failure path rolls
 * back the chunk's creation and returns
 * LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED.
 */
enum lttcomm_return_code
lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
				  uint64_t session_id,
				  uint64_t chunk_id,
				  time_t chunk_creation_timestamp,
				  const char *chunk_override_name,
				  const struct lttng_credentials *credentials,
				  struct lttng_directory_handle *chunk_directory_handle)
{
	int ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
	enum lttng_trace_chunk_status chunk_status;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	char creation_timestamp_buffer[ISO8601_STR_LEN];
	const char *relayd_id_str = "(none)";
	const char *creation_timestamp_str;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		/* NOTE(review): int vs size_t comparison is safe here since ret > 0. */
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	/* Local protocol error. */
	LTTNG_ASSERT(chunk_creation_timestamp);
	ret = time_to_iso8601_str(chunk_creation_timestamp,
				  creation_timestamp_buffer,
				  sizeof(creation_timestamp_buffer));
	creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";

	DBG("Consumer create trace chunk command: relay_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
	    ", chunk_creation_timestamp = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    chunk_override_name ?: "(none)",
	    creation_timestamp_str);

	/*
	 * The trace chunk registry, as used by the consumer daemon, implicitly
	 * owns the trace chunks. This is only needed in the consumer since
	 * the consumer has no notion of a session beyond session IDs being
	 * used to identify other objects.
	 *
	 * The lttng_trace_chunk_registry_publish() call below provides a
	 * reference which is not released; it implicitly becomes the session
	 * daemon's reference to the chunk in the consumer daemon.
	 *
	 * The lifetime of trace chunks in the consumer daemon is managed by
	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
	 */
	created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
	if (!created_chunk) {
		ERR("Failed to create trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	if (chunk_override_name) {
		chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	if (chunk_directory_handle) {
		chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk credentials");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
		/*
		 * The consumer daemon has no ownership of the chunk output
		 * directory.
		 */
		chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
		chunk_directory_handle = nullptr;
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk's directory handle");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	/* Publishing transfers logical ownership to the registry. */
	published_chunk = lttng_trace_chunk_registry_publish_chunk(
		the_consumer_data.chunk_registry, session_id, created_chunk);
	lttng_trace_chunk_put(created_chunk);
	created_chunk = nullptr;
	if (!published_chunk) {
		ERR("Failed to publish trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	/* Make every channel of the session transition to the new chunk. */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry_duplicate(
			the_consumer_data.channels_by_session_id_ht->ht,
			the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
									      lttng_ht_seed),
			the_consumer_data.channels_by_session_id_ht->match_fct,
			&session_id,
			&iter.iter,
			channel,
			channels_by_session_id_ht_node.node)
		{
			ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
			if (ret) {
				/*
				 * Roll-back the creation of this chunk.
				 *
				 * This is important since the session daemon will
				 * assume that the creation of this chunk failed and
				 * will never ask for it to be closed, resulting
				 * in a leak and an inconsistent state for some
				 * channels.
				 */
				enum lttcomm_return_code close_ret;
				char path[LTTNG_PATH_MAX];

				DBG("Failed to set new trace chunk on existing channels, rolling back");
				close_ret =
					lttng_consumer_close_trace_chunk(relayd_id,
									 session_id,
									 chunk_id,
									 chunk_creation_timestamp,
									 nullptr,
									 path);
				if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
					ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
					    ", chunk_id = %" PRIu64,
					    session_id,
					    chunk_id);
				}

				ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
				break;
			}
		}
	}

	if (relayd_id) {
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		/*
		 * ret is only read when relayd is non-NULL (short-circuit),
		 * in which case it was set by the call above.
		 */
		if (!relayd || ret) {
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
								     session_id,
								     chunk_id,
								     chunk_creation_timestamp,
								     nullptr,
								     path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
				    ", chunk_id = %" PRIu64,
				    session_id,
				    chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
error:
	/* Release the reference returned by the "publish" operation. */
	lttng_trace_chunk_put(published_chunk);
	lttng_trace_chunk_put(created_chunk);
	return ret_code;
}
4881
/*
 * Close the trace chunk identified by (session_id, chunk_id): set its close
 * timestamp and optional close command, dissociate it from any channel that
 * still references it and, when relayd_id is provided, close it on the
 * relay daemon as well.
 *
 * NOTE(review): 'path' is forwarded as-is to relayd_close_trace_chunk();
 * it appears to be an output buffer — confirm against the relayd protocol.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success,
 * LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK if the chunk is not registered, or
 * LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED on any other failure.
 */
enum lttcomm_return_code
lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
				 uint64_t session_id,
				 uint64_t chunk_id,
				 time_t chunk_close_timestamp,
				 const enum lttng_trace_chunk_command_type *close_command,
				 char *path)
{
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *chunk;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const char *close_command_name = "none";
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	enum lttng_trace_chunk_status chunk_status;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}
	if (close_command) {
		close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
	}

	DBG("Consumer close trace chunk command: relayd_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    close_command_name);

	chunk = lttng_trace_chunk_registry_find_chunk(
		the_consumer_data.chunk_registry, session_id, chunk_id);
	if (!chunk) {
		ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
		    session_id,
		    chunk_id);
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		goto end;
	}

	if (close_command) {
		chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto end;
		}
	}

	/*
	 * chunk is now invalid to access as we no longer hold a reference to
	 * it; it is only kept around to compare it (by address) to the
	 * current chunk found in the session's channels.
	 */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (
			the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
			int ret;

			/*
			 * Only change the channel's chunk to NULL if it still
			 * references the chunk being closed. The channel may
			 * reference a newer channel in the case of a session
			 * rotation. When a session rotation occurs, the "next"
			 * chunk is created before the "current" chunk is closed.
			 */
			if (channel->trace_chunk != chunk) {
				continue;
			}
			ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
			if (ret) {
				/*
				 * Attempt to close the chunk on as many channels as
				 * possible.
				 */
				ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			}
		}
	}
	if (relayd_id) {
		int ret;
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		/*
		 * ret is only read when relayd is non-NULL (short-circuit),
		 * in which case it was set by the call above.
		 */
		if (!relayd || ret) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
end:
	/*
	 * Release the reference returned by the "find" operation and
	 * the session daemon's implicit reference to the chunk.
	 * The double put is intentional.
	 */
	lttng_trace_chunk_put(chunk);
	lttng_trace_chunk_put(chunk);

	return ret_code;
}
5005
5006 enum lttcomm_return_code
5007 lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
5008 {
5009 int ret;
5010 enum lttcomm_return_code ret_code;
5011 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
5012 const char *relayd_id_str = "(none)";
5013 const bool is_local_trace = !relayd_id;
5014 struct consumer_relayd_sock_pair *relayd = nullptr;
5015 bool chunk_exists_local, chunk_exists_remote;
5016 lttng::urcu::read_lock_guard read_lock;
5017
5018 if (relayd_id) {
5019 /* Only used for logging purposes. */
5020 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
5021 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
5022 relayd_id_str = relayd_id_buffer;
5023 } else {
5024 relayd_id_str = "(formatting error)";
5025 }
5026 }
5027
5028 DBG("Consumer trace chunk exists command: relayd_id = %s"
5029 ", chunk_id = %" PRIu64,
5030 relayd_id_str,
5031 chunk_id);
5032 ret = lttng_trace_chunk_registry_chunk_exists(
5033 the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
5034 if (ret) {
5035 /* Internal error. */
5036 ERR("Failed to query the existence of a trace chunk");
5037 ret_code = LTTCOMM_CONSUMERD_FATAL;
5038 goto end;
5039 }
5040 DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
5041 if (chunk_exists_local) {
5042 ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
5043 goto end;
5044 } else if (is_local_trace) {
5045 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5046 goto end;
5047 }
5048
5049 relayd = consumer_find_relayd(*relayd_id);
5050 if (!relayd) {
5051 ERR("Failed to find relayd %" PRIu64, *relayd_id);
5052 ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5053 goto end_rcu_unlock;
5054 }
5055 DBG("Looking up existence of trace chunk on relay daemon");
5056 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
5057 ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
5058 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
5059 if (ret < 0) {
5060 ERR("Failed to look-up the existence of trace chunk on relay daemon");
5061 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
5062 goto end_rcu_unlock;
5063 }
5064
5065 ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
5066 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5067 DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
5068
5069 end_rcu_unlock:
5070 end:
5071 return ret_code;
5072 }
5073
5074 static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
5075 {
5076 struct lttng_ht *ht;
5077 struct lttng_consumer_stream *stream;
5078 struct lttng_ht_iter iter;
5079 int ret;
5080
5081 ht = the_consumer_data.stream_per_chan_id_ht;
5082
5083 lttng::urcu::read_lock_guard read_lock;
5084 cds_lfht_for_each_entry_duplicate(ht->ht,
5085 ht->hash_fct(&channel->key, lttng_ht_seed),
5086 ht->match_fct,
5087 &channel->key,
5088 &iter.iter,
5089 stream,
5090 node_channel_id.node)
5091 {
5092 /*
5093 * Protect against teardown with mutex.
5094 */
5095 pthread_mutex_lock(&stream->lock);
5096 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5097 goto next;
5098 }
5099 ret = consumer_clear_stream(stream);
5100 if (ret) {
5101 goto error_unlock;
5102 }
5103 next:
5104 pthread_mutex_unlock(&stream->lock);
5105 }
5106 return LTTCOMM_CONSUMERD_SUCCESS;
5107
5108 error_unlock:
5109 pthread_mutex_unlock(&stream->lock);
5110 return ret;
5111 }
5112
5113 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5114 {
5115 int ret;
5116
5117 DBG("Consumer clear channel %" PRIu64, channel->key);
5118
5119 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5120 /*
5121 * Nothing to do for the metadata channel/stream.
5122 * Snapshot mechanism already take care of the metadata
5123 * handling/generation, and monitored channels only need to
5124 * have their data stream cleared..
5125 */
5126 ret = LTTCOMM_CONSUMERD_SUCCESS;
5127 goto end;
5128 }
5129
5130 if (!channel->monitor) {
5131 ret = consumer_clear_unmonitored_channel(channel);
5132 } else {
5133 ret = consumer_clear_monitored_channel(channel);
5134 }
5135 end:
5136 return ret;
5137 }
5138
/*
 * Open a packet in every stream of a data channel so that the streams have
 * a packet in the current trace chunk even if no events are produced.
 * Streams whose hash table node was already deleted (concurrent teardown)
 * are skipped; streams whose packet is opened have
 * opened_packet_in_current_trace_chunk set.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success,
 * LTTCOMM_CONSUMERD_INVALID_PARAMETERS when invoked on a metadata channel,
 * or LTTCOMM_CONSUMERD_UNKNOWN_ERROR when a packet cannot be opened.
 */
enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream;
	enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;

	if (channel->metadata_stream) {
		ERR("Open channel packets command attempted on a metadata channel");
		ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
		goto end;
	}

	{
		lttng::urcu::read_lock_guard read_lock;
		cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
			enum consumer_stream_open_packet_status status;

			/* Protect against concurrent stream teardown. */
			pthread_mutex_lock(&stream->lock);
			if (cds_lfht_is_node_deleted(&stream->node.node)) {
				goto next;
			}

			status = consumer_stream_open_packet(stream);
			switch (status) {
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
				DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				stream->opened_packet_in_current_trace_chunk = true;
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
				/* Not an error: a packet will open once data is consumed. */
				DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel name = %s, session id = %" PRIu64,
				    stream->key,
				    stream->chan->name,
				    stream->chan->session_id);
				break;
			case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
				/*
				 * Only unexpected internal errors can lead to this
				 * failing. Report an unknown error.
				 */
				ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
				    ", channel id = %" PRIu64 ", channel name = %s"
				    ", session id = %" PRIu64,
				    stream->key,
				    channel->key,
				    channel->name,
				    channel->session_id);
				ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
				goto error_unlock;
			default:
				abort();
			}

		next:
			pthread_mutex_unlock(&stream->lock);
		}
	}
end_rcu_unlock:
end:
	return ret;

error_unlock:
	/* The stream lock is still held on this error path; release it. */
	pthread_mutex_unlock(&stream->lock);
	goto end_rcu_unlock;
}
5207
/*
 * SIGBUS handler hook: delegate the faulting address to the user space
 * tracer consumer's SIGBUS handler.
 */
void lttng_consumer_sigbus_handle(void *addr)
{
	lttng_ustconsumer_sigbus_handle(addr);
}
This page took 0.174145 seconds and 5 git commands to generate.