5f3dc4e621ad303a3b605e134182ee8177448877
[lttng-tools.git] / src / common / consumer / consumer.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <common/align.hpp>
12 #include <common/common.hpp>
13 #include <common/compat/endian.hpp>
14 #include <common/compat/poll.hpp>
15 #include <common/consumer/consumer-metadata-cache.hpp>
16 #include <common/consumer/consumer-stream.hpp>
17 #include <common/consumer/consumer-testpoint.hpp>
18 #include <common/consumer/consumer-timer.hpp>
19 #include <common/consumer/consumer.hpp>
20 #include <common/dynamic-array.hpp>
21 #include <common/index/ctf-index.hpp>
22 #include <common/index/index.hpp>
23 #include <common/kernel-consumer/kernel-consumer.hpp>
24 #include <common/kernel-ctl/kernel-ctl.hpp>
25 #include <common/relayd/relayd.hpp>
26 #include <common/sessiond-comm/relayd.hpp>
27 #include <common/sessiond-comm/sessiond-comm.hpp>
28 #include <common/string-utils/format.hpp>
29 #include <common/time.hpp>
30 #include <common/trace-chunk-registry.hpp>
31 #include <common/trace-chunk.hpp>
32 #include <common/urcu.hpp>
33 #include <common/ust-consumer/ust-consumer.hpp>
34 #include <common/utils.hpp>
35
36 #include <bin/lttng-consumerd/health-consumerd.hpp>
37 #include <inttypes.h>
38 #include <poll.h>
39 #include <pthread.h>
40 #include <signal.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #include <sys/mman.h>
44 #include <sys/socket.h>
45 #include <sys/types.h>
46 #include <unistd.h>
47
48 lttng_consumer_global_data the_consumer_data;
49
50 enum consumer_channel_action {
51 CONSUMER_CHANNEL_ADD,
52 CONSUMER_CHANNEL_DEL,
53 CONSUMER_CHANNEL_QUIT,
54 };
55
56 namespace {
57 struct consumer_channel_msg {
58 enum consumer_channel_action action;
59 struct lttng_consumer_channel *chan; /* add */
60 uint64_t key; /* del */
61 };
62
63 /*
64 * Global hash table containing respectively metadata and data streams. The
65 * stream element in this ht should only be updated by the metadata poll thread
66 * for the metadata and the data poll thread for the data.
67 */
68 struct lttng_ht *metadata_ht;
69 struct lttng_ht *data_ht;
70 } /* namespace */
71
72 /* Flag used to temporarily pause data consumption from testpoints. */
73 int data_consumption_paused;
74
75 /*
76 * Flag to inform the polling thread to quit when all fd hung up. Updated by
77 * the consumer_thread_receive_fds when it notices that all fds has hung up.
78 * Also updated by the signal handler (consumer_should_exit()). Read by the
79 * polling threads.
80 */
81 int consumer_quit;
82
83 static const char *get_consumer_domain()
84 {
85 switch (the_consumer_data.type) {
86 case LTTNG_CONSUMER_KERNEL:
87 return DEFAULT_KERNEL_TRACE_DIR;
88 case LTTNG_CONSUMER64_UST:
89 /* Fall-through. */
90 case LTTNG_CONSUMER32_UST:
91 return DEFAULT_UST_TRACE_DIR;
92 default:
93 abort();
94 }
95 }
96
97 /*
98 * Notify a thread lttng pipe to poll back again. This usually means that some
99 * global state has changed so we just send back the thread in a poll wait
100 * call.
101 */
102 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
103 {
104 struct lttng_consumer_stream *null_stream = nullptr;
105
106 LTTNG_ASSERT(pipe);
107
108 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
109 pointer. */
110 }
111
112 static void notify_health_quit_pipe(int *pipe)
113 {
114 ssize_t ret;
115
116 ret = lttng_write(pipe[1], "4", 1);
117 if (ret < 1) {
118 PERROR("write consumer health quit");
119 }
120 }
121
122 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
123 struct lttng_consumer_channel *chan,
124 uint64_t key,
125 enum consumer_channel_action action)
126 {
127 struct consumer_channel_msg msg;
128 ssize_t ret;
129
130 memset(&msg, 0, sizeof(msg));
131
132 msg.action = action;
133 msg.chan = chan;
134 msg.key = key;
135 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
136 if (ret < sizeof(msg)) {
137 PERROR("notify_channel_pipe write error");
138 }
139 }
140
/*
 * Ask the channel management thread to delete the channel identified by
 * 'key'. Thin wrapper over notify_channel_pipe() with a null channel pointer.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
}
145
146 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
147 struct lttng_consumer_channel **chan,
148 uint64_t *key,
149 enum consumer_channel_action *action)
150 {
151 struct consumer_channel_msg msg;
152 ssize_t ret;
153
154 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
155 if (ret < sizeof(msg)) {
156 ret = -1;
157 goto error;
158 }
159 *action = msg.action;
160 *chan = msg.chan;
161 *key = msg.key;
162 error:
163 return (int) ret;
164 }
165
/*
 * Cleanup the stream list of a channel. Those streams are not yet globally
 * visible, so no global hash table entry has to be removed.
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	LTTNG_ASSERT(channel);

	/*
	 * Delete streams that might have been left in the stream list. The
	 * _safe iterator is required because consumer_stream_destroy() unlinks
	 * the current node.
	 */
	cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
		/*
		 * Once a stream is added to this list, the buffers were created so we
		 * have a guarantee that this call will succeed. Setting the monitor
		 * mode to 0 so we don't lock nor try to delete the stream from the
		 * global hash table.
		 */
		stream->monitor = 0;
		consumer_stream_destroy(stream, nullptr);
	}
}
188
189 /*
190 * Find a stream. The consumer_data.lock must be locked during this
191 * call.
192 */
193 static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *ht)
194 {
195 struct lttng_ht_iter iter;
196 struct lttng_ht_node_u64 *node;
197 struct lttng_consumer_stream *stream = nullptr;
198
199 LTTNG_ASSERT(ht);
200
201 /* -1ULL keys are lookup failures */
202 if (key == (uint64_t) -1ULL) {
203 return nullptr;
204 }
205
206 lttng::urcu::read_lock_guard read_lock;
207
208 lttng_ht_lookup(ht, &key, &iter);
209 node = lttng_ht_iter_get_node_u64(&iter);
210 if (node != nullptr) {
211 stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
212 }
213
214 return stream;
215 }
216
217 static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
218 {
219 struct lttng_consumer_stream *stream;
220
221 lttng::urcu::read_lock_guard read_lock;
222 stream = find_stream(key, ht);
223 if (stream) {
224 stream->key = (uint64_t) -1ULL;
225 /*
226 * We don't want the lookup to match, but we still need
227 * to iterate on this stream when iterating over the hash table. Just
228 * change the node key.
229 */
230 stream->node.key = (uint64_t) -1ULL;
231 }
232 }
233
234 /*
235 * Return a channel object for the given key.
236 *
237 * RCU read side lock MUST be acquired before calling this function and
238 * protects the channel ptr.
239 */
240 struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
241 {
242 struct lttng_ht_iter iter;
243 struct lttng_ht_node_u64 *node;
244 struct lttng_consumer_channel *channel = nullptr;
245
246 ASSERT_RCU_READ_LOCKED();
247
248 /* -1ULL keys are lookup failures */
249 if (key == (uint64_t) -1ULL) {
250 return nullptr;
251 }
252
253 lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
254 node = lttng_ht_iter_get_node_u64(&iter);
255 if (node != nullptr) {
256 channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
257 }
258
259 return channel;
260 }
261
262 /*
263 * There is a possibility that the consumer does not have enough time between
264 * the close of the channel on the session daemon and the cleanup in here thus
265 * once we have a channel add with an existing key, we know for sure that this
266 * channel will eventually get cleaned up by all streams being closed.
267 *
268 * This function just nullifies the already existing channel key.
269 */
270 static void steal_channel_key(uint64_t key)
271 {
272 struct lttng_consumer_channel *channel;
273
274 lttng::urcu::read_lock_guard read_lock;
275 channel = consumer_find_channel(key);
276 if (channel) {
277 channel->key = (uint64_t) -1ULL;
278 /*
279 * We don't want the lookup to match, but we still need to iterate on
280 * this channel when iterating over the hash table. Just change the
281 * node key.
282 */
283 channel->node.key = (uint64_t) -1ULL;
284 }
285 }
286
287 static void free_channel_rcu(struct rcu_head *head)
288 {
289 struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
290 struct lttng_consumer_channel *channel =
291 lttng::utils::container_of(node, &lttng_consumer_channel::node);
292
293 switch (the_consumer_data.type) {
294 case LTTNG_CONSUMER_KERNEL:
295 break;
296 case LTTNG_CONSUMER32_UST:
297 case LTTNG_CONSUMER64_UST:
298 lttng_ustconsumer_free_channel(channel);
299 break;
300 default:
301 ERR("Unknown consumer_data type");
302 abort();
303 }
304 free(channel);
305 }
306
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, &lttng_ht_node_u64::head);
	struct consumer_relayd_sock_pair *relayd =
		lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	/* Last user: the mutex can now be torn down along with the object. */
	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
330
331 /*
332 * Destroy and free relayd socket pair object.
333 */
334 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
335 {
336 int ret;
337 struct lttng_ht_iter iter;
338
339 if (relayd == nullptr) {
340 return;
341 }
342
343 DBG("Consumer destroy and close relayd socket pair");
344
345 iter.iter.node = &relayd->node.node;
346 ret = lttng_ht_del(the_consumer_data.relayd_ht, &iter);
347 if (ret != 0) {
348 /* We assume the relayd is being or is destroyed */
349 return;
350 }
351
352 /* RCU free() call */
353 call_rcu(&relayd->node.head, free_relayd_rcu);
354 }
355
/*
 * Remove a channel from the global list protected by a mutex. This function is
 * also responsible for freeing its data structures.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	/* Lock ordering: global consumer data lock first, then the channel lock. */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	/*
	 * Send a last buffer statistics sample to the session daemon
	 * to ensure it tracks the amount of data consumed by this channel.
	 */
	sample_and_send_channel_buffer_stats(channel);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
		/* Unreachable: abort() does not return. */
		goto end;
	}

	/* Drop the channel's reference on its current trace chunk, if any. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = nullptr;

	if (channel->is_published) {
		int ret;

		/* Unpublish from both lookup tables under the RCU read lock. */
		lttng::urcu::read_lock_guard read_lock;
		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
		LTTNG_ASSERT(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
		LTTNG_ASSERT(!ret);
	}

	channel->is_deleted = true;
	/* Memory is reclaimed once all current RCU readers are done. */
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
420
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht()
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	{
		/*
		 * Deleting nodes while iterating is safe here: the lfht
		 * iterator tolerates concurrent removal under the RCU read
		 * lock, and the actual free is deferred via call_rcu.
		 */
		lttng::urcu::read_lock_guard read_lock;

		cds_lfht_for_each_entry (
			the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
			consumer_destroy_relayd(relayd);
		}
	}

	lttng_ht_destroy(the_consumer_data.relayd_ht);
}
441
/*
 * Update the end point status of all streams having the given network sequence
 * index (relayd index).
 *
 * It's atomically set without having the stream mutex locked which is fine
 * because we handle the write/read race with a pipe wakeup for each thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
					     enum consumer_endpoint_status status)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

	lttng::urcu::read_lock_guard read_lock;

	/* Let's begin with metadata */
	cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
		}
	}

	/* Follow up by the data streams */
	cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to data stream %d", stream->wait_fd);
		}
	}
}
475
/*
 * Cleanup a relayd object by flagging every associated streams for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
	uint64_t netidx;

	LTTNG_ASSERT(relayd);

	DBG("Cleaning up relayd object ID %" PRIu64, relayd->net_seq_idx);

	/*
	 * Save the net sequence index before destroying the object: the
	 * relayd memory may be reclaimed once consumer_destroy_relayd()
	 * returns and the RCU grace period elapses.
	 */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
	notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
513
514 /*
515 * Flag a relayd socket pair for destruction. Destroy it if the refcount
516 * reaches zero.
517 *
518 * RCU read side lock MUST be aquired before calling this function.
519 */
520 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
521 {
522 LTTNG_ASSERT(relayd);
523 ASSERT_RCU_READ_LOCKED();
524
525 /* Set destroy flag for this object */
526 uatomic_set(&relayd->destroy_flag, 1);
527
528 /* Destroy the relayd if refcount is 0 */
529 if (uatomic_read(&relayd->refcount) == 0) {
530 consumer_destroy_relayd(relayd);
531 }
532 }
533
/*
 * Completely destroy a stream from every visible data structure and, if
 * given, the provided hash table.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
544
/*
 * Destroy a data stream, removing it from the global data stream hash table.
 * XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}
552
/*
 * Destroy a metadata stream, removing it from the global metadata stream
 * hash table.
 */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
557
/*
 * Snapshot channel attributes into the stream's read-only copy. Only the
 * tracefile size is copied here.
 */
void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
					       struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size;
}
563
/*
 * Add a stream to the global list protected by a mutex.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	/*
	 * Lock ordering: consumer data lock, channel lock, channel timer
	 * lock, then stream lock. The RCU read lock is taken last, for the
	 * hash table insertions below.
	 */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	lttng::urcu::read_lock_guard read_lock;

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	the_consumer_data.stream_count++;
	the_consumer_data.need_update = 1;

	/* Unlock in the reverse order of acquisition. */
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
618
619 /*
620 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
621 * be acquired before calling this.
622 */
623 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
624 {
625 int ret = 0;
626 struct lttng_ht_node_u64 *node;
627 struct lttng_ht_iter iter;
628
629 LTTNG_ASSERT(relayd);
630 ASSERT_RCU_READ_LOCKED();
631
632 lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
633 node = lttng_ht_iter_get_node_u64(&iter);
634 if (node != nullptr) {
635 goto end;
636 }
637 lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
638
639 end:
640 return ret;
641 }
642
643 /*
644 * Allocate and return a consumer relayd socket.
645 */
646 static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
647 {
648 struct consumer_relayd_sock_pair *obj = nullptr;
649
650 /* net sequence index of -1 is a failure */
651 if (net_seq_idx == (uint64_t) -1ULL) {
652 goto error;
653 }
654
655 obj = zmalloc<consumer_relayd_sock_pair>();
656 if (obj == nullptr) {
657 PERROR("zmalloc relayd sock");
658 goto error;
659 }
660
661 obj->net_seq_idx = net_seq_idx;
662 obj->refcount = 0;
663 obj->destroy_flag = 0;
664 obj->control_sock.sock.fd = -1;
665 obj->data_sock.sock.fd = -1;
666 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
667 pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
668
669 error:
670 return obj;
671 }
672
673 /*
674 * Find a relayd socket pair in the global consumer data.
675 *
676 * Return the object if found else NULL.
677 * RCU read-side lock must be held across this call and while using the
678 * returned object.
679 */
680 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
681 {
682 struct lttng_ht_iter iter;
683 struct lttng_ht_node_u64 *node;
684 struct consumer_relayd_sock_pair *relayd = nullptr;
685
686 ASSERT_RCU_READ_LOCKED();
687
688 /* Negative keys are lookup failures */
689 if (key == (uint64_t) -1ULL) {
690 goto error;
691 }
692
693 lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
694 node = lttng_ht_iter_get_node_u64(&iter);
695 if (node != nullptr) {
696 relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
697 }
698
699 error:
700 return relayd;
701 }
702
/*
 * Find a relayd and send the stream
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(stream->net_seq_idx != -1ULL);
	LTTNG_ASSERT(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != nullptr) {
		/*
		 * Add stream on the relayd. The control socket is shared, so
		 * its mutex serializes commands issued to the relayd.
		 */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock,
					stream->name,
					get_consumer_domain(),
					path,
					&stream->relayd_stream_id,
					stream->chan->tracefile_size,
					stream->chan->tracefile_count,
					stream->trace_chunk);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/* A failed command means the relayd is unusable: tear it down. */
			ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}

		/* The stream now holds a reference on the relayd pair. */
		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
		    stream->key,
		    stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
	    stream->name,
	    stream->key,
	    stream->net_seq_idx);

end:
	return ret;
}
757
/*
 * Find a relayd and send the streams sent message
 *
 * Returns 0 on success, < 0 on error
 */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	LTTNG_ASSERT(net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	lttng::urcu::read_lock_guard read_lock;
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd != nullptr) {
		/* Serialize control-socket commands with the pair's mutex. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_streams_sent(&relayd->control_sock);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/* A failed command means the relayd is unusable: tear it down. */
			ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}
	} else {
		ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", net_seq_idx);
		ret = -1;
		goto end;
	}

	ret = 0;
	DBG("All streams sent relayd id %" PRIu64, net_seq_idx);

end:
	return ret;
}
796
797 /*
798 * Find a relayd and close the stream
799 */
800 void close_relayd_stream(struct lttng_consumer_stream *stream)
801 {
802 struct consumer_relayd_sock_pair *relayd;
803
804 /* The stream is not metadata. Get relayd reference if exists. */
805 lttng::urcu::read_lock_guard read_lock;
806 relayd = consumer_find_relayd(stream->net_seq_idx);
807 if (relayd) {
808 consumer_stream_relayd_close(stream, relayd);
809 }
810 }
811
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
				      size_t data_size,
				      unsigned long padding,
				      struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/*
		 * Set header with stream information. All multi-byte fields
		 * are converted to big-endian (network byte order) for the
		 * wire.
		 */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be substracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
872
/*
 * Write a character on the metadata poll pipe to wake the metadata thread.
 * Returns 0 on success, -1 on error.
 */
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
{
	int ret = 0;

	DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", channel->name);
	/* Only monitored channels with a live metadata stream have a poll pipe. */
	if (channel->monitor && channel->metadata_stream) {
		const char dummy = 'c';
		const ssize_t write_ret =
			lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1);

		if (write_ret < 1) {
			if (errno == EWOULDBLOCK) {
				/*
				 * This is fine, the metadata poll thread
				 * is having a hard time keeping-up, but
				 * it will eventually wake-up and consume
				 * the available data.
				 */
				ret = 0;
			} else {
				PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
				ret = -1;
				goto end;
			}
		}
	}

end:
	return ret;
}
907
/*
 * Trigger a dump of the metadata content. Following/during the succesful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 */
static int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	LTTNG_ASSERT(stream->metadata_flag);
	LTTNG_ASSERT(stream->chan->trace_chunk);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		/* Wake the metadata poll thread so it consumes the dump. */
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
950
951 static int lttng_consumer_channel_set_trace_chunk(struct lttng_consumer_channel *channel,
952 struct lttng_trace_chunk *new_trace_chunk)
953 {
954 pthread_mutex_lock(&channel->lock);
955 if (channel->is_deleted) {
956 /*
957 * The channel has been logically deleted and should no longer
958 * be used. It has released its reference to its current trace
959 * chunk and should not acquire a new one.
960 *
961 * Return success as there is nothing for the caller to do.
962 */
963 goto end;
964 }
965
966 /*
967 * The acquisition of the reference cannot fail (barring
968 * a severe internal error) since a reference to the published
969 * chunk is already held by the caller.
970 */
971 if (new_trace_chunk) {
972 const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk);
973
974 LTTNG_ASSERT(acquired_reference);
975 }
976
977 lttng_trace_chunk_put(channel->trace_chunk);
978 channel->trace_chunk = new_trace_chunk;
979 end:
980 pthread_mutex_unlock(&channel->lock);
981 return 0;
982 }
983
/*
 * Allocate and return a new lttng_consumer_channel object using the given key
 * to initialize the hash table node.
 *
 * On error, return NULL.
 */
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
							 uint64_t session_id,
							 const uint64_t *chunk_id,
							 const char *pathname,
							 const char *name,
							 uint64_t relayd_id,
							 enum lttng_event_output output,
							 uint64_t tracefile_size,
							 uint64_t tracefile_count,
							 uint64_t session_id_per_pid,
							 unsigned int monitor,
							 unsigned int live_timer_interval,
							 bool is_in_live_session,
							 const char *root_shm_path,
							 const char *shm_path)
{
	struct lttng_consumer_channel *channel = nullptr;
	struct lttng_trace_chunk *trace_chunk = nullptr;

	if (chunk_id) {
		/* Resolve the chunk reference before allocating the channel. */
		trace_chunk = lttng_trace_chunk_registry_find_chunk(
			the_consumer_data.chunk_registry, session_id, *chunk_id);
		if (!trace_chunk) {
			ERR("Failed to find trace chunk reference during creation of channel");
			goto end;
		}
	}

	channel = zmalloc<lttng_consumer_channel>();
	if (channel == nullptr) {
		PERROR("malloc struct lttng_consumer_channel");
		goto end;
	}

	channel->key = key;
	channel->refcount = 0;
	channel->session_id = session_id;
	channel->session_id_per_pid = session_id_per_pid;
	channel->relayd_id = relayd_id;
	channel->tracefile_size = tracefile_size;
	channel->tracefile_count = tracefile_count;
	channel->monitor = monitor;
	channel->live_timer_interval = live_timer_interval;
	channel->is_live = is_in_live_session;
	pthread_mutex_init(&channel->lock, nullptr);
	pthread_mutex_init(&channel->timer_lock, nullptr);

	switch (output) {
	case LTTNG_EVENT_SPLICE:
		channel->output = CONSUMER_CHANNEL_SPLICE;
		break;
	case LTTNG_EVENT_MMAP:
		channel->output = CONSUMER_CHANNEL_MMAP;
		break;
	default:
		/* Unsupported output type: programming error. */
		abort();
		free(channel);
		channel = nullptr;
		goto end;
	}

	/*
	 * In monitor mode, the streams associated with the channel will be put in
	 * a special list ONLY owned by this channel. So, the refcount is set to 1
	 * here meaning that the channel itself has streams that are referenced.
	 *
	 * On a channel deletion, once the channel is no longer visible, the
	 * refcount is decremented and checked for a zero value to delete it. With
	 * streams in no monitor mode, it will now be safe to destroy the channel.
	 */
	if (!channel->monitor) {
		channel->refcount = 1;
	}

	/* Copy names with explicit NUL-termination. */
	strncpy(channel->pathname, pathname, sizeof(channel->pathname));
	channel->pathname[sizeof(channel->pathname) - 1] = '\0';

	strncpy(channel->name, name, sizeof(channel->name));
	channel->name[sizeof(channel->name) - 1] = '\0';

	if (root_shm_path) {
		strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
		channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
	}
	if (shm_path) {
		strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
		channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
	}

	lttng_ht_node_init_u64(&channel->node, channel->key);
	lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node, channel->session_id);

	channel->wait_fd = -1;
	CDS_INIT_LIST_HEAD(&channel->streams.head);

	if (trace_chunk) {
		int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
		if (ret) {
			goto error;
		}
	}

	DBG("Allocated channel (key %" PRIu64 ")", channel->key);

end:
	/* Drop the local reference acquired from the registry lookup. */
	lttng_trace_chunk_put(trace_chunk);
	return channel;
error:
	/* Fully tear down the partially-initialized channel. */
	consumer_del_channel(channel);
	channel = nullptr;
	goto end;
}
1102
1103 /*
1104 * Add a channel to the global list protected by a mutex.
1105 *
1106 * Always return 0 indicating success.
1107 */
1108 int consumer_add_channel(struct lttng_consumer_channel *channel,
1109 struct lttng_consumer_local_data *ctx)
1110 {
1111 pthread_mutex_lock(&the_consumer_data.lock);
1112 pthread_mutex_lock(&channel->lock);
1113 pthread_mutex_lock(&channel->timer_lock);
1114
1115 /*
1116 * This gives us a guarantee that the channel we are about to add to the
1117 * channel hash table will be unique. See this function comment on the why
1118 * we need to steel the channel key at this stage.
1119 */
1120 steal_channel_key(channel->key);
1121
1122 lttng::urcu::read_lock_guard read_lock;
1123 lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
1124 lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
1125 &channel->channels_by_session_id_ht_node);
1126 channel->is_published = true;
1127
1128 pthread_mutex_unlock(&channel->timer_lock);
1129 pthread_mutex_unlock(&channel->lock);
1130 pthread_mutex_unlock(&the_consumer_data.lock);
1131
1132 if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
1133 notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
1134 }
1135
1136 return 0;
1137 }
1138
1139 /*
1140 * Allocate the pollfd structure and the local view of the out fds to avoid
1141 * doing a lookup in the linked list and concurrency issues when writing is
1142 * needed. Called with consumer_data.lock held.
1143 *
1144 * Returns the number of fds in the structures.
1145 */
1146 static int update_poll_array(struct lttng_consumer_local_data *ctx,
1147 struct pollfd **pollfd,
1148 struct lttng_consumer_stream **local_stream,
1149 struct lttng_ht *ht,
1150 int *nb_inactive_fd)
1151 {
1152 int i = 0;
1153 struct lttng_ht_iter iter;
1154 struct lttng_consumer_stream *stream;
1155
1156 LTTNG_ASSERT(ctx);
1157 LTTNG_ASSERT(ht);
1158 LTTNG_ASSERT(pollfd);
1159 LTTNG_ASSERT(local_stream);
1160
1161 DBG("Updating poll fd array");
1162 *nb_inactive_fd = 0;
1163
1164 {
1165 lttng::urcu::read_lock_guard read_lock;
1166 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1167 /*
1168 * Only active streams with an active end point can be added to the
1169 * poll set and local stream storage of the thread.
1170 *
1171 * There is a potential race here for endpoint_status to be updated
1172 * just after the check. However, this is OK since the stream(s) will
1173 * be deleted once the thread is notified that the end point state has
1174 * changed where this function will be called back again.
1175 *
1176 * We track the number of inactive FDs because they still need to be
1177 * closed by the polling thread after a wakeup on the data_pipe or
1178 * metadata_pipe.
1179 */
1180 if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
1181 (*nb_inactive_fd)++;
1182 continue;
1183 }
1184
1185 (*pollfd)[i].fd = stream->wait_fd;
1186 (*pollfd)[i].events = POLLIN | POLLPRI;
1187 local_stream[i] = stream;
1188 i++;
1189 }
1190 }
1191
1192 /*
1193 * Insert the consumer_data_pipe at the end of the array and don't
1194 * increment i so nb_fd is the number of real FD.
1195 */
1196 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
1197 (*pollfd)[i].events = POLLIN | POLLPRI;
1198
1199 (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
1200 (*pollfd)[i + 1].events = POLLIN | POLLPRI;
1201 return i;
1202 }
1203
1204 /*
1205 * Poll on the should_quit pipe and the command socket return -1 on
1206 * error, 1 if should exit, 0 if data is available on the command socket
1207 */
1208 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1209 {
1210 int num_rdy;
1211
1212 restart:
1213 num_rdy = poll(consumer_sockpoll, 2, -1);
1214 if (num_rdy == -1) {
1215 /*
1216 * Restart interrupted system call.
1217 */
1218 if (errno == EINTR) {
1219 goto restart;
1220 }
1221 PERROR("Poll error");
1222 return -1;
1223 }
1224 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
1225 DBG("consumer_should_quit wake up");
1226 return 1;
1227 }
1228 return 0;
1229 }
1230
1231 /*
1232 * Set the error socket.
1233 */
1234 void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock)
1235 {
1236 ctx->consumer_error_socket = sock;
1237 }
1238
1239 /*
1240 * Set the command socket path.
1241 */
1242 void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock)
1243 {
1244 ctx->consumer_command_sock_path = sock;
1245 }
1246
1247 /*
1248 * Send return code to the session daemon.
1249 * If the socket is not defined, we return 0, it is not a fatal error
1250 */
1251 int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
1252 {
1253 if (ctx->consumer_error_socket > 0) {
1254 return lttcomm_send_unix_sock(
1255 ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
1256 }
1257
1258 return 0;
1259 }
1260
1261 /*
1262 * Close all the tracefiles and stream fds and MUST be called when all
1263 * instances are destroyed i.e. when all threads were joined and are ended.
1264 */
1265 void lttng_consumer_cleanup()
1266 {
1267 struct lttng_ht_iter iter;
1268 struct lttng_consumer_channel *channel;
1269 unsigned int trace_chunks_left;
1270
1271 {
1272 lttng::urcu::read_lock_guard read_lock;
1273
1274 cds_lfht_for_each_entry (
1275 the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
1276 consumer_del_channel(channel);
1277 }
1278 }
1279
1280 lttng_ht_destroy(the_consumer_data.channel_ht);
1281 lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
1282
1283 cleanup_relayd_ht();
1284
1285 lttng_ht_destroy(the_consumer_data.stream_per_chan_id_ht);
1286
1287 /*
1288 * This HT contains streams that are freed by either the metadata thread or
1289 * the data thread so we do *nothing* on the hash table and simply destroy
1290 * it.
1291 */
1292 lttng_ht_destroy(the_consumer_data.stream_list_ht);
1293
1294 /*
1295 * Trace chunks in the registry may still exist if the session
1296 * daemon has encountered an internal error and could not
1297 * tear down its sessions and/or trace chunks properly.
1298 *
1299 * Release the session daemon's implicit reference to any remaining
1300 * trace chunk and print an error if any trace chunk was found. Note
1301 * that there are _no_ legitimate cases for trace chunks to be left,
1302 * it is a leak. However, it can happen following a crash of the
1303 * session daemon and not emptying the registry would cause an assertion
1304 * to hit.
1305 */
1306 trace_chunks_left =
1307 lttng_trace_chunk_registry_put_each_chunk(the_consumer_data.chunk_registry);
1308 if (trace_chunks_left) {
1309 ERR("%u trace chunks are leaked by lttng-consumerd. "
1310 "This can be caused by an internal error of the session daemon.",
1311 trace_chunks_left);
1312 }
1313 /* Run all callbacks freeing each chunk. */
1314 rcu_barrier();
1315 lttng_trace_chunk_registry_destroy(the_consumer_data.chunk_registry);
1316 }
1317
1318 /*
1319 * Called from signal handler.
1320 */
1321 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1322 {
1323 ssize_t ret;
1324
1325 CMM_STORE_SHARED(consumer_quit, 1);
1326 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1327 if (ret < 1) {
1328 PERROR("write consumer quit");
1329 }
1330
1331 DBG("Consumer flag that it should quit");
1332 }
1333
1334 /*
1335 * Flush pending writes to trace output disk file.
1336 */
1337 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
1338 {
1339 int ret;
1340 int outfd = stream->out_fd;
1341
1342 /*
1343 * This does a blocking write-and-wait on any page that belongs to the
1344 * subbuffer prior to the one we just wrote.
1345 * Don't care about error values, as these are just hints and ways to
1346 * limit the amount of page cache used.
1347 */
1348 if (orig_offset < stream->max_sb_size) {
1349 return;
1350 }
1351 lttng_sync_file_range(outfd,
1352 orig_offset - stream->max_sb_size,
1353 stream->max_sb_size,
1354 SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
1355 SYNC_FILE_RANGE_WAIT_AFTER);
1356 /*
1357 * Give hints to the kernel about how we access the file:
1358 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1359 * we write it.
1360 *
1361 * We need to call fadvise again after the file grows because the
1362 * kernel does not seem to apply fadvise to non-existing parts of the
1363 * file.
1364 *
1365 * Call fadvise _after_ having waited for the page writeback to
1366 * complete because the dirty page writeback semantic is not well
1367 * defined. So it can be expected to lead to lower throughput in
1368 * streaming.
1369 */
1370 ret = posix_fadvise(
1371 outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
1372 if (ret && ret != -ENOSYS) {
1373 errno = ret;
1374 PERROR("posix_fadvise on fd %i", outfd);
1375 }
1376 }
1377
1378 /*
1379 * Initialise the necessary environnement :
1380 * - create a new context
1381 * - create the poll_pipe
1382 * - create the should_quit pipe (for signal handler)
1383 * - create the thread pipe (for splice)
1384 *
1385 * Takes a function pointer as argument, this function is called when data is
1386 * available on a buffer. This function is responsible to do the
1387 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1388 * buffer configuration and then kernctl_put_next_subbuf at the end.
1389 *
1390 * Returns a pointer to the new context or NULL on error.
1391 */
1392 struct lttng_consumer_local_data *
1393 lttng_consumer_create(enum lttng_consumer_type type,
1394 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1395 struct lttng_consumer_local_data *ctx,
1396 bool locked_by_caller),
1397 int (*recv_channel)(struct lttng_consumer_channel *channel),
1398 int (*recv_stream)(struct lttng_consumer_stream *stream),
1399 int (*update_stream)(uint64_t stream_key, uint32_t state))
1400 {
1401 int ret;
1402 struct lttng_consumer_local_data *ctx;
1403
1404 LTTNG_ASSERT(the_consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1405 the_consumer_data.type == type);
1406 the_consumer_data.type = type;
1407
1408 ctx = zmalloc<lttng_consumer_local_data>();
1409 if (ctx == nullptr) {
1410 PERROR("allocating context");
1411 goto error;
1412 }
1413
1414 ctx->consumer_error_socket = -1;
1415 ctx->consumer_metadata_socket = -1;
1416 pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
1417 /* assign the callbacks */
1418 ctx->on_buffer_ready = buffer_ready;
1419 ctx->on_recv_channel = recv_channel;
1420 ctx->on_recv_stream = recv_stream;
1421 ctx->on_update_stream = update_stream;
1422
1423 ctx->consumer_data_pipe = lttng_pipe_open(0);
1424 if (!ctx->consumer_data_pipe) {
1425 goto error_poll_pipe;
1426 }
1427
1428 ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
1429 if (!ctx->consumer_wakeup_pipe) {
1430 goto error_wakeup_pipe;
1431 }
1432
1433 ret = pipe(ctx->consumer_should_quit);
1434 if (ret < 0) {
1435 PERROR("Error creating recv pipe");
1436 goto error_quit_pipe;
1437 }
1438
1439 ret = pipe(ctx->consumer_channel_pipe);
1440 if (ret < 0) {
1441 PERROR("Error creating channel pipe");
1442 goto error_channel_pipe;
1443 }
1444
1445 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1446 if (!ctx->consumer_metadata_pipe) {
1447 goto error_metadata_pipe;
1448 }
1449
1450 ctx->channel_monitor_pipe = -1;
1451
1452 return ctx;
1453
1454 error_metadata_pipe:
1455 utils_close_pipe(ctx->consumer_channel_pipe);
1456 error_channel_pipe:
1457 utils_close_pipe(ctx->consumer_should_quit);
1458 error_quit_pipe:
1459 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1460 error_wakeup_pipe:
1461 lttng_pipe_destroy(ctx->consumer_data_pipe);
1462 error_poll_pipe:
1463 free(ctx);
1464 error:
1465 return nullptr;
1466 }
1467
1468 /*
1469 * Iterate over all streams of the hashtable and free them properly.
1470 */
1471 static void destroy_data_stream_ht(struct lttng_ht *ht)
1472 {
1473 struct lttng_ht_iter iter;
1474 struct lttng_consumer_stream *stream;
1475
1476 if (ht == nullptr) {
1477 return;
1478 }
1479
1480 {
1481 lttng::urcu::read_lock_guard read_lock;
1482 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1483 /*
1484 * Ignore return value since we are currently cleaning up so any error
1485 * can't be handled.
1486 */
1487 (void) consumer_del_stream(stream, ht);
1488 }
1489 }
1490
1491 lttng_ht_destroy(ht);
1492 }
1493
1494 /*
1495 * Iterate over all streams of the metadata hashtable and free them
1496 * properly.
1497 */
1498 static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1499 {
1500 struct lttng_ht_iter iter;
1501 struct lttng_consumer_stream *stream;
1502
1503 if (ht == nullptr) {
1504 return;
1505 }
1506
1507 {
1508 lttng::urcu::read_lock_guard read_lock;
1509 cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
1510 /*
1511 * Ignore return value since we are currently cleaning up so any error
1512 * can't be handled.
1513 */
1514 (void) consumer_del_metadata_stream(stream, ht);
1515 }
1516 }
1517
1518 lttng_ht_destroy(ht);
1519 }
1520
1521 /*
1522 * Close all fds associated with the instance and free the context.
1523 */
1524 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1525 {
1526 int ret;
1527
1528 DBG("Consumer destroying it. Closing everything.");
1529
1530 if (!ctx) {
1531 return;
1532 }
1533
1534 destroy_data_stream_ht(data_ht);
1535 destroy_metadata_stream_ht(metadata_ht);
1536
1537 ret = close(ctx->consumer_error_socket);
1538 if (ret) {
1539 PERROR("close");
1540 }
1541 ret = close(ctx->consumer_metadata_socket);
1542 if (ret) {
1543 PERROR("close");
1544 }
1545 utils_close_pipe(ctx->consumer_channel_pipe);
1546 lttng_pipe_destroy(ctx->consumer_data_pipe);
1547 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1548 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1549 utils_close_pipe(ctx->consumer_should_quit);
1550
1551 unlink(ctx->consumer_command_sock_path);
1552 free(ctx);
1553 }
1554
1555 /*
1556 * Write the metadata stream id on the specified file descriptor.
1557 */
1558 static int
1559 write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, unsigned long padding)
1560 {
1561 ssize_t ret;
1562 struct lttcomm_relayd_metadata_payload hdr;
1563
1564 hdr.stream_id = htobe64(stream->relayd_stream_id);
1565 hdr.padding_size = htobe32(padding);
1566 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1567 if (ret < sizeof(hdr)) {
1568 /*
1569 * This error means that the fd's end is closed so ignore the PERROR
1570 * not to clubber the error output since this can happen in a normal
1571 * code path.
1572 */
1573 if (errno != EPIPE) {
1574 PERROR("write metadata stream id");
1575 }
1576 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
1577 /*
1578 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1579 * handle writting the missing part so report that as an error and
1580 * don't lie to the caller.
1581 */
1582 ret = -1;
1583 goto end;
1584 }
1585 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1586 stream->relayd_stream_id,
1587 padding);
1588
1589 end:
1590 return (int) ret;
1591 }
1592
1593 /*
1594 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1595 * core function for writing trace buffers to either the local filesystem or
1596 * the network.
1597 *
1598 * It must be called with the stream and the channel lock held.
1599 *
1600 * Careful review MUST be put if any changes occur!
1601 *
1602 * Returns the number of bytes written
1603 */
1604 ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
1605 const struct lttng_buffer_view *buffer,
1606 unsigned long padding)
1607 {
1608 ssize_t ret = 0;
1609 off_t orig_offset = stream->out_fd_offset;
1610 /* Default is on the disk */
1611 int outfd = stream->out_fd;
1612 struct consumer_relayd_sock_pair *relayd = nullptr;
1613 unsigned int relayd_hang_up = 0;
1614 const size_t subbuf_content_size = buffer->size - padding;
1615 size_t write_len;
1616
1617 /* RCU lock for the relayd pointer */
1618 lttng::urcu::read_lock_guard read_lock;
1619 LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
1620
1621 /* Flag that the current stream if set for network streaming. */
1622 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1623 relayd = consumer_find_relayd(stream->net_seq_idx);
1624 if (relayd == nullptr) {
1625 ret = -EPIPE;
1626 goto end;
1627 }
1628 }
1629
1630 /* Handle stream on the relayd if the output is on the network */
1631 if (relayd) {
1632 unsigned long netlen = subbuf_content_size;
1633
1634 /*
1635 * Lock the control socket for the complete duration of the function
1636 * since from this point on we will use the socket.
1637 */
1638 if (stream->metadata_flag) {
1639 /* Metadata requires the control socket. */
1640 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1641 if (stream->reset_metadata_flag) {
1642 ret = relayd_reset_metadata(&relayd->control_sock,
1643 stream->relayd_stream_id,
1644 stream->metadata_version);
1645 if (ret < 0) {
1646 relayd_hang_up = 1;
1647 goto write_error;
1648 }
1649 stream->reset_metadata_flag = 0;
1650 }
1651 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
1652 }
1653
1654 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
1655 if (ret < 0) {
1656 relayd_hang_up = 1;
1657 goto write_error;
1658 }
1659 /* Use the returned socket. */
1660 outfd = ret;
1661
1662 /* Write metadata stream id before payload */
1663 if (stream->metadata_flag) {
1664 ret = write_relayd_metadata_id(outfd, stream, padding);
1665 if (ret < 0) {
1666 relayd_hang_up = 1;
1667 goto write_error;
1668 }
1669 }
1670
1671 write_len = subbuf_content_size;
1672 } else {
1673 /* No streaming; we have to write the full padding. */
1674 if (stream->metadata_flag && stream->reset_metadata_flag) {
1675 ret = utils_truncate_stream_file(stream->out_fd, 0);
1676 if (ret < 0) {
1677 ERR("Reset metadata file");
1678 goto end;
1679 }
1680 stream->reset_metadata_flag = 0;
1681 }
1682
1683 /*
1684 * Check if we need to change the tracefile before writing the packet.
1685 */
1686 if (stream->chan->tracefile_size > 0 &&
1687 (stream->tracefile_size_current + buffer->size) >
1688 stream->chan->tracefile_size) {
1689 ret = consumer_stream_rotate_output_files(stream);
1690 if (ret) {
1691 goto end;
1692 }
1693 outfd = stream->out_fd;
1694 orig_offset = 0;
1695 }
1696 stream->tracefile_size_current += buffer->size;
1697 write_len = buffer->size;
1698 }
1699
1700 /*
1701 * This call guarantee that len or less is returned. It's impossible to
1702 * receive a ret value that is bigger than len.
1703 */
1704 ret = lttng_write(outfd, buffer->data, write_len);
1705 DBG("Consumer mmap write() ret %zd (len %zu)", ret, write_len);
1706 if (ret < 0 || ((size_t) ret != write_len)) {
1707 /*
1708 * Report error to caller if nothing was written else at least send the
1709 * amount written.
1710 */
1711 if (ret < 0) {
1712 ret = -errno;
1713 }
1714 relayd_hang_up = 1;
1715
1716 /* Socket operation failed. We consider the relayd dead */
1717 if (errno == EPIPE) {
1718 /*
1719 * This is possible if the fd is closed on the other side
1720 * (outfd) or any write problem. It can be verbose a bit for a
1721 * normal execution if for instance the relayd is stopped
1722 * abruptly. This can happen so set this to a DBG statement.
1723 */
1724 DBG("Consumer mmap write detected relayd hang up");
1725 } else {
1726 /* Unhandled error, print it and stop function right now. */
1727 PERROR("Error in write mmap (ret %zd != write_len %zu)", ret, write_len);
1728 }
1729 goto write_error;
1730 }
1731 stream->output_written += ret;
1732
1733 /* This call is useless on a socket so better save a syscall. */
1734 if (!relayd) {
1735 /* This won't block, but will start writeout asynchronously */
1736 lttng_sync_file_range(
1737 outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
1738 stream->out_fd_offset += write_len;
1739 lttng_consumer_sync_trace_file(stream, orig_offset);
1740 }
1741
1742 write_error:
1743 /*
1744 * This is a special case that the relayd has closed its socket. Let's
1745 * cleanup the relayd object and all associated streams.
1746 */
1747 if (relayd && relayd_hang_up) {
1748 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
1749 lttng_consumer_cleanup_relayd(relayd);
1750 }
1751
1752 end:
1753 /* Unlock only if ctrl socket used */
1754 if (relayd && stream->metadata_flag) {
1755 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1756 }
1757
1758 return ret;
1759 }
1760
1761 /*
1762 * Splice the data from the ring buffer to the tracefile.
1763 *
1764 * It must be called with the stream lock held.
1765 *
1766 * Returns the number of bytes spliced.
1767 */
1768 ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
1769 struct lttng_consumer_stream *stream,
1770 unsigned long len,
1771 unsigned long padding)
1772 {
1773 ssize_t ret = 0, written = 0, ret_splice = 0;
1774 loff_t offset = 0;
1775 off_t orig_offset = stream->out_fd_offset;
1776 int fd = stream->wait_fd;
1777 /* Default is on the disk */
1778 int outfd = stream->out_fd;
1779 struct consumer_relayd_sock_pair *relayd = nullptr;
1780 int *splice_pipe;
1781 unsigned int relayd_hang_up = 0;
1782
1783 switch (the_consumer_data.type) {
1784 case LTTNG_CONSUMER_KERNEL:
1785 break;
1786 case LTTNG_CONSUMER32_UST:
1787 case LTTNG_CONSUMER64_UST:
1788 /* Not supported for user space tracing */
1789 return -ENOSYS;
1790 default:
1791 ERR("Unknown consumer_data type");
1792 abort();
1793 }
1794
1795 /* RCU lock for the relayd pointer */
1796 lttng::urcu::read_lock_guard read_lock;
1797
1798 /* Flag that the current stream if set for network streaming. */
1799 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1800 relayd = consumer_find_relayd(stream->net_seq_idx);
1801 if (relayd == nullptr) {
1802 written = -ret;
1803 goto end;
1804 }
1805 }
1806 splice_pipe = stream->splice_pipe;
1807
1808 /* Write metadata stream id before payload */
1809 if (relayd) {
1810 unsigned long total_len = len;
1811
1812 if (stream->metadata_flag) {
1813 /*
1814 * Lock the control socket for the complete duration of the function
1815 * since from this point on we will use the socket.
1816 */
1817 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1818
1819 if (stream->reset_metadata_flag) {
1820 ret = relayd_reset_metadata(&relayd->control_sock,
1821 stream->relayd_stream_id,
1822 stream->metadata_version);
1823 if (ret < 0) {
1824 relayd_hang_up = 1;
1825 goto write_error;
1826 }
1827 stream->reset_metadata_flag = 0;
1828 }
1829 ret = write_relayd_metadata_id(splice_pipe[1], stream, padding);
1830 if (ret < 0) {
1831 written = ret;
1832 relayd_hang_up = 1;
1833 goto write_error;
1834 }
1835
1836 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1837 }
1838
1839 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1840 if (ret < 0) {
1841 written = ret;
1842 relayd_hang_up = 1;
1843 goto write_error;
1844 }
1845 /* Use the returned socket. */
1846 outfd = ret;
1847 } else {
1848 /* No streaming, we have to set the len with the full padding */
1849 len += padding;
1850
1851 if (stream->metadata_flag && stream->reset_metadata_flag) {
1852 ret = utils_truncate_stream_file(stream->out_fd, 0);
1853 if (ret < 0) {
1854 ERR("Reset metadata file");
1855 goto end;
1856 }
1857 stream->reset_metadata_flag = 0;
1858 }
1859 /*
1860 * Check if we need to change the tracefile before writing the packet.
1861 */
1862 if (stream->chan->tracefile_size > 0 &&
1863 (stream->tracefile_size_current + len) > stream->chan->tracefile_size) {
1864 ret = consumer_stream_rotate_output_files(stream);
1865 if (ret < 0) {
1866 written = ret;
1867 goto end;
1868 }
1869 outfd = stream->out_fd;
1870 orig_offset = 0;
1871 }
1872 stream->tracefile_size_current += len;
1873 }
1874
1875 while (len > 0) {
1876 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1877 (unsigned long) offset,
1878 len,
1879 fd,
1880 splice_pipe[1]);
1881 ret_splice = splice(
1882 fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
1883 DBG("splice chan to pipe, ret %zd", ret_splice);
1884 if (ret_splice < 0) {
1885 ret = errno;
1886 written = -ret;
1887 PERROR("Error in relay splice");
1888 goto splice_error;
1889 }
1890
1891 /* Handle stream on the relayd if the output is on the network */
1892 if (relayd && stream->metadata_flag) {
1893 size_t metadata_payload_size =
1894 sizeof(struct lttcomm_relayd_metadata_payload);
1895
1896 /* Update counter to fit the spliced data */
1897 ret_splice += metadata_payload_size;
1898 len += metadata_payload_size;
1899 /*
1900 * We do this so the return value can match the len passed as
1901 * argument to this function.
1902 */
1903 written -= metadata_payload_size;
1904 }
1905
1906 /* Splice data out */
1907 ret_splice = splice(splice_pipe[0],
1908 nullptr,
1909 outfd,
1910 nullptr,
1911 ret_splice,
1912 SPLICE_F_MOVE | SPLICE_F_MORE);
1913 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
1914 if (ret_splice < 0) {
1915 ret = errno;
1916 written = -ret;
1917 relayd_hang_up = 1;
1918 goto write_error;
1919 } else if (ret_splice > len) {
1920 /*
1921 * We don't expect this code path to be executed but you never know
1922 * so this is an extra protection agains a buggy splice().
1923 */
1924 ret = errno;
1925 written += ret_splice;
1926 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len);
1927 goto splice_error;
1928 } else {
1929 /* All good, update current len and continue. */
1930 len -= ret_splice;
1931 }
1932
1933 /* This call is useless on a socket so better save a syscall. */
1934 if (!relayd) {
1935 /* This won't block, but will start writeout asynchronously */
1936 lttng_sync_file_range(
1937 outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
1938 stream->out_fd_offset += ret_splice;
1939 }
1940 stream->output_written += ret_splice;
1941 written += ret_splice;
1942 }
1943 if (!relayd) {
1944 lttng_consumer_sync_trace_file(stream, orig_offset);
1945 }
1946 goto end;
1947
1948 write_error:
1949 /*
1950 * This is a special case that the relayd has closed its socket. Let's
1951 * cleanup the relayd object and all associated streams.
1952 */
1953 if (relayd && relayd_hang_up) {
1954 ERR("Relayd hangup. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
1955 lttng_consumer_cleanup_relayd(relayd);
1956 /* Skip splice error so the consumer does not fail */
1957 goto end;
1958 }
1959
1960 splice_error:
1961 /* send the appropriate error description to sessiond */
1962 switch (ret) {
1963 case EINVAL:
1964 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
1965 break;
1966 case ENOMEM:
1967 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
1968 break;
1969 case ESPIPE:
1970 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
1971 break;
1972 }
1973
1974 end:
1975 if (relayd && stream->metadata_flag) {
1976 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1977 }
1978
1979 return written;
1980 }
1981
1982 /*
1983 * Sample the snapshot positions for a specific fd
1984 *
1985 * Returns 0 on success, < 0 on error
1986 */
1987 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
1988 {
1989 switch (the_consumer_data.type) {
1990 case LTTNG_CONSUMER_KERNEL:
1991 return lttng_kconsumer_sample_snapshot_positions(stream);
1992 case LTTNG_CONSUMER32_UST:
1993 case LTTNG_CONSUMER64_UST:
1994 return lttng_ustconsumer_sample_snapshot_positions(stream);
1995 default:
1996 ERR("Unknown consumer_data type");
1997 abort();
1998 return -ENOSYS;
1999 }
2000 }
2001 /*
2002 * Take a snapshot for a specific fd
2003 *
2004 * Returns 0 on success, < 0 on error
2005 */
2006 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
2007 {
2008 switch (the_consumer_data.type) {
2009 case LTTNG_CONSUMER_KERNEL:
2010 return lttng_kconsumer_take_snapshot(stream);
2011 case LTTNG_CONSUMER32_UST:
2012 case LTTNG_CONSUMER64_UST:
2013 return lttng_ustconsumer_take_snapshot(stream);
2014 default:
2015 ERR("Unknown consumer_data type");
2016 abort();
2017 return -ENOSYS;
2018 }
2019 }
2020
2021 /*
2022 * Get the produced position
2023 *
2024 * Returns 0 on success, < 0 on error
2025 */
2026 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
2027 {
2028 switch (the_consumer_data.type) {
2029 case LTTNG_CONSUMER_KERNEL:
2030 return lttng_kconsumer_get_produced_snapshot(stream, pos);
2031 case LTTNG_CONSUMER32_UST:
2032 case LTTNG_CONSUMER64_UST:
2033 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
2034 default:
2035 ERR("Unknown consumer_data type");
2036 abort();
2037 return -ENOSYS;
2038 }
2039 }
2040
2041 /*
2042 * Get the consumed position (free-running counter position in bytes).
2043 *
2044 * Returns 0 on success, < 0 on error
2045 */
2046 int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
2047 {
2048 switch (the_consumer_data.type) {
2049 case LTTNG_CONSUMER_KERNEL:
2050 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2051 case LTTNG_CONSUMER32_UST:
2052 case LTTNG_CONSUMER64_UST:
2053 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2054 default:
2055 ERR("Unknown consumer_data type");
2056 abort();
2057 return -ENOSYS;
2058 }
2059 }
2060
2061 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
2062 int sock,
2063 struct pollfd *consumer_sockpoll)
2064 {
2065 switch (the_consumer_data.type) {
2066 case LTTNG_CONSUMER_KERNEL:
2067 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2068 case LTTNG_CONSUMER32_UST:
2069 case LTTNG_CONSUMER64_UST:
2070 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2071 default:
2072 ERR("Unknown consumer_data type");
2073 abort();
2074 return -ENOSYS;
2075 }
2076 }
2077
2078 static void lttng_consumer_close_all_metadata()
2079 {
2080 switch (the_consumer_data.type) {
2081 case LTTNG_CONSUMER_KERNEL:
2082 /*
2083 * The Kernel consumer has a different metadata scheme so we don't
2084 * close anything because the stream will be closed by the session
2085 * daemon.
2086 */
2087 break;
2088 case LTTNG_CONSUMER32_UST:
2089 case LTTNG_CONSUMER64_UST:
2090 /*
2091 * Close all metadata streams. The metadata hash table is passed and
2092 * this call iterates over it by closing all wakeup fd. This is safe
2093 * because at this point we are sure that the metadata producer is
2094 * either dead or blocked.
2095 */
2096 lttng_ustconsumer_close_all_metadata(metadata_ht);
2097 break;
2098 default:
2099 ERR("Unknown consumer_data type");
2100 abort();
2101 }
2102 }
2103
/*
 * Clean up a metadata stream and free its memory.
 *
 * Removes the stream from `ht` and the global tracking tables, closes its
 * output (including any relayd connection), destroys its tracer buffers
 * and, when this was the channel's last reference, deletes the channel too.
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
	struct lttng_consumer_channel *channel = nullptr;
	bool free_channel = false;

	LTTNG_ASSERT(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	LTTNG_ASSERT(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	/* Lock ordering: consumer data, then channel, then stream. */
	pthread_mutex_lock(&the_consumer_data.lock);
	/*
	 * Note that this assumes that a stream's channel is never changed and
	 * that the stream's lock doesn't need to be taken to sample its
	 * channel.
	 */
	channel = stream->chan;
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&stream->lock);
	if (channel->metadata_cache) {
		/* Only applicable to userspace consumers. */
		pthread_mutex_lock(&channel->metadata_cache->lock);
	}

	/* Remove any reference to that stream. */
	consumer_stream_delete(stream, ht);

	/* Close down everything including the relayd if one. */
	consumer_stream_close_output(stream);
	/* Destroy tracer buffers of the stream. */
	consumer_stream_destroy_buffers(stream);

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&channel->refcount, 1) &&
	    !uatomic_read(&channel->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_channel = true;
	}
	stream->chan = nullptr;

	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for a NULL
	 * pointer value.
	 */
	channel->metadata_stream = nullptr;

	if (channel->metadata_cache) {
		pthread_mutex_unlock(&channel->metadata_cache->lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&the_consumer_data.lock);

	/* Channel deletion is deliberately done outside of the locks above. */
	if (free_channel) {
		consumer_del_channel(channel);
	}

	/* Drop the stream's trace chunk reference before freeing the stream. */
	lttng_trace_chunk_put(stream->trace_chunk);
	stream->trace_chunk = nullptr;
	consumer_stream_free(stream);
}
2173
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Registers the stream in the metadata hash table as well as in the global
 * per-channel-id and per-session-id stream tables, consuming one unit of the
 * channel's nb_init_stream_left count when applicable.
 */
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	LTTNG_ASSERT(stream);
	LTTNG_ASSERT(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	/* Lock ordering: consumer data, channel, channel timer, then stream. */
	pthread_mutex_lock(&the_consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	lttng::urcu::read_lock_guard read_lock;

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	LTTNG_ASSERT(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(the_consumer_data.stream_per_chan_id_ht, &stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&the_consumer_data.lock);
}
2238
2239 /*
2240 * Delete data stream that are flagged for deletion (endpoint_status).
2241 */
2242 static void validate_endpoint_status_data_stream()
2243 {
2244 struct lttng_ht_iter iter;
2245 struct lttng_consumer_stream *stream;
2246
2247 DBG("Consumer delete flagged data stream");
2248
2249 {
2250 lttng::urcu::read_lock_guard read_lock;
2251
2252 cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
2253 /* Validate delete flag of the stream */
2254 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2255 continue;
2256 }
2257 /* Delete it right now */
2258 consumer_del_stream(stream, data_ht);
2259 }
2260 }
2261 }
2262
2263 /*
2264 * Delete metadata stream that are flagged for deletion (endpoint_status).
2265 */
2266 static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
2267 {
2268 struct lttng_ht_iter iter;
2269 struct lttng_consumer_stream *stream;
2270
2271 DBG("Consumer delete flagged metadata stream");
2272
2273 LTTNG_ASSERT(pollset);
2274
2275 {
2276 lttng::urcu::read_lock_guard read_lock;
2277 cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
2278 /* Validate delete flag of the stream */
2279 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2280 continue;
2281 }
2282 /*
2283 * Remove from pollset so the metadata thread can continue without
2284 * blocking on a deleted stream.
2285 */
2286 lttng_poll_del(pollset, stream->wait_fd);
2287
2288 /* Delete it right now */
2289 consumer_del_metadata_stream(stream, metadata_ht);
2290 }
2291 }
2292 }
2293
/*
 * Thread polls on metadata file descriptor and write them on disk or on the
 * network.
 *
 * The poll set contains the consumer_metadata pipe (through which new
 * metadata streams and state-change notifications arrive) plus the wait fd
 * of every registered metadata stream.
 */
void *consumer_thread_metadata_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_stream *stream = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);

	if (testpoint(consumerd_thread_metadata)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Thread metadata poll started");

	/* Size is set to 1 for the consumer_metadata pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Metadata main loop started");

	while (true) {
	restart:
		health_code_update();
		health_poll_entry();
		DBG("Metadata poll wait");
		ret = lttng_poll_wait(&events, -1);
		DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Metadata event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				err = 0; /* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a metadata wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
				if (revents & LPOLLIN) {
					ssize_t pipe_len;

					pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
								   &stream,
								   sizeof(stream)); /* NOLINT sizeof
										       used on a
										       pointer. */
					if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
									    pointer. */
						if (pipe_len < 0) {
							PERROR("read metadata stream");
						}
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since there might be data to consume.
						 */
						lttng_poll_del(
							&events,
							lttng_pipe_get_readfd(
								ctx->consumer_metadata_pipe));
						lttng_pipe_read_close(ctx->consumer_metadata_pipe);
						continue;
					}

					/* A NULL stream means that the state has changed. */
					if (stream == nullptr) {
						/* Check for deleted streams. */
						validate_endpoint_status_metadata_stream(&events);
						goto restart;
					}

					DBG("Adding metadata stream %d to poll set",
					    stream->wait_fd);

					/* Add metadata stream to the global poll events list */
					lttng_poll_add(
						&events, stream->wait_fd, LPOLLIN | LPOLLPRI);
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Metadata thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since there might be data to consume.
					 */
					lttng_poll_del(
						&events,
						lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
					lttng_pipe_read_close(ctx->consumer_metadata_pipe);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* The fd is a stream wait fd: look the stream up under RCU. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			stream = caa_container_of(node, struct lttng_consumer_stream, node);

			if (revents & (LPOLLIN | LPOLLPRI)) {
				/* Get the data out of the metadata file descriptor */
				DBG("Metadata available on fd %d", pollfd);
				LTTNG_ASSERT(stream->wait_fd == pollfd);

				do {
					health_code_update();

					len = ctx->on_buffer_ready(stream, ctx, false);
					/*
					 * We don't check the return value here since if we get
					 * a negative len, it means an error occurred thus we
					 * simply remove it from the poll set and free the
					 * stream.
					 */
				} while (len > 0);

				/* It's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean up stream from consumer and free it. */
					lttng_poll_del(&events, stream->wait_fd);
					consumer_del_metadata_stream(stream, metadata_ht);
				}
			} else if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Metadata fd %d is hup|err.", pollfd);
				if (!stream->hangup_flush_done &&
				    (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
				     the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
					DBG("Attempting to flush and consume the UST buffers");
					lttng_ustconsumer_on_stream_hangup(stream);

					/* We just flushed the stream now read it. */
					do {
						health_code_update();

						len = ctx->on_buffer_ready(stream, ctx, false);
						/*
						 * We don't check the return value here since if we
						 * get a negative len, it means an error occurred
						 * thus we simply remove it from the poll set and
						 * free the stream.
						 */
					} while (len > 0);
				}

				lttng_poll_del(&events, stream->wait_fd);
				/*
				 * This call updates the channel states, closes file descriptors
				 * and securely frees the stream.
				 */
				consumer_del_metadata_stream(stream, metadata_ht);
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}
			/* Release RCU lock for the stream looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	DBG("Metadata poll thread exiting");

	lttng_poll_clean(&events);
end_poll:
error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
2512
/*
 * This thread polls the fds in the set to consume the data and write
 * it to tracefile if necessary.
 *
 * The pollfd array layout is: indices [0, nb_fd) are stream wait fds,
 * index nb_fd is the consumer_data_pipe and index nb_fd + 1 the wakeup
 * pipe (see the checks on pollfd[nb_fd] and pollfd[nb_fd + 1] below).
 */
void *consumer_thread_data_poll(void *data)
{
	int num_rdy, high_prio, ret, i, err = -1;
	struct pollfd *pollfd = nullptr;
	/* local view of the streams */
	struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
	/* local view of consumer_data.fds_count */
	int nb_fd = 0;
	/* 2 for the consumer_data_pipe and wake up pipe */
	const int nb_pipes_fd = 2;
	/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
	int nb_inactive_fd = 0;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);

	if (testpoint(consumerd_thread_data)) {
		goto error_testpoint;
	}

	health_code_update();

	local_stream = zmalloc<lttng_consumer_stream *>();
	if (local_stream == nullptr) {
		PERROR("local_stream malloc");
		goto end;
	}

	while (true) {
		health_code_update();

		high_prio = 0;

		/*
		 * the fds set has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&the_consumer_data.lock);
		if (the_consumer_data.need_update) {
			free(pollfd);
			pollfd = nullptr;

			free(local_stream);
			local_stream = nullptr;

			/* Allocate for all fds */
			pollfd =
				calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
			if (pollfd == nullptr) {
				PERROR("pollfd malloc");
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}

			local_stream = calloc<lttng_consumer_stream *>(
				the_consumer_data.stream_count + nb_pipes_fd);
			if (local_stream == nullptr) {
				PERROR("local_stream malloc");
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}
			ret = update_poll_array(
				ctx, &pollfd, local_stream, data_ht, &nb_inactive_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&the_consumer_data.lock);
				goto end;
			}
			nb_fd = ret;
			the_consumer_data.need_update = 0;
		}
		pthread_mutex_unlock(&the_consumer_data.lock);

		/* No FDs and consumer_quit, consumer_cleanup the thread */
		if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
			err = 0; /* All is OK */
			goto end;
		}
		/* poll on the array of fds */
	restart:
		DBG("polling on %d fd", nb_fd + nb_pipes_fd);
		if (testpoint(consumerd_thread_data_poll)) {
			goto end;
		}
		health_poll_entry();
		num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
		health_poll_exit();
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			/*
			 * Restart interrupted system call.
			 */
			if (errno == EINTR) {
				goto restart;
			}
			PERROR("Poll error");
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		if (caa_unlikely(data_consumption_paused)) {
			DBG("Data consumption paused, sleeping...");
			sleep(1);
			goto restart;
		}

		/*
		 * If the consumer_data_pipe triggered poll go directly to the
		 * beginning of the loop to update the array. We want to prioritize
		 * array update over low-priority reads.
		 */
		if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
			ssize_t pipe_readlen;

			DBG("consumer_data_pipe wake up");
			pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
						       &new_stream,
						       sizeof(new_stream)); /* NOLINT sizeof used on
									       a pointer. */
			if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
								  */
				PERROR("Consumer data pipe");
				/* Continue so we can at least handle the current stream(s). */
				continue;
			}

			/*
			 * If the stream is NULL, just ignore it. It's also possible that
			 * the sessiond poll thread changed the consumer_quit state and is
			 * waking us up to test it.
			 */
			if (new_stream == nullptr) {
				validate_endpoint_status_data_stream();
				continue;
			}

			/* Continue to update the local streams and handle prio ones */
			continue;
		}

		/* Handle wakeup pipe. */
		if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
			char dummy;
			ssize_t pipe_readlen;

			pipe_readlen =
				lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, sizeof(dummy));
			if (pipe_readlen < 0) {
				PERROR("Consumer data wakeup pipe");
			}
			/* We've been awakened to handle stream(s). */
			ctx->has_wakeup = 0;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if (pollfd[i].revents & POLLPRI) {
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				len = ctx->on_buffer_ready(local_stream[i], ctx, false);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				} else if (len > 0) {
					local_stream[i]->has_data_left_to_be_read_before_teardown =
						1;
				}
			}
		}

		/*
		 * If we read high prio channel in this loop, try again
		 * for more high prio data.
		 */
		if (high_prio) {
			continue;
		}

		/* Take care of low priority channels. */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
			    local_stream[i]->has_data) {
				DBG("Normal read on fd %d", pollfd[i].fd);
				len = ctx->on_buffer_ready(local_stream[i], ctx, false);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				} else if (len > 0) {
					local_stream[i]->has_data_left_to_be_read_before_teardown =
						1;
				}
			}
		}

		/* Handle hangup and errors */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			if (local_stream[i] == nullptr) {
				continue;
			}
			if (!local_stream[i]->hangup_flush_done &&
			    (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL)) &&
			    (the_consumer_data.type == LTTNG_CONSUMER32_UST ||
			     the_consumer_data.type == LTTNG_CONSUMER64_UST)) {
				DBG("fd %d is hup|err|nval. Attempting flush and read.",
				    pollfd[i].fd);
				lttng_ustconsumer_on_stream_hangup(local_stream[i]);
				/* Attempt read again, for the data we just flushed. */
				local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
			}
			/*
			 * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
			 * performed. This type of flush ensures that a new packet is produced no
			 * matter the consumed/produced positions are.
			 *
			 * This, in turn, causes the next pass to see that data available for the
			 * stream. When we come back here, we can be assured that all available
			 * data has been consumed and we can finally destroy the stream.
			 *
			 * If the poll flag is HUP/ERR/NVAL and we have
			 * read no data in this pass, we can remove the
			 * stream from its hash table.
			 */
			if ((pollfd[i].revents & POLLHUP)) {
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			} else if (pollfd[i].revents & POLLERR) {
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			} else if (pollfd[i].revents & POLLNVAL) {
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = nullptr;
				}
			}
			/* Reset the flag for the next poll pass. */
			if (local_stream[i] != nullptr) {
				local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
			}
		}
	}
	/* All is OK */
	err = 0;
end:
	DBG("polling thread exiting");
	free(pollfd);
	free(local_stream);

	/*
	 * Close the write side of the pipe so epoll_wait() in
	 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
	 * read side of the pipe. If we close them both, epoll_wait strangely does
	 * not return and could create a endless wait period if the pipe is the
	 * only tracked fd in the poll set. The thread will take care of closing
	 * the read side.
	 */
	(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
2813
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 */
static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	ht = the_consumer_data.stream_per_chan_id_ht;

	/* Iterate, under RCU, over every stream keyed by this channel's key. */
	lttng::urcu::read_lock_guard read_lock;
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&channel->key, lttng_ht_seed),
					  ht->match_fct,
					  &channel->key,
					  &iter.iter,
					  stream,
					  node_channel_id.node)
	{
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			/* Stream is already being torn down; skip it. */
			goto next;
		}
		switch (the_consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			/* Nothing to close for the kernel domain. */
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			if (stream->metadata_flag) {
				/* Safe and protected by the stream lock. */
				lttng_ustconsumer_close_metadata(stream->chan);
			} else {
				/*
				 * Note: a mutex is taken internally within
				 * liblttng-ust-ctl to protect timer wakeup_fd
				 * use from concurrent close.
				 */
				lttng_ustconsumer_close_stream_wakeup(stream);
			}
			break;
		default:
			ERR("Unknown consumer_data type");
			abort();
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
}
2868
2869 static void destroy_channel_ht(struct lttng_ht *ht)
2870 {
2871 struct lttng_ht_iter iter;
2872 struct lttng_consumer_channel *channel;
2873 int ret;
2874
2875 if (ht == nullptr) {
2876 return;
2877 }
2878
2879 {
2880 lttng::urcu::read_lock_guard read_lock;
2881
2882 cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
2883 ret = lttng_ht_del(ht, &iter);
2884 LTTNG_ASSERT(ret != 0);
2885 }
2886 }
2887
2888 lttng_ht_destroy(ht);
2889 }
2890
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = nullptr;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	if (testpoint(consumerd_thread_channel)) {
		goto error_testpoint;
	}

	health_code_update();

	/* Thread-local table mapping a channel's wait fd to the channel. */
	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (true) {
	restart:
		health_code_update();
		DBG("Channel poll wait");
		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Channel event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				err = 0; /* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						if (ret < 0) {
							ERR("Error reading channel pipe");
						}
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
					{
						DBG("Adding channel %d to poll set", chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
								       chan->wait_fd);
						lttng::urcu::read_lock_guard read_lock;
						lttng_ht_add_unique_u64(channel_ht,
									&chan->wait_fd_node);
						/* Add channel to the global poll events list */
						// FIXME: Empty flag on a pipe pollset, this might
						// hang on FreeBSD.
						lttng_poll_add(&events, chan->wait_fd, 0);
						break;
					}
					case CONSUMER_CHANNEL_DEL:
					{
						/*
						 * This command should never be called if the
						 * channel has streams monitored by either the data
						 * or metadata thread. The consumer only notifies
						 * this thread with a channel del. command if it
						 * receives a destroy channel command from the
						 * session daemon that sends it if a command prior
						 * to the GET_CHANNEL failed.
						 */

						lttng::urcu::read_lock_guard read_lock;
						chan = consumer_find_channel(key);
						if (!chan) {
							ERR("UST consumer get channel key %" PRIu64
							    " not found for del channel",
							    key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						LTTNG_ASSERT(ret == 0);

						switch (the_consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							health_code_update();
							/* Destroy streams that might have been left
							 * in the stream list. */
							clean_channel_stream_list(chan);
							break;
						default:
							ERR("Unknown consumer_data type");
							abort();
						}

						/*
						 * Release our own refcount. Force channel deletion
						 * even if streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue
						 * the loop since there might be data to consume.
						 */
						lttng_poll_del(&events,
							       ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since there might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d",
					    revents,
					    pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			/* The fd is a channel wait fd: look the channel up under RCU. */
			lttng::urcu::read_lock_guard read_lock;
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			LTTNG_ASSERT(node);

			chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				LTTNG_ASSERT(ret == 0);

				/*
				 * This will close the wait fd for each stream associated to
				 * this channel AND monitored by the data/metadata thread thus
				 * will be clean by the right thread.
				 */
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1) &&
				    !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto end;
			}

			/* Release RCU lock for the channel looked up */
		}
	}

	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
error_testpoint:
	DBG("Channel poll thread exiting");
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return nullptr;
}
3139
3140 static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
3141 struct pollfd *sockpoll,
3142 int client_socket)
3143 {
3144 int ret;
3145
3146 LTTNG_ASSERT(ctx);
3147 LTTNG_ASSERT(sockpoll);
3148
3149 ret = lttng_consumer_poll_socket(sockpoll);
3150 if (ret) {
3151 goto error;
3152 }
3153 DBG("Metadata connection on client_socket");
3154
3155 /* Blocking call, waiting for transmission */
3156 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3157 if (ctx->consumer_metadata_socket < 0) {
3158 WARN("On accept metadata");
3159 ret = -1;
3160 goto error;
3161 }
3162 ret = 0;
3163
3164 error:
3165 return ret;
3166 }
3167
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 */
void *consumer_thread_sessiond_poll(void *data)
{
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	if (testpoint(consumerd_thread_sessiond)) {
		goto error_testpoint;
	}

	health_code_update();

	/*
	 * Create and listen on the command socket; a stale socket file from a
	 * previous run is removed first.
	 */
	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* A positive return means "should quit", which is not an error. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	/* Main loop: process one session daemon command per iteration. */
	while (true) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			if (ret > 0) {
				/* should exit */
				err = 0;
			}
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (CMM_LOAD_SHARED(consumer_quit)) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0; /* All is OK */
			goto end;
		}
		DBG("Received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_all_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	CMM_STORE_SHARED(consumer_quit, 1);

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);

	notify_health_quit_pipe(health_quit_pipe);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return nullptr;
}
3345
3346 static int post_consume(struct lttng_consumer_stream *stream,
3347 const struct stream_subbuffer *subbuffer,
3348 struct lttng_consumer_local_data *ctx)
3349 {
3350 size_t i;
3351 int ret = 0;
3352 const size_t count =
3353 lttng_dynamic_array_get_count(&stream->read_subbuffer_ops.post_consume_cbs);
3354
3355 for (i = 0; i < count; i++) {
3356 const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
3357 &stream->read_subbuffer_ops.post_consume_cbs, i);
3358
3359 ret = op(stream, subbuffer, ctx);
3360 if (ret) {
3361 goto end;
3362 }
3363 }
3364 end:
3365 return ret;
3366 }
3367
/*
 * Read and consume one sub-buffer from the given stream.
 *
 * Acquires the stream's read lock unless `locked_by_caller` is true, in
 * which case the lock must already be held (asserted). A pending stream
 * rotation is performed before extracting the next packet and, if the
 * stream becomes ready, again after consumption.
 *
 * Returns the number of bytes written by the consume operation on success
 * (0 when no data was available), a negative value on error.
 */
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
				      struct lttng_consumer_local_data *ctx,
				      bool locked_by_caller)
{
	ssize_t ret, written_bytes = 0;
	int rotation_ret;
	struct stream_subbuffer subbuffer = {};
	enum get_next_subbuffer_status get_next_status;

	if (!locked_by_caller) {
		stream->read_subbuffer_ops.lock(stream);
	} else {
		stream->read_subbuffer_ops.assert_locked(stream);
	}

	/* Optional per-stream hook; a non-zero return aborts the read. */
	if (stream->read_subbuffer_ops.on_wake_up) {
		ret = stream->read_subbuffer_ops.on_wake_up(stream);
		if (ret) {
			goto end;
		}
	}

	/*
	 * If the stream was flagged to be ready for rotation before we extract
	 * the next packet, rotate it now.
	 */
	if (stream->rotate_ready) {
		DBG("Rotate stream before consuming data");
		ret = lttng_consumer_rotate_stream(stream);
		if (ret < 0) {
			ERR("Stream rotation error before consuming data");
			goto end;
		}
	}

	get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
	switch (get_next_status) {
	case GET_NEXT_SUBBUFFER_STATUS_OK:
		break;
	case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
		/* Not an error. */
		ret = 0;
		goto sleep_stream;
	case GET_NEXT_SUBBUFFER_STATUS_ERROR:
		ret = -1;
		goto end;
	default:
		abort();
	}

	ret = stream->read_subbuffer_ops.pre_consume_subbuffer(stream, &subbuffer);
	if (ret) {
		goto error_put_subbuf;
	}

	written_bytes = stream->read_subbuffer_ops.consume_subbuffer(ctx, stream, &subbuffer);
	if (written_bytes <= 0) {
		ERR("Error consuming subbuffer: (%zd)", written_bytes);
		ret = (int) written_bytes;
		goto error_put_subbuf;
	}

	/* The sub-buffer is released back to the tracer from here on. */
	ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	if (ret) {
		goto end;
	}

	ret = post_consume(stream, &subbuffer, ctx);
	if (ret) {
		goto end;
	}

	/*
	 * After extracting the packet, we check if the stream is now ready to
	 * be rotated and perform the action immediately.
	 *
	 * Don't overwrite `ret` as callers expect the number of bytes
	 * consumed to be returned on success.
	 */
	rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
	if (rotation_ret == 1) {
		rotation_ret = lttng_consumer_rotate_stream(stream);
		if (rotation_ret < 0) {
			ret = rotation_ret;
			ERR("Stream rotation error after consuming data");
			goto end;
		}

	} else if (rotation_ret < 0) {
		ret = rotation_ret;
		ERR("Failed to check if stream was ready to rotate after consuming data");
		goto end;
	}

sleep_stream:
	if (stream->read_subbuffer_ops.on_sleep) {
		stream->read_subbuffer_ops.on_sleep(stream, ctx);
	}

	ret = written_bytes;
end:
	if (!locked_by_caller) {
		stream->read_subbuffer_ops.unlock(stream);
	}

	return ret;
error_put_subbuf:
	/* Release the sub-buffer before propagating the error. */
	(void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
	goto end;
}
3478
3479 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3480 {
3481 switch (the_consumer_data.type) {
3482 case LTTNG_CONSUMER_KERNEL:
3483 return lttng_kconsumer_on_recv_stream(stream);
3484 case LTTNG_CONSUMER32_UST:
3485 case LTTNG_CONSUMER64_UST:
3486 return lttng_ustconsumer_on_recv_stream(stream);
3487 default:
3488 ERR("Unknown consumer_data type");
3489 abort();
3490 return -ENOSYS;
3491 }
3492 }
3493
3494 /*
3495 * Allocate and set consumer data hash tables.
3496 */
3497 int lttng_consumer_init()
3498 {
3499 the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3500 if (!the_consumer_data.channel_ht) {
3501 goto error;
3502 }
3503
3504 the_consumer_data.channels_by_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3505 if (!the_consumer_data.channels_by_session_id_ht) {
3506 goto error;
3507 }
3508
3509 the_consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3510 if (!the_consumer_data.relayd_ht) {
3511 goto error;
3512 }
3513
3514 the_consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3515 if (!the_consumer_data.stream_list_ht) {
3516 goto error;
3517 }
3518
3519 the_consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3520 if (!the_consumer_data.stream_per_chan_id_ht) {
3521 goto error;
3522 }
3523
3524 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3525 if (!data_ht) {
3526 goto error;
3527 }
3528
3529 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3530 if (!metadata_ht) {
3531 goto error;
3532 }
3533
3534 the_consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3535 if (!the_consumer_data.chunk_registry) {
3536 goto error;
3537 }
3538
3539 return 0;
3540
3541 error:
3542 return -1;
3543 }
3544
/*
 * Process the ADD_RELAYD command receive by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 *
 * `net_seq_idx` identifies the relayd instance; `sock_type` says whether the
 * fd received on `sock` is the control or the data socket of the pair. The
 * control socket is always received first (asserted below). Status is
 * reported back to the session daemon over `sock`.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx,
				int sock_type,
				struct lttng_consumer_local_data *ctx,
				int sock,
				struct pollfd *consumer_sockpoll,
				uint64_t sessiond_id,
				uint64_t relayd_session_id,
				uint32_t relayd_version_major,
				uint32_t relayd_version_minor,
				enum lttcomm_sock_proto relayd_socket_protocol)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = nullptr;

	LTTNG_ASSERT(ctx);
	LTTNG_ASSERT(sock >= 0);
	ASSERT_RCU_READ_LOCKED();

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == nullptr) {
		LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == nullptr) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status message to
		 * we can notify the session daemon and continue our work without
		 * killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		LTTNG_ASSERT(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		/* Needing to exit in the middle of a command: error. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		fd = -1; /* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->control_sock.sock, fd, relayd_socket_protocol);

		/* Assign version values. */
		relayd->control_sock.major = relayd_version_major;
		relayd->control_sock.minor = relayd_version_minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		ret = lttcomm_populate_sock_from_open_socket(
			&relayd->data_sock.sock, fd, relayd_socket_protocol);
		/* Assign version values. */
		relayd->data_sock.major = relayd_version_major;
		relayd->data_sock.minor = relayd_version_minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	/* Check the populate result from the switch above. */
	if (ret < 0) {
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
	    sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
	    relayd->net_seq_idx,
	    fd);
	/*
	 * We gave the ownership of the fd to the relayd structure. Set the
	 * fd to -1 so we don't call close() on it in the error path below.
	 */
	fd = -1;

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	relayd->ctx = ctx;
	add_relayd(relayd);

	/* All good! */
	return;

error:
	/* Report the failure to the session daemon before cleaning up. */
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	if (relayd_created) {
		free(relayd);
	}
}
3711
3712 /*
3713 * Search for a relayd associated to the session id and return the reference.
3714 *
3715 * A rcu read side lock MUST be acquire before calling this function and locked
3716 * until the relayd object is no longer necessary.
3717 */
3718 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3719 {
3720 struct lttng_ht_iter iter;
3721 struct consumer_relayd_sock_pair *relayd = nullptr;
3722
3723 ASSERT_RCU_READ_LOCKED();
3724
3725 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3726 cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
3727 /*
3728 * Check by sessiond id which is unique here where the relayd session
3729 * id might not be when having multiple relayd.
3730 */
3731 if (relayd->sessiond_session_id == id) {
3732 /* Found the relayd. There can be only one per id. */
3733 goto found;
3734 }
3735 }
3736
3737 return nullptr;
3738
3739 found:
3740 return relayd;
3741 }
3742
/*
 * Check if for a given session id there is still data needed to be extract
 * from the buffers.
 *
 * Two checks are performed while holding the consumer data lock: first the
 * local ring buffers of every stream of the session, then (if the session
 * streams to a relay daemon) the data still in flight on the network.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = nullptr;
	/* Tracer-specific "data left in buffer" predicate. */
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	lttng::urcu::read_lock_guard read_lock;
	pthread_mutex_lock(&the_consumer_data.lock);

	switch (the_consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		abort();
	}

	/* Ease our life a bit */
	ht = the_consumer_data.stream_list_ht;

	/* First pass: check the local buffers of every stream of the session. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&id, lttng_ht_seed),
					  ht->match_fct,
					  &id,
					  &iter.iter,
					  stream,
					  node_session_id.node)
	{
		pthread_mutex_lock(&stream->lock);

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		pthread_mutex_unlock(&stream->lock);
	}

	/* Second pass: ask the relay daemon, if any, about in-flight data. */
	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
		if (ret < 0) {
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			/* Communication error thus the relayd so no data pending. */
			goto data_not_pending;
		}

		cds_lfht_for_each_entry_duplicate(ht->ht,
						  ht->hash_fct(&id, lttng_ht_seed),
						  ht->match_fct,
						  &id,
						  &iter.iter,
						  stream,
						  node_session_id.node)
		{
			if (stream->metadata_flag) {
				ret = relayd_quiescent_control(&relayd->control_sock,
							       stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
							  stream->relayd_stream_id,
							  stream->next_net_seq_num - 1);
			}

			if (ret == 1) {
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_pending;
			} else if (ret < 0) {
				ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
				    relayd->net_seq_idx);
				lttng_consumer_cleanup_relayd(relayd);
				pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
				goto data_not_pending;
			}
		}

		/* Send end command for data pending. */
		ret = relayd_end_data_pending(
			&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
			    relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&the_consumer_data.lock);
	return 1;
}
3881
3882 /*
3883 * Send a ret code status message to the sessiond daemon.
3884 *
3885 * Return the sendmsg() return value.
3886 */
3887 int consumer_send_status_msg(int sock, int ret_code)
3888 {
3889 struct lttcomm_consumer_status_msg msg;
3890
3891 memset(&msg, 0, sizeof(msg));
3892 msg.ret_code = (lttcomm_return_code) ret_code;
3893
3894 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3895 }
3896
3897 /*
3898 * Send a channel status message to the sessiond daemon.
3899 *
3900 * Return the sendmsg() return value.
3901 */
3902 int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel)
3903 {
3904 struct lttcomm_consumer_status_channel msg;
3905
3906 LTTNG_ASSERT(sock >= 0);
3907
3908 memset(&msg, 0, sizeof(msg));
3909 if (!channel) {
3910 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
3911 } else {
3912 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3913 msg.key = channel->key;
3914 msg.stream_count = channel->streams.count;
3915 }
3916
3917 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3918 }
3919
3920 unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
3921 unsigned long produced_pos,
3922 uint64_t nb_packets_per_stream,
3923 uint64_t max_sb_size)
3924 {
3925 unsigned long start_pos;
3926
3927 if (!nb_packets_per_stream) {
3928 return consumed_pos; /* Grab everything */
3929 }
3930 start_pos = produced_pos - lttng_offset_align_floor(produced_pos, max_sb_size);
3931 start_pos -= max_sb_size * nb_packets_per_stream;
3932 if ((long) (start_pos - consumed_pos) < 0) {
3933 return consumed_pos; /* Grab everything */
3934 }
3935 return start_pos;
3936 }
3937
3938 /* Stream lock must be held by the caller. */
3939 static int sample_stream_positions(struct lttng_consumer_stream *stream,
3940 unsigned long *produced,
3941 unsigned long *consumed)
3942 {
3943 int ret;
3944
3945 ASSERT_LOCKED(stream->lock);
3946
3947 ret = lttng_consumer_sample_snapshot_positions(stream);
3948 if (ret < 0) {
3949 ERR("Failed to sample snapshot positions");
3950 goto end;
3951 }
3952
3953 ret = lttng_consumer_get_produced_snapshot(stream, produced);
3954 if (ret < 0) {
3955 ERR("Failed to sample produced position");
3956 goto end;
3957 }
3958
3959 ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
3960 if (ret < 0) {
3961 ERR("Failed to sample consumed position");
3962 goto end;
3963 }
3964
3965 end:
3966 return ret;
3967 }
3968
3969 /*
3970 * Sample the rotate position for all the streams of a channel. If a stream
3971 * is already at the rotate position (produced == consumed), we flag it as
3972 * ready for rotation. The rotation of ready streams occurs after we have
3973 * replied to the session daemon that we have finished sampling the positions.
3974 * Must be called with RCU read-side lock held to ensure existence of channel.
3975 *
3976 * Returns 0 on success, < 0 on error
3977 */
3978 int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
3979 uint64_t key,
3980 uint64_t relayd_id)
3981 {
3982 int ret;
3983 struct lttng_consumer_stream *stream;
3984 struct lttng_ht_iter iter;
3985 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
3986 struct lttng_dynamic_array stream_rotation_positions;
3987 uint64_t next_chunk_id, stream_count = 0;
3988 enum lttng_trace_chunk_status chunk_status;
3989 const bool is_local_trace = relayd_id == -1ULL;
3990 struct consumer_relayd_sock_pair *relayd = nullptr;
3991 bool rotating_to_new_chunk = true;
3992 /* Array of `struct lttng_consumer_stream *` */
3993 struct lttng_dynamic_pointer_array streams_packet_to_open;
3994 size_t stream_idx;
3995
3996 ASSERT_RCU_READ_LOCKED();
3997
3998 DBG("Consumer sample rotate position for channel %" PRIu64, key);
3999
4000 lttng_dynamic_array_init(&stream_rotation_positions,
4001 sizeof(struct relayd_stream_rotation_position),
4002 nullptr);
4003 lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
4004
4005 lttng::urcu::read_lock_guard read_lock;
4006
4007 pthread_mutex_lock(&channel->lock);
4008 LTTNG_ASSERT(channel->trace_chunk);
4009 chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
4010 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
4011 ret = -1;
4012 goto end_unlock_channel;
4013 }
4014
4015 cds_lfht_for_each_entry_duplicate(ht->ht,
4016 ht->hash_fct(&channel->key, lttng_ht_seed),
4017 ht->match_fct,
4018 &channel->key,
4019 &iter.iter,
4020 stream,
4021 node_channel_id.node)
4022 {
4023 unsigned long produced_pos = 0, consumed_pos = 0;
4024
4025 health_code_update();
4026
4027 /*
4028 * Lock stream because we are about to change its state.
4029 */
4030 pthread_mutex_lock(&stream->lock);
4031
4032 if (stream->trace_chunk == stream->chan->trace_chunk) {
4033 rotating_to_new_chunk = false;
4034 }
4035
4036 /*
4037 * Do not flush a packet when rotating from a NULL trace
4038 * chunk. The stream has no means to output data, and the prior
4039 * rotation which rotated to NULL performed that side-effect
4040 * already. No new data can be produced when a stream has no
4041 * associated trace chunk (e.g. a stop followed by a rotate).
4042 */
4043 if (stream->trace_chunk) {
4044 bool flush_active;
4045
4046 if (stream->metadata_flag) {
4047 /*
4048 * Don't produce an empty metadata packet,
4049 * simply close the current one.
4050 *
4051 * Metadata is regenerated on every trace chunk
4052 * switch; there is no concern that no data was
4053 * produced.
4054 */
4055 flush_active = true;
4056 } else {
4057 /*
4058 * Only flush an empty packet if the "packet
4059 * open" could not be performed on transition
4060 * to a new trace chunk and no packets were
4061 * consumed within the chunk's lifetime.
4062 */
4063 if (stream->opened_packet_in_current_trace_chunk) {
4064 flush_active = true;
4065 } else {
4066 /*
4067 * Stream could have been full at the
4068 * time of rotation, but then have had
4069 * no activity at all.
4070 *
4071 * It is important to flush a packet
4072 * to prevent 0-length files from being
4073 * produced as most viewers choke on
4074 * them.
4075 *
4076 * Unfortunately viewers will not be
4077 * able to know that tracing was active
4078 * for this stream during this trace
4079 * chunk's lifetime.
4080 */
4081 ret = sample_stream_positions(
4082 stream, &produced_pos, &consumed_pos);
4083 if (ret) {
4084 goto end_unlock_stream;
4085 }
4086
4087 /*
4088 * Don't flush an empty packet if data
4089 * was produced; it will be consumed
4090 * before the rotation completes.
4091 */
4092 flush_active = produced_pos != consumed_pos;
4093 if (!flush_active) {
4094 const char *trace_chunk_name;
4095 uint64_t trace_chunk_id;
4096
4097 chunk_status = lttng_trace_chunk_get_name(
4098 stream->trace_chunk,
4099 &trace_chunk_name,
4100 nullptr);
4101 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
4102 trace_chunk_name = "none";
4103 }
4104
4105 /*
4106 * Consumer trace chunks are
4107 * never anonymous.
4108 */
4109 chunk_status = lttng_trace_chunk_get_id(
4110 stream->trace_chunk, &trace_chunk_id);
4111 LTTNG_ASSERT(chunk_status ==
4112 LTTNG_TRACE_CHUNK_STATUS_OK);
4113
4114 DBG("Unable to open packet for stream during trace chunk's lifetime. "
4115 "Flushing an empty packet to prevent an empty file from being created: "
4116 "stream id = %" PRIu64
4117 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
4118 stream->key,
4119 trace_chunk_name,
4120 trace_chunk_id);
4121 }
4122 }
4123 }
4124
4125 /*
4126 * Close the current packet before sampling the
4127 * ring buffer positions.
4128 */
4129 ret = consumer_stream_flush_buffer(stream, flush_active);
4130 if (ret < 0) {
4131 ERR("Failed to flush stream %" PRIu64 " during channel rotation",
4132 stream->key);
4133 goto end_unlock_stream;
4134 }
4135 }
4136
4137 ret = lttng_consumer_take_snapshot(stream);
4138 if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
4139 ERR("Failed to sample snapshot position during channel rotation");
4140 goto end_unlock_stream;
4141 }
4142 if (!ret) {
4143 ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
4144 if (ret < 0) {
4145 ERR("Failed to sample produced position during channel rotation");
4146 goto end_unlock_stream;
4147 }
4148
4149 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
4150 if (ret < 0) {
4151 ERR("Failed to sample consumed position during channel rotation");
4152 goto end_unlock_stream;
4153 }
4154 }
4155 /*
4156 * Align produced position on the start-of-packet boundary of the first
4157 * packet going into the next trace chunk.
4158 */
4159 produced_pos = lttng_align_floor(produced_pos, stream->max_sb_size);
4160 if (consumed_pos == produced_pos) {
4161 DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
4162 stream->key,
4163 produced_pos,
4164 consumed_pos);
4165 stream->rotate_ready = true;
4166 } else {
4167 DBG("Different consumed and produced positions "
4168 "for stream %" PRIu64 " produced = %lu consumed = %lu",
4169 stream->key,
4170 produced_pos,
4171 consumed_pos);
4172 }
4173 /*
4174 * The rotation position is based on the packet_seq_num of the
4175 * packet following the last packet that was consumed for this
4176 * stream, incremented by the offset between produced and
4177 * consumed positions. This rotation position is a lower bound
4178 * (inclusive) at which the next trace chunk starts. Since it
4179 * is a lower bound, it is OK if the packet_seq_num does not
4180 * correspond exactly to the same packet identified by the
4181 * consumed_pos, which can happen in overwrite mode.
4182 */
4183 if (stream->sequence_number_unavailable) {
4184 /*
4185 * Rotation should never be performed on a session which
4186 * interacts with a pre-2.8 lttng-modules, which does
4187 * not implement packet sequence number.
4188 */
4189 ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
4190 stream->key);
4191 ret = -1;
4192 goto end_unlock_stream;
4193 }
4194 stream->rotate_position = stream->last_sequence_number + 1 +
4195 ((produced_pos - consumed_pos) / stream->max_sb_size);
4196 DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
4197 stream->key,
4198 stream->rotate_position);
4199
4200 if (!is_local_trace) {
4201 /*
4202 * The relay daemon control protocol expects a rotation
4203 * position as "the sequence number of the first packet
4204 * _after_ the current trace chunk".
4205 */
4206 const struct relayd_stream_rotation_position position = {
4207 .stream_id = stream->relayd_stream_id,
4208 .rotate_at_seq_num = stream->rotate_position,
4209 };
4210
4211 ret = lttng_dynamic_array_add_element(&stream_rotation_positions,
4212 &position);
4213 if (ret) {
4214 ERR("Failed to allocate stream rotation position");
4215 goto end_unlock_stream;
4216 }
4217 stream_count++;
4218 }
4219
4220 stream->opened_packet_in_current_trace_chunk = false;
4221
4222 if (rotating_to_new_chunk && !stream->metadata_flag) {
4223 /*
4224 * Attempt to flush an empty packet as close to the
4225 * rotation point as possible. In the event where a
4226 * stream remains inactive after the rotation point,
4227 * this ensures that the new trace chunk has a
4228 			 * beginning timestamp set at the beginning of the
4229 * trace chunk instead of only creating an empty
4230 * packet when the trace chunk is stopped.
4231 *
4232 * This indicates to the viewers that the stream
4233 * was being recorded, but more importantly it
4234 			 * allows viewers to determine a usable trace
4235 * intersection.
4236 *
4237 * This presents a problem in the case where the
4238 * ring-buffer is completely full.
4239 *
4240 * Consider the following scenario:
4241 * - The consumption of data is slow (slow network,
4242 * for instance),
4243 * - The ring buffer is full,
4244 * - A rotation is initiated,
4245 * - The flush below does nothing (no space left to
4246 * open a new packet),
4247 * - The other streams rotate very soon, and new
4248 * data is produced in the new chunk,
4249 * - This stream completes its rotation long after the
4250 * rotation was initiated
4251 * - The session is stopped before any event can be
4252 * produced in this stream's buffers.
4253 *
4254 * The resulting trace chunk will have a single packet
4255 			 * temporally located at the end of the trace chunk for this
4256 * stream making the stream intersection more narrow
4257 * than it should be.
4258 *
4259 * To work-around this, an empty flush is performed
4260 * after the first consumption of a packet during a
4261 * rotation if open_packet fails. The idea is that
4262 * consuming a packet frees enough space to switch
4263 * packets in this scenario and allows the tracer to
4264 * "stamp" the beginning of the new trace chunk at the
4265 * earliest possible point.
4266 *
4267 * The packet open is performed after the channel
4268 * rotation to ensure that no attempt to open a packet
4269 * is performed in a stream that has no active trace
4270 * chunk.
4271 */
4272 ret = lttng_dynamic_pointer_array_add_pointer(&streams_packet_to_open,
4273 stream);
4274 if (ret) {
4275 PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
4276 ret = -1;
4277 goto end_unlock_stream;
4278 }
4279 }
4280
4281 pthread_mutex_unlock(&stream->lock);
4282 }
4283 stream = nullptr;
4284
4285 if (!is_local_trace) {
4286 relayd = consumer_find_relayd(relayd_id);
4287 if (!relayd) {
4288 ERR("Failed to find relayd %" PRIu64, relayd_id);
4289 ret = -1;
4290 goto end_unlock_channel;
4291 }
4292
4293 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
4294 ret = relayd_rotate_streams(&relayd->control_sock,
4295 stream_count,
4296 rotating_to_new_chunk ? &next_chunk_id : nullptr,
4297 (const struct relayd_stream_rotation_position *)
4298 stream_rotation_positions.buffer.data);
4299 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4300 if (ret < 0) {
4301 ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
4302 relayd->net_seq_idx);
4303 lttng_consumer_cleanup_relayd(relayd);
4304 goto end_unlock_channel;
4305 }
4306 }
4307
4308 for (stream_idx = 0;
4309 stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
4310 stream_idx++) {
4311 enum consumer_stream_open_packet_status status;
4312
4313 stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
4314 &streams_packet_to_open, stream_idx);
4315
4316 pthread_mutex_lock(&stream->lock);
4317 status = consumer_stream_open_packet(stream);
4318 pthread_mutex_unlock(&stream->lock);
4319 switch (status) {
4320 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
4321 DBG("Opened a packet after a rotation: stream id = %" PRIu64
4322 ", channel name = %s, session id = %" PRIu64,
4323 stream->key,
4324 stream->chan->name,
4325 stream->chan->session_id);
4326 break;
4327 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
4328 /*
4329 * Can't open a packet as there is no space left
4330 * in the buffer. A new packet will be opened
4331 * once one has been consumed.
4332 */
4333 DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
4334 ", channel name = %s, session id = %" PRIu64,
4335 stream->key,
4336 stream->chan->name,
4337 stream->chan->session_id);
4338 break;
4339 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
4340 /* Logged by callee. */
4341 ret = -1;
4342 goto end_unlock_channel;
4343 default:
4344 abort();
4345 }
4346 }
4347
4348 pthread_mutex_unlock(&channel->lock);
4349 ret = 0;
4350 goto end;
4351
4352 end_unlock_stream:
4353 pthread_mutex_unlock(&stream->lock);
4354 end_unlock_channel:
4355 pthread_mutex_unlock(&channel->lock);
4356 end:
4357 lttng_dynamic_array_reset(&stream_rotation_positions);
4358 lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
4359 return ret;
4360 }
4361
4362 static int consumer_clear_buffer(struct lttng_consumer_stream *stream)
4363 {
4364 int ret = 0;
4365 unsigned long consumed_pos_before, consumed_pos_after;
4366
4367 ret = lttng_consumer_sample_snapshot_positions(stream);
4368 if (ret < 0) {
4369 ERR("Taking snapshot positions");
4370 goto end;
4371 }
4372
4373 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4374 if (ret < 0) {
4375 ERR("Consumed snapshot position");
4376 goto end;
4377 }
4378
4379 switch (the_consumer_data.type) {
4380 case LTTNG_CONSUMER_KERNEL:
4381 ret = kernctl_buffer_clear(stream->wait_fd);
4382 if (ret < 0) {
4383 ERR("Failed to clear kernel stream (ret = %d)", ret);
4384 goto end;
4385 }
4386 break;
4387 case LTTNG_CONSUMER32_UST:
4388 case LTTNG_CONSUMER64_UST:
4389 ret = lttng_ustconsumer_clear_buffer(stream);
4390 if (ret < 0) {
4391 ERR("Failed to clear ust stream (ret = %d)", ret);
4392 goto end;
4393 }
4394 break;
4395 default:
4396 ERR("Unknown consumer_data type");
4397 abort();
4398 }
4399
4400 ret = lttng_consumer_sample_snapshot_positions(stream);
4401 if (ret < 0) {
4402 ERR("Taking snapshot positions");
4403 goto end;
4404 }
4405 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4406 if (ret < 0) {
4407 ERR("Consumed snapshot position");
4408 goto end;
4409 }
4410 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4411 end:
4412 return ret;
4413 }
4414
4415 static int consumer_clear_stream(struct lttng_consumer_stream *stream)
4416 {
4417 int ret;
4418
4419 ret = consumer_stream_flush_buffer(stream, true);
4420 if (ret < 0) {
4421 ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
4422 ret = LTTCOMM_CONSUMERD_FATAL;
4423 goto error;
4424 }
4425
4426 ret = consumer_clear_buffer(stream);
4427 if (ret < 0) {
4428 ERR("Failed to clear stream %" PRIu64 " during channel clear", stream->key);
4429 ret = LTTCOMM_CONSUMERD_FATAL;
4430 goto error;
4431 }
4432
4433 ret = LTTCOMM_CONSUMERD_SUCCESS;
4434 error:
4435 return ret;
4436 }
4437
4438 static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
4439 {
4440 int ret;
4441 struct lttng_consumer_stream *stream;
4442
4443 lttng::urcu::read_lock_guard read_lock;
4444 pthread_mutex_lock(&channel->lock);
4445 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
4446 health_code_update();
4447 pthread_mutex_lock(&stream->lock);
4448 ret = consumer_clear_stream(stream);
4449 if (ret) {
4450 goto error_unlock;
4451 }
4452 pthread_mutex_unlock(&stream->lock);
4453 }
4454 pthread_mutex_unlock(&channel->lock);
4455 return 0;
4456
4457 error_unlock:
4458 pthread_mutex_unlock(&stream->lock);
4459 pthread_mutex_unlock(&channel->lock);
4460 return ret;
4461 }
4462
4463 /*
4464 * Check if a stream is ready to be rotated after extracting it.
4465 *
4466 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4467 * error. Stream lock must be held.
4468 */
4469 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4470 {
4471 DBG("Check is rotate ready for stream %" PRIu64 " ready %u rotate_position %" PRIu64
4472 " last_sequence_number %" PRIu64,
4473 stream->key,
4474 stream->rotate_ready,
4475 stream->rotate_position,
4476 stream->last_sequence_number);
4477 if (stream->rotate_ready) {
4478 return 1;
4479 }
4480
4481 /*
4482 * If packet seq num is unavailable, it means we are interacting
4483 * with a pre-2.8 lttng-modules which does not implement the
4484 * sequence number. Rotation should never be used by sessiond in this
4485 * scenario.
4486 */
4487 if (stream->sequence_number_unavailable) {
4488 ERR("Internal error: rotation used on stream %" PRIu64
4489 " with unavailable sequence number",
4490 stream->key);
4491 return -1;
4492 }
4493
4494 if (stream->rotate_position == -1ULL || stream->last_sequence_number == -1ULL) {
4495 return 0;
4496 }
4497
4498 /*
4499 * Rotate position not reached yet. The stream rotate position is
4500 * the position of the next packet belonging to the next trace chunk,
4501 * but consumerd considers rotation ready when reaching the last
4502 * packet of the current chunk, hence the "rotate_position - 1".
4503 */
4504
4505 DBG("Check is rotate ready for stream %" PRIu64 " last_sequence_number %" PRIu64
4506 " rotate_position %" PRIu64,
4507 stream->key,
4508 stream->last_sequence_number,
4509 stream->rotate_position);
4510 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4511 return 1;
4512 }
4513
4514 return 0;
4515 }
4516
4517 /*
4518 * Reset the state for a stream after a rotation occurred.
4519 */
4520 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4521 {
4522 DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64, stream->key);
4523 stream->rotate_position = -1ULL;
4524 stream->rotate_ready = false;
4525 }
4526
4527 /*
4528 * Perform the rotation a local stream file.
4529 */
4530 static int rotate_local_stream(struct lttng_consumer_stream *stream)
4531 {
4532 int ret = 0;
4533
4534 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
4535 stream->key,
4536 stream->chan->key);
4537 stream->tracefile_size_current = 0;
4538 stream->tracefile_count_current = 0;
4539
4540 if (stream->out_fd >= 0) {
4541 ret = close(stream->out_fd);
4542 if (ret) {
4543 PERROR("Failed to close stream out_fd of channel \"%s\"",
4544 stream->chan->name);
4545 }
4546 stream->out_fd = -1;
4547 }
4548
4549 if (stream->index_file) {
4550 lttng_index_file_put(stream->index_file);
4551 stream->index_file = nullptr;
4552 }
4553
4554 if (!stream->trace_chunk) {
4555 goto end;
4556 }
4557
4558 ret = consumer_stream_create_output_files(stream, true);
4559 end:
4560 return ret;
4561 }
4562
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * Return 0 on success, a negative number on error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 *
	 * The stream's reference is released first; from this point on,
	 * stream->trace_chunk is only compared by address or overwritten —
	 * never dereferenced — until it is reassigned below.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = nullptr;
	} else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk. The reference acquired by the
		 * lttng_trace_chunk_get() above now belongs to the stream.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	/* net_seq_idx == -1ULL identifies a local (non-relayd) stream. */
	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	/* Clear the rotate_ready flag and rotation position. */
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}
4634
4635 /*
4636 * Rotate all the ready streams now.
4637 *
4638 * This is especially important for low throughput streams that have already
4639 * been consumed, we cannot wait for their next packet to perform the
4640 * rotation.
4641 * Need to be called with RCU read-side lock held to ensure existence of
4642 * channel.
4643 *
4644 * Returns 0 on success, < 0 on error
4645 */
4646 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
4647 {
4648 int ret;
4649 struct lttng_consumer_stream *stream;
4650 struct lttng_ht_iter iter;
4651 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
4652
4653 ASSERT_RCU_READ_LOCKED();
4654
4655 lttng::urcu::read_lock_guard read_lock;
4656
4657 DBG("Consumer rotate ready streams in channel %" PRIu64, key);
4658
4659 cds_lfht_for_each_entry_duplicate(ht->ht,
4660 ht->hash_fct(&channel->key, lttng_ht_seed),
4661 ht->match_fct,
4662 &channel->key,
4663 &iter.iter,
4664 stream,
4665 node_channel_id.node)
4666 {
4667 health_code_update();
4668
4669 pthread_mutex_lock(&stream->chan->lock);
4670 pthread_mutex_lock(&stream->lock);
4671
4672 if (!stream->rotate_ready) {
4673 pthread_mutex_unlock(&stream->lock);
4674 pthread_mutex_unlock(&stream->chan->lock);
4675 continue;
4676 }
4677 DBG("Consumer rotate ready stream %" PRIu64, stream->key);
4678
4679 ret = lttng_consumer_rotate_stream(stream);
4680 pthread_mutex_unlock(&stream->lock);
4681 pthread_mutex_unlock(&stream->chan->lock);
4682 if (ret) {
4683 goto end;
4684 }
4685 }
4686
4687 ret = 0;
4688
4689 end:
4690 return ret;
4691 }
4692
4693 enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
4694 const lttng_uuid& sessiond_uuid)
4695 {
4696 enum lttcomm_return_code ret;
4697 char uuid_str[LTTNG_UUID_STR_LEN];
4698
4699 if (ctx->sessiond_uuid.is_set) {
4700 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
4701 goto end;
4702 }
4703
4704 ctx->sessiond_uuid.is_set = true;
4705 ctx->sessiond_uuid.value = sessiond_uuid;
4706 ret = LTTCOMM_CONSUMERD_SUCCESS;
4707 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4708 DBG("Received session daemon UUID: %s", uuid_str);
4709 end:
4710 return ret;
4711 }
4712
/*
 * Create a trace chunk for a session, publish it in the consumer daemon's
 * trace chunk registry, and set it as the current chunk of every channel
 * belonging to that session. When a relay daemon id is provided, the chunk
 * creation is also announced to the relay daemon.
 *
 * On failure after the chunk was published, the creation is rolled back by
 * closing the chunk, since the session daemon will consider the creation
 * as failed and never ask for the chunk to be closed.
 *
 * relayd_id may be NULL (local trace); chunk_override_name, credentials and
 * chunk_directory_handle are optional. chunk_creation_timestamp must be set.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success,
 * LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED on error.
 */
enum lttcomm_return_code
lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
				  uint64_t session_id,
				  uint64_t chunk_id,
				  time_t chunk_creation_timestamp,
				  const char *chunk_override_name,
				  const struct lttng_credentials *credentials,
				  struct lttng_directory_handle *chunk_directory_handle)
{
	int ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
	enum lttng_trace_chunk_status chunk_status;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	char creation_timestamp_buffer[ISO8601_STR_LEN];
	const char *relayd_id_str = "(none)";
	const char *creation_timestamp_str;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	/* Local protocol error. */
	LTTNG_ASSERT(chunk_creation_timestamp);
	ret = time_to_iso8601_str(chunk_creation_timestamp,
				  creation_timestamp_buffer,
				  sizeof(creation_timestamp_buffer));
	creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)";

	DBG("Consumer create trace chunk command: relay_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", chunk_override_name = %s"
	    ", chunk_creation_timestamp = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    chunk_override_name ?: "(none)",
	    creation_timestamp_str);

	/*
	 * The trace chunk registry, as used by the consumer daemon, implicitly
	 * owns the trace chunks. This is only needed in the consumer since
	 * the consumer has no notion of a session beyond session IDs being
	 * used to identify other objects.
	 *
	 * The lttng_trace_chunk_registry_publish() call below provides a
	 * reference which is not released; it implicitly becomes the session
	 * daemon's reference to the chunk in the consumer daemon.
	 *
	 * The lifetime of trace chunks in the consumer daemon is managed by
	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
	 */
	created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
	if (!created_chunk) {
		ERR("Failed to create trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	if (chunk_override_name) {
		chunk_status = lttng_trace_chunk_override_name(created_chunk, chunk_override_name);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	if (chunk_directory_handle) {
		chunk_status = lttng_trace_chunk_set_credentials(created_chunk, credentials);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk credentials");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
		/*
		 * The consumer daemon has no ownership of the chunk output
		 * directory.
		 */
		chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
		chunk_directory_handle = nullptr;
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk's directory handle");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	/*
	 * Publishing transfers ownership to the registry; our local
	 * created_chunk reference is dropped right away.
	 */
	published_chunk = lttng_trace_chunk_registry_publish_chunk(
		the_consumer_data.chunk_registry, session_id, created_chunk);
	lttng_trace_chunk_put(created_chunk);
	created_chunk = nullptr;
	if (!published_chunk) {
		ERR("Failed to publish trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	/* Assign the new chunk to every channel of this session. */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry_duplicate(
			the_consumer_data.channels_by_session_id_ht->ht,
			the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
									      lttng_ht_seed),
			the_consumer_data.channels_by_session_id_ht->match_fct,
			&session_id,
			&iter.iter,
			channel,
			channels_by_session_id_ht_node.node)
		{
			ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
			if (ret) {
				/*
				 * Roll-back the creation of this chunk.
				 *
				 * This is important since the session daemon will
				 * assume that the creation of this chunk failed and
				 * will never ask for it to be closed, resulting
				 * in a leak and an inconsistent state for some
				 * channels.
				 */
				enum lttcomm_return_code close_ret;
				char path[LTTNG_PATH_MAX];

				DBG("Failed to set new trace chunk on existing channels, rolling back");
				close_ret =
					lttng_consumer_close_trace_chunk(relayd_id,
									 session_id,
									 chunk_id,
									 chunk_creation_timestamp,
									 nullptr,
									 path);
				if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
					ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
					    ", chunk_id = %" PRIu64,
					    session_id,
					    chunk_id);
				}

				ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
				break;
			}
		}
	}

	if (relayd_id) {
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_create_trace_chunk(&relayd->control_sock, published_chunk);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		/*
		 * 'ret' is only initialized when relayd is non-NULL; the
		 * short-circuit below guarantees it is not read otherwise.
		 */
		if (!relayd || ret) {
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
								     session_id,
								     chunk_id,
								     chunk_creation_timestamp,
								     nullptr,
								     path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
				    ", chunk_id = %" PRIu64,
				    session_id,
				    chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
error:
	/* Release the reference returned by the "publish" operation. */
	lttng_trace_chunk_put(published_chunk);
	lttng_trace_chunk_put(created_chunk);
	return ret_code;
}
4905
/*
 * Close a trace chunk: record its close timestamp (and optional close
 * command), detach it from every channel that still references it and,
 * when a relay daemon id is provided, forward the close to the relay
 * daemon.
 *
 * 'path' is an output buffer (LTTNG_PATH_MAX) filled by the relayd close
 * operation when applicable.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, an error code otherwise.
 */
enum lttcomm_return_code
lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
				 uint64_t session_id,
				 uint64_t chunk_id,
				 time_t chunk_close_timestamp,
				 const enum lttng_trace_chunk_command_type *close_command,
				 char *path)
{
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *chunk;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const char *close_command_name = "none";
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	enum lttng_trace_chunk_status chunk_status;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}
	if (close_command) {
		close_command_name = lttng_trace_chunk_command_type_get_name(*close_command);
	}

	DBG("Consumer close trace chunk command: relayd_id = %s"
	    ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = %s",
	    relayd_id_str,
	    session_id,
	    chunk_id,
	    close_command_name);

	chunk = lttng_trace_chunk_registry_find_chunk(
		the_consumer_data.chunk_registry, session_id, chunk_id);
	if (!chunk) {
		ERR("Failed to find chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
		    session_id,
		    chunk_id);
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, chunk_close_timestamp);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		goto end;
	}

	if (close_command) {
		chunk_status = lttng_trace_chunk_set_close_command(chunk, *close_command);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto end;
		}
	}

	/*
	 * chunk is now invalid to access as we no longer hold a reference to
	 * it; it is only kept around to compare it (by address) to the
	 * current chunk found in the session's channels.
	 */
	{
		lttng::urcu::read_lock_guard read_lock;
		cds_lfht_for_each_entry (
			the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
			int ret;

			/*
			 * Only change the channel's chunk to NULL if it still
			 * references the chunk being closed. The channel may
			 * reference a newer chunk in the case of a session
			 * rotation. When a session rotation occurs, the "next"
			 * chunk is created before the "current" chunk is closed.
			 */
			if (channel->trace_chunk != chunk) {
				continue;
			}
			ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
			if (ret) {
				/*
				 * Attempt to close the chunk on as many channels as
				 * possible.
				 */
				ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			}
		}
	}
	if (relayd_id) {
		int ret;
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_close_trace_chunk(&relayd->control_sock, chunk, path);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		/* 'ret' is only read when relayd is non-NULL (short-circuit). */
		if (!relayd || ret) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
end:
	/*
	 * Release the reference returned by the "find" operation and
	 * the session daemon's implicit reference to the chunk.
	 *
	 * The double put is intentional. On the "chunk not found" path both
	 * puts receive nullptr, which is presumably a no-op — verify against
	 * lttng_trace_chunk_put's contract.
	 */
	lttng_trace_chunk_put(chunk);
	lttng_trace_chunk_put(chunk);

	return ret_code;
}
5029
5030 enum lttcomm_return_code
5031 lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id)
5032 {
5033 int ret;
5034 enum lttcomm_return_code ret_code;
5035 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
5036 const char *relayd_id_str = "(none)";
5037 const bool is_local_trace = !relayd_id;
5038 struct consumer_relayd_sock_pair *relayd = nullptr;
5039 bool chunk_exists_local, chunk_exists_remote;
5040 lttng::urcu::read_lock_guard read_lock;
5041
5042 if (relayd_id) {
5043 /* Only used for logging purposes. */
5044 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), "%" PRIu64, *relayd_id);
5045 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
5046 relayd_id_str = relayd_id_buffer;
5047 } else {
5048 relayd_id_str = "(formatting error)";
5049 }
5050 }
5051
5052 DBG("Consumer trace chunk exists command: relayd_id = %s"
5053 ", chunk_id = %" PRIu64,
5054 relayd_id_str,
5055 chunk_id);
5056 ret = lttng_trace_chunk_registry_chunk_exists(
5057 the_consumer_data.chunk_registry, session_id, chunk_id, &chunk_exists_local);
5058 if (ret) {
5059 /* Internal error. */
5060 ERR("Failed to query the existence of a trace chunk");
5061 ret_code = LTTCOMM_CONSUMERD_FATAL;
5062 goto end;
5063 }
5064 DBG("Trace chunk %s locally", chunk_exists_local ? "exists" : "does not exist");
5065 if (chunk_exists_local) {
5066 ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
5067 goto end;
5068 } else if (is_local_trace) {
5069 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5070 goto end;
5071 }
5072
5073 relayd = consumer_find_relayd(*relayd_id);
5074 if (!relayd) {
5075 ERR("Failed to find relayd %" PRIu64, *relayd_id);
5076 ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5077 goto end_rcu_unlock;
5078 }
5079 DBG("Looking up existence of trace chunk on relay daemon");
5080 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
5081 ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id, &chunk_exists_remote);
5082 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
5083 if (ret < 0) {
5084 ERR("Failed to look-up the existence of trace chunk on relay daemon");
5085 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
5086 goto end_rcu_unlock;
5087 }
5088
5089 ret_code = chunk_exists_remote ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
5090 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
5091 DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
5092
5093 end_rcu_unlock:
5094 end:
5095 return ret_code;
5096 }
5097
5098 static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
5099 {
5100 struct lttng_ht *ht;
5101 struct lttng_consumer_stream *stream;
5102 struct lttng_ht_iter iter;
5103 int ret;
5104
5105 ht = the_consumer_data.stream_per_chan_id_ht;
5106
5107 lttng::urcu::read_lock_guard read_lock;
5108 cds_lfht_for_each_entry_duplicate(ht->ht,
5109 ht->hash_fct(&channel->key, lttng_ht_seed),
5110 ht->match_fct,
5111 &channel->key,
5112 &iter.iter,
5113 stream,
5114 node_channel_id.node)
5115 {
5116 /*
5117 * Protect against teardown with mutex.
5118 */
5119 pthread_mutex_lock(&stream->lock);
5120 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5121 goto next;
5122 }
5123 ret = consumer_clear_stream(stream);
5124 if (ret) {
5125 goto error_unlock;
5126 }
5127 next:
5128 pthread_mutex_unlock(&stream->lock);
5129 }
5130 return LTTCOMM_CONSUMERD_SUCCESS;
5131
5132 error_unlock:
5133 pthread_mutex_unlock(&stream->lock);
5134 return ret;
5135 }
5136
5137 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
5138 {
5139 int ret;
5140
5141 DBG("Consumer clear channel %" PRIu64, channel->key);
5142
5143 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
5144 /*
5145 * Nothing to do for the metadata channel/stream.
5146 * Snapshot mechanism already take care of the metadata
5147 * handling/generation, and monitored channels only need to
5148 * have their data stream cleared..
5149 */
5150 ret = LTTCOMM_CONSUMERD_SUCCESS;
5151 goto end;
5152 }
5153
5154 if (!channel->monitor) {
5155 ret = consumer_clear_unmonitored_channel(channel);
5156 } else {
5157 ret = consumer_clear_monitored_channel(channel);
5158 }
5159 end:
5160 return ret;
5161 }
5162
5163 enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
5164 {
5165 struct lttng_consumer_stream *stream;
5166 enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
5167
5168 if (channel->metadata_stream) {
5169 ERR("Open channel packets command attempted on a metadata channel");
5170 ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
5171 goto end;
5172 }
5173
5174 {
5175 lttng::urcu::read_lock_guard read_lock;
5176 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
5177 enum consumer_stream_open_packet_status status;
5178
5179 pthread_mutex_lock(&stream->lock);
5180 if (cds_lfht_is_node_deleted(&stream->node.node)) {
5181 goto next;
5182 }
5183
5184 status = consumer_stream_open_packet(stream);
5185 switch (status) {
5186 case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
5187 DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
5188 ", channel name = %s, session id = %" PRIu64,
5189 stream->key,
5190 stream->chan->name,
5191 stream->chan->session_id);
5192 stream->opened_packet_in_current_trace_chunk = true;
5193 break;
5194 case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
5195 DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
5196 ", channel name = %s, session id = %" PRIu64,
5197 stream->key,
5198 stream->chan->name,
5199 stream->chan->session_id);
5200 break;
5201 case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
5202 /*
5203 * Only unexpected internal errors can lead to this
5204 * failing. Report an unknown error.
5205 */
5206 ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
5207 ", channel id = %" PRIu64 ", channel name = %s"
5208 ", session id = %" PRIu64,
5209 stream->key,
5210 channel->key,
5211 channel->name,
5212 channel->session_id);
5213 ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
5214 goto error_unlock;
5215 default:
5216 abort();
5217 }
5218
5219 next:
5220 pthread_mutex_unlock(&stream->lock);
5221 }
5222 }
5223 end_rcu_unlock:
5224 end:
5225 return ret;
5226
5227 error_unlock:
5228 pthread_mutex_unlock(&stream->lock);
5229 goto end_rcu_unlock;
5230 }
5231
/*
 * Consumer-level SIGBUS handling entry point: delegates to the UST
 * consumer's handler. 'addr' is presumably the faulting address reported
 * by the signal handler — verify against the caller's siginfo usage.
 */
void lttng_consumer_sigbus_handle(void *addr)
{
	lttng_ustconsumer_sigbus_handle(addr);
}
This page took 0.181676 seconds and 4 git commands to generate.