trace-chunk: Introduce chunk "path", relayd session "ongoing_rotation", sessiond...
[lttng-tools.git] / src / common / consumer / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31 #include <signal.h>
32
33 #include <bin/lttng-consumerd/health-consumerd.h>
34 #include <common/common.h>
35 #include <common/utils.h>
36 #include <common/time.h>
37 #include <common/compat/poll.h>
38 #include <common/compat/endian.h>
39 #include <common/index/index.h>
40 #include <common/kernel-ctl/kernel-ctl.h>
41 #include <common/sessiond-comm/relayd.h>
42 #include <common/sessiond-comm/sessiond-comm.h>
43 #include <common/kernel-consumer/kernel-consumer.h>
44 #include <common/relayd/relayd.h>
45 #include <common/ust-consumer/ust-consumer.h>
46 #include <common/consumer/consumer-timer.h>
47 #include <common/consumer/consumer.h>
48 #include <common/consumer/consumer-stream.h>
49 #include <common/consumer/consumer-testpoint.h>
50 #include <common/align.h>
51 #include <common/consumer/consumer-metadata-cache.h>
52 #include <common/trace-chunk.h>
53 #include <common/trace-chunk-registry.h>
54 #include <common/string-utils/format.h>
55 #include <common/dynamic-array.h>
56
/* Global consumer state, shared by every consumer thread. */
struct lttng_consumer_global_data consumer_data = {
	.stream_count = 0,
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};

/* Actions carried by a message on the channel management pipe. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};

/* Message format of the channel management pipe. */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan; /* add */
	uint64_t key; /* del */
};

/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;

/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
int consumer_quit;

/*
 * Global hash tables containing respectively the metadata and the data
 * streams. The stream element in these hts should only be updated by the
 * metadata poll thread for the metadata and the data poll thread for the data.
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
93
94 static const char *get_consumer_domain(void)
95 {
96 switch (consumer_data.type) {
97 case LTTNG_CONSUMER_KERNEL:
98 return DEFAULT_KERNEL_TRACE_DIR;
99 case LTTNG_CONSUMER64_UST:
100 /* Fall-through. */
101 case LTTNG_CONSUMER32_UST:
102 return DEFAULT_UST_TRACE_DIR;
103 default:
104 abort();
105 }
106 }
107
108 /*
109 * Notify a thread lttng pipe to poll back again. This usually means that some
110 * global state has changed so we just send back the thread in a poll wait
111 * call.
112 */
113 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
114 {
115 struct lttng_consumer_stream *null_stream = NULL;
116
117 assert(pipe);
118
119 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
120 }
121
/* Wake up the health thread so it notices the quit condition. */
static void notify_health_quit_pipe(int *pipe)
{
	ssize_t write_ret;

	/* The byte value written is meaningless; only the wakeup matters. */
	write_ret = lttng_write(pipe[1], "4", 1);
	if (write_ret < 1) {
		PERROR("write consumer health quit");
	}
}
131
132 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
133 struct lttng_consumer_channel *chan,
134 uint64_t key,
135 enum consumer_channel_action action)
136 {
137 struct consumer_channel_msg msg;
138 ssize_t ret;
139
140 memset(&msg, 0, sizeof(msg));
141
142 msg.action = action;
143 msg.chan = chan;
144 msg.key = key;
145 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
146 if (ret < sizeof(msg)) {
147 PERROR("notify_channel_pipe write error");
148 }
149 }
150
/*
 * Ask the channel management thread to tear down the channel identified by
 * "key". Thin wrapper around notify_channel_pipe() with a
 * CONSUMER_CHANNEL_DEL action.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
		uint64_t key)
{
	notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
156
157 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
158 struct lttng_consumer_channel **chan,
159 uint64_t *key,
160 enum consumer_channel_action *action)
161 {
162 struct consumer_channel_msg msg;
163 ssize_t ret;
164
165 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
166 if (ret < sizeof(msg)) {
167 ret = -1;
168 goto error;
169 }
170 *action = msg.action;
171 *chan = msg.chan;
172 *key = msg.key;
173 error:
174 return (int) ret;
175 }
176
/*
 * Cleanup the stream list of a channel. Those streams are not yet globally
 * visible (not published in any hash table), so no per-stream locking is
 * needed here.
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	assert(channel);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {
		cds_list_del(&stream->send_node);
		/*
		 * Once a stream is added to this list, the buffers were created so we
		 * have a guarantee that this call will succeed. Setting the monitor
		 * mode to 0 so we don't lock nor try to delete the stream from the
		 * global hash table.
		 */
		stream->monitor = 0;
		consumer_stream_destroy(stream, NULL);
	}
}
201
202 /*
203 * Find a stream. The consumer_data.lock must be locked during this
204 * call.
205 */
206 static struct lttng_consumer_stream *find_stream(uint64_t key,
207 struct lttng_ht *ht)
208 {
209 struct lttng_ht_iter iter;
210 struct lttng_ht_node_u64 *node;
211 struct lttng_consumer_stream *stream = NULL;
212
213 assert(ht);
214
215 /* -1ULL keys are lookup failures */
216 if (key == (uint64_t) -1ULL) {
217 return NULL;
218 }
219
220 rcu_read_lock();
221
222 lttng_ht_lookup(ht, &key, &iter);
223 node = lttng_ht_iter_get_node_u64(&iter);
224 if (node != NULL) {
225 stream = caa_container_of(node, struct lttng_consumer_stream, node);
226 }
227
228 rcu_read_unlock();
229
230 return stream;
231 }
232
233 static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
234 {
235 struct lttng_consumer_stream *stream;
236
237 rcu_read_lock();
238 stream = find_stream(key, ht);
239 if (stream) {
240 stream->key = (uint64_t) -1ULL;
241 /*
242 * We don't want the lookup to match, but we still need
243 * to iterate on this stream when iterating over the hash table. Just
244 * change the node key.
245 */
246 stream->node.key = (uint64_t) -1ULL;
247 }
248 rcu_read_unlock();
249 }
250
251 /*
252 * Return a channel object for the given key.
253 *
254 * RCU read side lock MUST be acquired before calling this function and
255 * protects the channel ptr.
256 */
257 struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
258 {
259 struct lttng_ht_iter iter;
260 struct lttng_ht_node_u64 *node;
261 struct lttng_consumer_channel *channel = NULL;
262
263 /* -1ULL keys are lookup failures */
264 if (key == (uint64_t) -1ULL) {
265 return NULL;
266 }
267
268 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
269 node = lttng_ht_iter_get_node_u64(&iter);
270 if (node != NULL) {
271 channel = caa_container_of(node, struct lttng_consumer_channel, node);
272 }
273
274 return channel;
275 }
276
277 /*
278 * There is a possibility that the consumer does not have enough time between
279 * the close of the channel on the session daemon and the cleanup in here thus
280 * once we have a channel add with an existing key, we know for sure that this
281 * channel will eventually get cleaned up by all streams being closed.
282 *
283 * This function just nullifies the already existing channel key.
284 */
285 static void steal_channel_key(uint64_t key)
286 {
287 struct lttng_consumer_channel *channel;
288
289 rcu_read_lock();
290 channel = consumer_find_channel(key);
291 if (channel) {
292 channel->key = (uint64_t) -1ULL;
293 /*
294 * We don't want the lookup to match, but we still need to iterate on
295 * this channel when iterating over the hash table. Just change the
296 * node key.
297 */
298 channel->node.key = (uint64_t) -1ULL;
299 }
300 rcu_read_unlock();
301 }
302
/*
 * RCU callback releasing a channel once no reader can hold a reference to
 * it anymore. Frees the UST-specific resources when applicable, then the
 * channel object itself.
 */
static void free_channel_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node =
		caa_container_of(head, struct lttng_ht_node_u64, head);
	struct lttng_consumer_channel *channel =
		caa_container_of(node, struct lttng_consumer_channel, node);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/* Nothing domain-specific to release for the kernel domain. */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_free_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	free(channel);
}
323
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node =
		caa_container_of(head, struct lttng_ht_node_u64, head);
	struct consumer_relayd_sock_pair *relayd =
		caa_container_of(node, struct consumer_relayd_sock_pair, node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing to this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
348
/*
 * Destroy and free a relayd socket pair object.
 *
 * The object is removed from the relayd hash table; the actual close/free
 * is deferred to an RCU callback (free_relayd_rcu). Safe to call with NULL
 * or on an object already being destroyed (the ht delete then fails and we
 * simply return).
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == NULL) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
373
/*
 * Remove a channel from the global list protected by a mutex. This function
 * is also responsible for freeing its data structures.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	/* Lock ordering: global consumer_data lock first, then channel lock. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Release the channel's reference on its current trace chunk. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = NULL;

	if (channel->is_published) {
		int ret;

		rcu_read_lock();
		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(consumer_data.channel_ht, &iter);
		assert(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
				&iter);
		assert(!ret);
		rcu_read_unlock();
	}

	/*
	 * RCU readers may still hold a reference: mark the channel as
	 * logically deleted and defer the actual free to an RCU callback.
	 */
	channel->is_deleted = true;
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
434
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht(void)
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	rcu_read_lock();

	cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
			node.node) {
		/* Removes the element from the ht; freeing is RCU-deferred. */
		consumer_destroy_relayd(relayd);
	}

	rcu_read_unlock();

	lttng_ht_destroy(consumer_data.relayd_ht);
}
455
/*
 * Update the end point status of all streams having the given network
 * sequence index (relayd index).
 *
 * It's atomically set without having the stream mutex locked, which is fine
 * because we handle the write/read race with a pipe wakeup for each thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
		enum consumer_endpoint_status status)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

	rcu_read_lock();

	/* Let's begin with metadata */
	cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
		}
	}

	/* Follow up by the data streams */
	cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to data stream %d", stream->wait_fd);
		}
	}
	rcu_read_unlock();
}
490
491 /*
492 * Cleanup a relayd object by flagging every associated streams for deletion,
493 * destroying the object meaning removing it from the relayd hash table,
494 * closing the sockets and freeing the memory in a RCU call.
495 *
496 * If a local data context is available, notify the threads that the streams'
497 * state have changed.
498 */
499 void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
500 {
501 uint64_t netidx;
502
503 assert(relayd);
504
505 DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
506
507 /* Save the net sequence index before destroying the object */
508 netidx = relayd->net_seq_idx;
509
510 /*
511 * Delete the relayd from the relayd hash table, close the sockets and free
512 * the object in a RCU call.
513 */
514 consumer_destroy_relayd(relayd);
515
516 /* Set inactive endpoint to all streams */
517 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
518
519 /*
520 * With a local data context, notify the threads that the streams' state
521 * have changed. The write() action on the pipe acts as an "implicit"
522 * memory barrier ordering the updates of the end point status from the
523 * read of this status which happens AFTER receiving this notify.
524 */
525 notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
526 notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
527 }
528
/*
 * Flag a relayd socket pair for destruction. Destroy it immediately if the
 * refcount already reached zero; otherwise the last reference holder is
 * expected to perform the destruction.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	assert(relayd);

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
547
/*
 * Completely destroy a stream from every visible data structure and from the
 * given hash table, if one is provided.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
559
/*
 * Destroy a data stream, removing it from the global data stream hash table.
 * XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}
567
/*
 * Destroy a metadata stream, removing it from the global metadata stream
 * hash table.
 */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
572
/*
 * Snapshot channel attributes into the stream's read-only copy. Currently
 * only the tracefile size is propagated.
 */
void consumer_stream_update_channel_attributes(
		struct lttng_consumer_stream *stream,
		struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size =
			channel->tracefile_size;
}
580
581 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
582 uint64_t stream_key,
583 const char *channel_name,
584 uint64_t relayd_id,
585 uint64_t session_id,
586 struct lttng_trace_chunk *trace_chunk,
587 int cpu,
588 int *alloc_ret,
589 enum consumer_channel_type type,
590 unsigned int monitor)
591 {
592 int ret;
593 struct lttng_consumer_stream *stream;
594
595 stream = zmalloc(sizeof(*stream));
596 if (stream == NULL) {
597 PERROR("malloc struct lttng_consumer_stream");
598 ret = -ENOMEM;
599 goto end;
600 }
601
602 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
603 ERR("Failed to acquire trace chunk reference during the creation of a stream");
604 ret = -1;
605 goto error;
606 }
607
608 rcu_read_lock();
609 stream->key = stream_key;
610 stream->trace_chunk = trace_chunk;
611 stream->out_fd = -1;
612 stream->out_fd_offset = 0;
613 stream->output_written = 0;
614 stream->net_seq_idx = relayd_id;
615 stream->session_id = session_id;
616 stream->monitor = monitor;
617 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
618 stream->index_file = NULL;
619 stream->last_sequence_number = -1ULL;
620 stream->rotate_position = -1ULL;
621 pthread_mutex_init(&stream->lock, NULL);
622 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
623
624 /* If channel is the metadata, flag this stream as metadata. */
625 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
626 stream->metadata_flag = 1;
627 /* Metadata is flat out. */
628 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
629 /* Live rendez-vous point. */
630 pthread_cond_init(&stream->metadata_rdv, NULL);
631 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
632 } else {
633 /* Format stream name to <channel_name>_<cpu_number> */
634 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
635 channel_name, cpu);
636 if (ret < 0) {
637 PERROR("snprintf stream name");
638 goto error;
639 }
640 }
641
642 /* Key is always the wait_fd for streams. */
643 lttng_ht_node_init_u64(&stream->node, stream->key);
644
645 /* Init node per channel id key */
646 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
647
648 /* Init session id node with the stream session id */
649 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
650
651 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
652 " relayd_id %" PRIu64 ", session_id %" PRIu64,
653 stream->name, stream->key, channel_key,
654 stream->net_seq_idx, stream->session_id);
655
656 rcu_read_unlock();
657 return stream;
658
659 error:
660 rcu_read_unlock();
661 lttng_trace_chunk_put(stream->trace_chunk);
662 free(stream);
663 end:
664 if (alloc_ret) {
665 *alloc_ret = ret;
666 }
667 return NULL;
668 }
669
/*
 * Add a stream to the global list protected by a mutex.
 *
 * Lock order: consumer_data.lock, channel lock, channel timer lock, stream
 * lock, then the RCU read lock.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	assert(stream);
	assert(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	rcu_read_lock();

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 *
	 * NOTE(review): the barrier below orders the hash table insertions
	 * above before the decrement — the original comment mentioned a
	 * refcount increment that does not appear here; confirm intent.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

	rcu_read_unlock();
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
726
/*
 * Add relayd socket to global consumer data hashtable. RCU read side lock
 * MUST be acquired before calling this.
 *
 * If a relayd with the same net sequence index is already registered, the
 * new object is NOT inserted and 0 (success) is returned.
 */
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	assert(relayd);

	lttng_ht_lookup(consumer_data.relayd_ht,
			&relayd->net_seq_idx, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		/* Already registered under this index; treated as success. */
		goto end;
	}
	lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);

end:
	return ret;
}
750
751 /*
752 * Allocate and return a consumer relayd socket.
753 */
754 static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
755 uint64_t net_seq_idx)
756 {
757 struct consumer_relayd_sock_pair *obj = NULL;
758
759 /* net sequence index of -1 is a failure */
760 if (net_seq_idx == (uint64_t) -1ULL) {
761 goto error;
762 }
763
764 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
765 if (obj == NULL) {
766 PERROR("zmalloc relayd sock");
767 goto error;
768 }
769
770 obj->net_seq_idx = net_seq_idx;
771 obj->refcount = 0;
772 obj->destroy_flag = 0;
773 obj->control_sock.sock.fd = -1;
774 obj->data_sock.sock.fd = -1;
775 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
776 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
777
778 error:
779 return obj;
780 }
781
782 /*
783 * Find a relayd socket pair in the global consumer data.
784 *
785 * Return the object if found else NULL.
786 * RCU read-side lock must be held across this call and while using the
787 * returned object.
788 */
789 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
790 {
791 struct lttng_ht_iter iter;
792 struct lttng_ht_node_u64 *node;
793 struct consumer_relayd_sock_pair *relayd = NULL;
794
795 /* Negative keys are lookup failures */
796 if (key == (uint64_t) -1ULL) {
797 goto error;
798 }
799
800 lttng_ht_lookup(consumer_data.relayd_ht, &key,
801 &iter);
802 node = lttng_ht_iter_get_node_u64(&iter);
803 if (node != NULL) {
804 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
805 }
806
807 error:
808 return relayd;
809 }
810
/*
 * Find the relayd matching the stream's net sequence index and announce the
 * stream to it (relayd "add stream" command).
 *
 * On success, a relayd stream id is stored in the stream and the relayd
 * refcount is incremented. On communication error, the relayd object is
 * cleaned up.
 *
 * Returns 0 on success, < 0 on error.
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
		char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	assert(stream->net_seq_idx != -1ULL);
	assert(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock, stream->name,
				get_consumer_domain(), path, &stream->relayd_stream_id,
				stream->chan->tracefile_size,
				stream->chan->tracefile_count,
				stream->trace_chunk);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}

		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
				stream->key, stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
			stream->name, stream->key, stream->net_seq_idx);

end:
	rcu_read_unlock();
	return ret;
}
860
/*
 * Find the relayd matching the given net sequence index and send it the
 * "streams sent" message, signalling that all streams were announced.
 *
 * On communication error, the relayd object is cleaned up.
 *
 * Returns 0 on success, < 0 on error.
 */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_streams_sent(&relayd->control_sock);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}
	} else {
		ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
				net_seq_idx);
		ret = -1;
		goto end;
	}

	ret = 0;
	DBG("All streams sent relayd id %" PRIu64, net_seq_idx);

end:
	rcu_read_unlock();
	return ret;
}
900
901 /*
902 * Find a relayd and close the stream
903 */
904 void close_relayd_stream(struct lttng_consumer_stream *stream)
905 {
906 struct consumer_relayd_sock_pair *relayd;
907
908 /* The stream is not metadata. Get relayd reference if exists. */
909 rcu_read_lock();
910 relayd = consumer_find_relayd(stream->net_seq_idx);
911 if (relayd) {
912 consumer_stream_relayd_close(stream, relayd);
913 }
914 rcu_read_unlock();
915 }
916
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Metadata is announced on the control socket; data gets a data header sent
 * on the data socket and the stream's net sequence number is advanced.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
		size_t data_size, unsigned long padding,
		struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	assert(stream);
	assert(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be substracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
				sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
977
/*
 * Trigger a dump of the metadata content. Following/during the successful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 */
static
int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	assert(stream->metadata_flag);
	assert(stream->chan->trace_chunk);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
1021
/*
 * Set (or replace) the channel's current trace chunk, taking a reference on
 * the new chunk and releasing the reference on the previous one.
 *
 * new_trace_chunk may be NULL to simply release the current chunk. Always
 * returns 0; a logically deleted channel is left untouched and reported as
 * success since there is nothing for the caller to do.
 */
static
int lttng_consumer_channel_set_trace_chunk(
		struct lttng_consumer_channel *channel,
		struct lttng_trace_chunk *new_trace_chunk)
{
	pthread_mutex_lock(&channel->lock);
	if (channel->is_deleted) {
		/*
		 * The channel has been logically deleted and should no longer
		 * be used. It has released its reference to its current trace
		 * chunk and should not acquire a new one.
		 *
		 * Return success as there is nothing for the caller to do.
		 */
		goto end;
	}

	/*
	 * The acquisition of the reference cannot fail (barring
	 * a severe internal error) since a reference to the published
	 * chunk is already held by the caller.
	 */
	if (new_trace_chunk) {
		const bool acquired_reference = lttng_trace_chunk_get(
				new_trace_chunk);

		assert(acquired_reference);
	}

	/* Putting a NULL chunk is a no-op. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = new_trace_chunk;
end:
	pthread_mutex_unlock(&channel->lock);
	return 0;
}
1057
/*
 * Allocate and return a new lttng_consumer_channel object using the given key
 * to initialize the hash table node.
 *
 * If chunk_id is non-NULL, a reference to the matching published trace chunk
 * is looked up in the global chunk registry and attached to the channel.
 * The pathname, name, root_shm_path and shm_path strings are copied (and
 * truncated to the channel's fixed-size buffers if needed).
 *
 * On error, return NULL.
 */
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
		uint64_t session_id,
		const uint64_t *chunk_id,
		const char *pathname,
		const char *name,
		uint64_t relayd_id,
		enum lttng_event_output output,
		uint64_t tracefile_size,
		uint64_t tracefile_count,
		uint64_t session_id_per_pid,
		unsigned int monitor,
		unsigned int live_timer_interval,
		const char *root_shm_path,
		const char *shm_path)
{
	struct lttng_consumer_channel *channel = NULL;
	struct lttng_trace_chunk *trace_chunk = NULL;

	/* Resolve the published chunk first so allocation can fail early. */
	if (chunk_id) {
		trace_chunk = lttng_trace_chunk_registry_find_chunk(
				consumer_data.chunk_registry, session_id,
				*chunk_id);
		if (!trace_chunk) {
			ERR("Failed to find trace chunk reference during creation of channel");
			goto end;
		}
	}

	channel = zmalloc(sizeof(*channel));
	if (channel == NULL) {
		PERROR("malloc struct lttng_consumer_channel");
		goto end;
	}

	channel->key = key;
	channel->refcount = 0;
	channel->session_id = session_id;
	channel->session_id_per_pid = session_id_per_pid;
	channel->relayd_id = relayd_id;
	channel->tracefile_size = tracefile_size;
	channel->tracefile_count = tracefile_count;
	channel->monitor = monitor;
	channel->live_timer_interval = live_timer_interval;
	pthread_mutex_init(&channel->lock, NULL);
	pthread_mutex_init(&channel->timer_lock, NULL);

	/* Map the sessiond-facing output type to the consumer's enum. */
	switch (output) {
	case LTTNG_EVENT_SPLICE:
		channel->output = CONSUMER_CHANNEL_SPLICE;
		break;
	case LTTNG_EVENT_MMAP:
		channel->output = CONSUMER_CHANNEL_MMAP;
		break;
	default:
		assert(0);
		free(channel);
		channel = NULL;
		goto end;
	}

	/*
	 * In monitor mode, the streams associated with the channel will be put in
	 * a special list ONLY owned by this channel. So, the refcount is set to 1
	 * here meaning that the channel itself has streams that are referenced.
	 *
	 * On a channel deletion, once the channel is no longer visible, the
	 * refcount is decremented and checked for a zero value to delete it. With
	 * streams in no monitor mode, it will now be safe to destroy the channel.
	 */
	if (!channel->monitor) {
		channel->refcount = 1;
	}

	/* Copies below are explicitly NUL-terminated after strncpy. */
	strncpy(channel->pathname, pathname, sizeof(channel->pathname));
	channel->pathname[sizeof(channel->pathname) - 1] = '\0';

	strncpy(channel->name, name, sizeof(channel->name));
	channel->name[sizeof(channel->name) - 1] = '\0';

	if (root_shm_path) {
		strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
		channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
	}
	if (shm_path) {
		strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
		channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
	}

	lttng_ht_node_init_u64(&channel->node, channel->key);
	lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
			channel->session_id);

	channel->wait_fd = -1;
	CDS_INIT_LIST_HEAD(&channel->streams.head);

	if (trace_chunk) {
		int ret = lttng_consumer_channel_set_trace_chunk(channel,
				trace_chunk);
		if (ret) {
			goto error;
		}
	}

	DBG("Allocated channel (key %" PRIu64 ")", channel->key);

end:
	/* Release the registry lookup's reference; the channel holds its own. */
	lttng_trace_chunk_put(trace_chunk);
	return channel;
error:
	consumer_del_channel(channel);
	channel = NULL;
	goto end;
}
1177
/*
 * Add a channel to the global list protected by a mutex.
 *
 * Always return 0 indicating success.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	/* Lock ordering: global consumer lock, then channel, then timer. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);

	/*
	 * This gives us a guarantee that the channel we are about to add to the
	 * channel hash table will be unique. See the steal_channel_key() comment
	 * on why we need to steal the channel key at this stage.
	 */
	steal_channel_key(channel->key);

	rcu_read_lock();
	lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
	lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
			&channel->channels_by_session_id_ht_node);
	rcu_read_unlock();
	channel->is_published = true;

	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	/* Let the data poll thread pick up the new channel's wait fd. */
	if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}

	return 0;
}
1214
/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and concurrency issues when writing is
 * needed. Called with consumer_data.lock held.
 *
 * On return, *pollfd holds one entry per active stream, followed by two
 * extra (uncounted) entries for the data pipe and the wakeup pipe.
 *
 * Returns the number of fds in the structures.
 */
static int update_poll_array(struct lttng_consumer_local_data *ctx,
		struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
		struct lttng_ht *ht, int *nb_inactive_fd)
{
	int i = 0;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	assert(ctx);
	assert(ht);
	assert(pollfd);
	assert(local_stream);

	DBG("Updating poll fd array");
	*nb_inactive_fd = 0;
	rcu_read_lock();
	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
		/*
		 * Only active streams with an active end point can be added to the
		 * poll set and local stream storage of the thread.
		 *
		 * There is a potential race here for endpoint_status to be updated
		 * just after the check. However, this is OK since the stream(s) will
		 * be deleted once the thread is notified that the end point state has
		 * changed where this function will be called back again.
		 *
		 * We track the number of inactive FDs because they still need to be
		 * closed by the polling thread after a wakeup on the data_pipe or
		 * metadata_pipe.
		 */
		if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
			(*nb_inactive_fd)++;
			continue;
		}
		/* Per-stream DBG output deliberately omitted: it floods the logs. */
		(*pollfd)[i].fd = stream->wait_fd;
		(*pollfd)[i].events = POLLIN | POLLPRI;
		local_stream[i] = stream;
		i++;
	}
	rcu_read_unlock();

	/*
	 * Insert the consumer_data_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD.
	 */
	(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
	(*pollfd)[i].events = POLLIN | POLLPRI;

	/* The wakeup pipe occupies the second uncounted trailing slot. */
	(*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
	(*pollfd)[i + 1].events = POLLIN | POLLPRI;
	return i;
}
1278
/*
 * Wait on the should_quit pipe and the command socket.
 *
 * Returns -1 on error, 1 if the consumer should exit, 0 if data is
 * available on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	for (;;) {
		const int num_rdy = poll(consumer_sockpoll, 2, -1);

		if (num_rdy >= 0) {
			break;
		}
		/* Restart the syscall if it was merely interrupted. */
		if (errno == EINTR) {
			continue;
		}
		PERROR("Poll error");
		return -1;
	}

	/* Entry 0 is the should_quit pipe; it takes precedence. */
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return 1;
	}
	return 0;
}
1305
/*
 * Set the error socket.
 *
 * Stores the fd used by lttng_consumer_send_error() to report errors back
 * to the session daemon; no ownership is taken.
 */
void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
		int sock)
{
	ctx->consumer_error_socket = sock;
}
1314
/*
 * Set the command socket path.
 *
 * Only the pointer is stored (no copy); the caller must keep the string
 * alive until lttng_consumer_destroy(), which unlink()s this path.
 */
void lttng_consumer_set_command_sock_path(
		struct lttng_consumer_local_data *ctx, char *sock)
{
	ctx->consumer_command_sock_path = sock;
}
1323
/*
 * Send return code to the session daemon.
 * If the socket is not defined, we return 0, it is not a fatal error
 */
int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
	if (ctx->consumer_error_socket > 0) {
		/*
		 * NOTE(review): sends sizeof(enum lttcomm_sessiond_command)
		 * bytes taken from an int; this assumes the enum is int-sized
		 * on both the consumer and sessiond sides — confirm.
		 */
		return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}
1337
/*
 * Close all the tracefiles and stream fds and MUST be called when all
 * instances are destroyed i.e. when all threads were joined and are ended.
 */
void lttng_consumer_cleanup(void)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	unsigned int trace_chunks_left;

	rcu_read_lock();

	/* Tear down every channel still published in the global hash table. */
	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
			node.node) {
		consumer_del_channel(channel);
	}

	rcu_read_unlock();

	lttng_ht_destroy(consumer_data.channel_ht);
	lttng_ht_destroy(consumer_data.channels_by_session_id_ht);

	cleanup_relayd_ht();

	lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);

	/*
	 * This HT contains streams that are freed by either the metadata thread or
	 * the data thread so we do *nothing* on the hash table and simply destroy
	 * it.
	 */
	lttng_ht_destroy(consumer_data.stream_list_ht);

	/*
	 * Trace chunks in the registry may still exist if the session
	 * daemon has encountered an internal error and could not
	 * tear down its sessions and/or trace chunks properly.
	 *
	 * Release the session daemon's implicit reference to any remaining
	 * trace chunk and print an error if any trace chunk was found. Note
	 * that there are _no_ legitimate cases for trace chunks to be left,
	 * it is a leak. However, it can happen following a crash of the
	 * session daemon and not emptying the registry would cause an assertion
	 * to hit.
	 */
	trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
			consumer_data.chunk_registry);
	if (trace_chunks_left) {
		ERR("%u trace chunks are leaked by lttng-consumerd. "
				"This can be caused by an internal error of the session daemon.",
				trace_chunks_left);
	}
	/* Run all callbacks freeing each chunk. */
	rcu_barrier();
	lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
}
1394
/*
 * Called from signal handler.
 *
 * Flags the consumer to quit and wakes up the polling thread through the
 * should_quit pipe.
 */
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
	ssize_t ret;

	/* Make the quit flag visible to all threads reading it. */
	CMM_STORE_SHARED(consumer_quit, 1);
	/* Wake up the poll loop; the byte's value ("4") is irrelevant. */
	ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
	if (ret < 1) {
		/* NOTE(review): PERROR may not be async-signal-safe — confirm. */
		PERROR("write consumer quit");
	}

	DBG("Consumer flag that it should quit");
}
1410
1411
1412 /*
1413 * Flush pending writes to trace output disk file.
1414 */
1415 static
1416 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1417 off_t orig_offset)
1418 {
1419 int ret;
1420 int outfd = stream->out_fd;
1421
1422 /*
1423 * This does a blocking write-and-wait on any page that belongs to the
1424 * subbuffer prior to the one we just wrote.
1425 * Don't care about error values, as these are just hints and ways to
1426 * limit the amount of page cache used.
1427 */
1428 if (orig_offset < stream->max_sb_size) {
1429 return;
1430 }
1431 lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
1432 stream->max_sb_size,
1433 SYNC_FILE_RANGE_WAIT_BEFORE
1434 | SYNC_FILE_RANGE_WRITE
1435 | SYNC_FILE_RANGE_WAIT_AFTER);
1436 /*
1437 * Give hints to the kernel about how we access the file:
1438 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1439 * we write it.
1440 *
1441 * We need to call fadvise again after the file grows because the
1442 * kernel does not seem to apply fadvise to non-existing parts of the
1443 * file.
1444 *
1445 * Call fadvise _after_ having waited for the page writeback to
1446 * complete because the dirty page writeback semantic is not well
1447 * defined. So it can be expected to lead to lower throughput in
1448 * streaming.
1449 */
1450 ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
1451 stream->max_sb_size, POSIX_FADV_DONTNEED);
1452 if (ret && ret != -ENOSYS) {
1453 errno = ret;
1454 PERROR("posix_fadvise on fd %i", outfd);
1455 }
1456 }
1457
/*
 * Initialise the necessary environnement :
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(uint64_t stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	/* The global consumer type is set once and must never change. */
	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		PERROR("allocating context");
		goto error;
	}

	/* Sockets are set up lazily later; -1 marks them as not connected. */
	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_wakeup_pipe) {
		goto error_wakeup_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	ctx->channel_monitor_pipe = -1;

	return ctx;

	/* Error unwind: release resources in reverse order of acquisition. */
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
error_wakeup_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}
1546
1547 /*
1548 * Iterate over all streams of the hashtable and free them properly.
1549 */
1550 static void destroy_data_stream_ht(struct lttng_ht *ht)
1551 {
1552 struct lttng_ht_iter iter;
1553 struct lttng_consumer_stream *stream;
1554
1555 if (ht == NULL) {
1556 return;
1557 }
1558
1559 rcu_read_lock();
1560 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1561 /*
1562 * Ignore return value since we are currently cleaning up so any error
1563 * can't be handled.
1564 */
1565 (void) consumer_del_stream(stream, ht);
1566 }
1567 rcu_read_unlock();
1568
1569 lttng_ht_destroy(ht);
1570 }
1571
1572 /*
1573 * Iterate over all streams of the metadata hashtable and free them
1574 * properly.
1575 */
1576 static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1577 {
1578 struct lttng_ht_iter iter;
1579 struct lttng_consumer_stream *stream;
1580
1581 if (ht == NULL) {
1582 return;
1583 }
1584
1585 rcu_read_lock();
1586 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1587 /*
1588 * Ignore return value since we are currently cleaning up so any error
1589 * can't be handled.
1590 */
1591 (void) consumer_del_metadata_stream(stream, ht);
1592 }
1593 rcu_read_unlock();
1594
1595 lttng_ht_destroy(ht);
1596 }
1597
1598 /*
1599 * Close all fds associated with the instance and free the context.
1600 */
1601 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1602 {
1603 int ret;
1604
1605 DBG("Consumer destroying it. Closing everything.");
1606
1607 if (!ctx) {
1608 return;
1609 }
1610
1611 destroy_data_stream_ht(data_ht);
1612 destroy_metadata_stream_ht(metadata_ht);
1613
1614 ret = close(ctx->consumer_error_socket);
1615 if (ret) {
1616 PERROR("close");
1617 }
1618 ret = close(ctx->consumer_metadata_socket);
1619 if (ret) {
1620 PERROR("close");
1621 }
1622 utils_close_pipe(ctx->consumer_channel_pipe);
1623 lttng_pipe_destroy(ctx->consumer_data_pipe);
1624 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1625 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1626 utils_close_pipe(ctx->consumer_should_quit);
1627
1628 unlink(ctx->consumer_command_sock_path);
1629 free(ctx);
1630 }
1631
1632 /*
1633 * Write the metadata stream id on the specified file descriptor.
1634 */
1635 static int write_relayd_metadata_id(int fd,
1636 struct lttng_consumer_stream *stream,
1637 unsigned long padding)
1638 {
1639 ssize_t ret;
1640 struct lttcomm_relayd_metadata_payload hdr;
1641
1642 hdr.stream_id = htobe64(stream->relayd_stream_id);
1643 hdr.padding_size = htobe32(padding);
1644 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1645 if (ret < sizeof(hdr)) {
1646 /*
1647 * This error means that the fd's end is closed so ignore the PERROR
1648 * not to clubber the error output since this can happen in a normal
1649 * code path.
1650 */
1651 if (errno != EPIPE) {
1652 PERROR("write metadata stream id");
1653 }
1654 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
1655 /*
1656 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1657 * handle writting the missing part so report that as an error and
1658 * don't lie to the caller.
1659 */
1660 ret = -1;
1661 goto end;
1662 }
1663 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1664 stream->relayd_stream_id, padding);
1665
1666 end:
1667 return (int) ret;
1668 }
1669
/*
 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
 * core function for writing trace buffers to either the local filesystem or
 * the network.
 *
 * It must be called with the stream and the channel lock held.
 *
 * Careful review MUST be put if any changes occur!
 *
 * Returns the number of bytes written, or a negative value on error.
 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len,
		unsigned long padding,
		struct ctf_packet_index *index)
{
	unsigned long mmap_offset;
	void *mmap_base;
	ssize_t ret = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	unsigned int relayd_hang_up = 0;

	/* RCU lock for the relayd pointer */
	rcu_read_lock();
	/* A stream is either network-streamed or tied to a local trace chunk. */
	assert(stream->net_seq_idx != (uint64_t) -1ULL ||
			stream->trace_chunk);

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			ret = -EPIPE;
			goto end;
		}
	}

	/* get the offset inside the fd to mmap */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		mmap_base = stream->mmap_base;
		ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
		if (ret < 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			goto end;
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		mmap_base = lttng_ustctl_get_mmap_base(stream);
		if (!mmap_base) {
			ERR("read mmap get mmap base for stream %s", stream->name);
			ret = -EPERM;
			goto end;
		}
		ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
		if (ret != 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			ret = -EINVAL;
			goto end;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = len;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->reset_metadata_flag) {
				ret = relayd_reset_metadata(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->metadata_version);
				if (ret < 0) {
					relayd_hang_up = 1;
					goto write_error;
				}
				stream->reset_metadata_flag = 0;
			}
			/* The metadata payload header travels with the data. */
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret < 0) {
			relayd_hang_up = 1;
			goto write_error;
		}
		/* Use the returned socket. */
		outfd = ret;

		/* Write metadata stream id before payload */
		if (stream->metadata_flag) {
			ret = write_relayd_metadata_id(outfd, stream, padding);
			if (ret < 0) {
				relayd_hang_up = 1;
				goto write_error;
			}
		}
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		if (stream->metadata_flag && stream->reset_metadata_flag) {
			ret = utils_truncate_stream_file(stream->out_fd, 0);
			if (ret < 0) {
				ERR("Reset metadata file");
				goto end;
			}
			stream->reset_metadata_flag = 0;
		}

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
				(stream->tracefile_size_current + len) >
				stream->chan->tracefile_size) {
			ret = consumer_stream_rotate_output_files(stream);
			if (ret) {
				goto end;
			}
			outfd = stream->out_fd;
			orig_offset = 0;
		}
		stream->tracefile_size_current += len;
		if (index) {
			index->offset = htobe64(stream->out_fd_offset);
		}
	}

	/*
	 * This call guarantee that len or less is returned. It's impossible to
	 * receive a ret value that is bigger than len.
	 */
	ret = lttng_write(outfd, mmap_base + mmap_offset, len);
	DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
	if (ret < 0 || ((size_t) ret != len)) {
		/*
		 * Report error to caller if nothing was written else at least send the
		 * amount written.
		 */
		if (ret < 0) {
			ret = -errno;
		}
		relayd_hang_up = 1;

		/* Socket operation failed. We consider the relayd dead */
		if (errno == EPIPE) {
			/*
			 * This is possible if the fd is closed on the other side
			 * (outfd) or any write problem. It can be verbose a bit for a
			 * normal execution if for instance the relayd is stopped
			 * abruptly. This can happen so set this to a DBG statement.
			 */
			DBG("Consumer mmap write detected relayd hang up");
		} else {
			/* Unhandled error, print it and stop function right now. */
			PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
		}
		goto write_error;
	}
	stream->output_written += ret;

	/* This call is useless on a socket so better save a syscall. */
	if (!relayd) {
		/* This won't block, but will start writeout asynchronously */
		lttng_sync_file_range(outfd, stream->out_fd_offset, len,
				SYNC_FILE_RANGE_WRITE);
		stream->out_fd_offset += len;
		lttng_consumer_sync_trace_file(stream, orig_offset);
	}

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return ret;
}
1873
1874 /*
1875 * Splice the data from the ring buffer to the tracefile.
1876 *
1877 * It must be called with the stream lock held.
1878 *
1879 * Returns the number of bytes spliced.
1880 */
1881 ssize_t lttng_consumer_on_read_subbuffer_splice(
1882 struct lttng_consumer_local_data *ctx,
1883 struct lttng_consumer_stream *stream, unsigned long len,
1884 unsigned long padding,
1885 struct ctf_packet_index *index)
1886 {
1887 ssize_t ret = 0, written = 0, ret_splice = 0;
1888 loff_t offset = 0;
1889 off_t orig_offset = stream->out_fd_offset;
1890 int fd = stream->wait_fd;
1891 /* Default is on the disk */
1892 int outfd = stream->out_fd;
1893 struct consumer_relayd_sock_pair *relayd = NULL;
1894 int *splice_pipe;
1895 unsigned int relayd_hang_up = 0;
1896
1897 switch (consumer_data.type) {
1898 case LTTNG_CONSUMER_KERNEL:
1899 break;
1900 case LTTNG_CONSUMER32_UST:
1901 case LTTNG_CONSUMER64_UST:
1902 /* Not supported for user space tracing */
1903 return -ENOSYS;
1904 default:
1905 ERR("Unknown consumer_data type");
1906 assert(0);
1907 }
1908
1909 /* RCU lock for the relayd pointer */
1910 rcu_read_lock();
1911
1912 /* Flag that the current stream if set for network streaming. */
1913 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1914 relayd = consumer_find_relayd(stream->net_seq_idx);
1915 if (relayd == NULL) {
1916 written = -ret;
1917 goto end;
1918 }
1919 }
1920 splice_pipe = stream->splice_pipe;
1921
1922 /* Write metadata stream id before payload */
1923 if (relayd) {
1924 unsigned long total_len = len;
1925
1926 if (stream->metadata_flag) {
1927 /*
1928 * Lock the control socket for the complete duration of the function
1929 * since from this point on we will use the socket.
1930 */
1931 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1932
1933 if (stream->reset_metadata_flag) {
1934 ret = relayd_reset_metadata(&relayd->control_sock,
1935 stream->relayd_stream_id,
1936 stream->metadata_version);
1937 if (ret < 0) {
1938 relayd_hang_up = 1;
1939 goto write_error;
1940 }
1941 stream->reset_metadata_flag = 0;
1942 }
1943 ret = write_relayd_metadata_id(splice_pipe[1], stream,
1944 padding);
1945 if (ret < 0) {
1946 written = ret;
1947 relayd_hang_up = 1;
1948 goto write_error;
1949 }
1950
1951 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1952 }
1953
1954 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1955 if (ret < 0) {
1956 written = ret;
1957 relayd_hang_up = 1;
1958 goto write_error;
1959 }
1960 /* Use the returned socket. */
1961 outfd = ret;
1962 } else {
1963 /* No streaming, we have to set the len with the full padding */
1964 len += padding;
1965
1966 if (stream->metadata_flag && stream->reset_metadata_flag) {
1967 ret = utils_truncate_stream_file(stream->out_fd, 0);
1968 if (ret < 0) {
1969 ERR("Reset metadata file");
1970 goto end;
1971 }
1972 stream->reset_metadata_flag = 0;
1973 }
1974 /*
1975 * Check if we need to change the tracefile before writing the packet.
1976 */
1977 if (stream->chan->tracefile_size > 0 &&
1978 (stream->tracefile_size_current + len) >
1979 stream->chan->tracefile_size) {
1980 ret = consumer_stream_rotate_output_files(stream);
1981 if (ret < 0) {
1982 written = ret;
1983 goto end;
1984 }
1985 outfd = stream->out_fd;
1986 orig_offset = 0;
1987 }
1988 stream->tracefile_size_current += len;
1989 index->offset = htobe64(stream->out_fd_offset);
1990 }
1991
1992 while (len > 0) {
1993 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1994 (unsigned long)offset, len, fd, splice_pipe[1]);
1995 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
1996 SPLICE_F_MOVE | SPLICE_F_MORE);
1997 DBG("splice chan to pipe, ret %zd", ret_splice);
1998 if (ret_splice < 0) {
1999 ret = errno;
2000 written = -ret;
2001 PERROR("Error in relay splice");
2002 goto splice_error;
2003 }
2004
2005 /* Handle stream on the relayd if the output is on the network */
2006 if (relayd && stream->metadata_flag) {
2007 size_t metadata_payload_size =
2008 sizeof(struct lttcomm_relayd_metadata_payload);
2009
2010 /* Update counter to fit the spliced data */
2011 ret_splice += metadata_payload_size;
2012 len += metadata_payload_size;
2013 /*
2014 * We do this so the return value can match the len passed as
2015 * argument to this function.
2016 */
2017 written -= metadata_payload_size;
2018 }
2019
2020 /* Splice data out */
2021 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
2022 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
2023 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
2024 outfd, ret_splice);
2025 if (ret_splice < 0) {
2026 ret = errno;
2027 written = -ret;
2028 relayd_hang_up = 1;
2029 goto write_error;
2030 } else if (ret_splice > len) {
2031 /*
2032 * We don't expect this code path to be executed but you never know
2033 * so this is an extra protection agains a buggy splice().
2034 */
2035 ret = errno;
2036 written += ret_splice;
2037 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
2038 len);
2039 goto splice_error;
2040 } else {
2041 /* All good, update current len and continue. */
2042 len -= ret_splice;
2043 }
2044
2045 /* This call is useless on a socket so better save a syscall. */
2046 if (!relayd) {
2047 /* This won't block, but will start writeout asynchronously */
2048 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
2049 SYNC_FILE_RANGE_WRITE);
2050 stream->out_fd_offset += ret_splice;
2051 }
2052 stream->output_written += ret_splice;
2053 written += ret_splice;
2054 }
2055 if (!relayd) {
2056 lttng_consumer_sync_trace_file(stream, orig_offset);
2057 }
2058 goto end;
2059
2060 write_error:
2061 /*
2062 * This is a special case that the relayd has closed its socket. Let's
2063 * cleanup the relayd object and all associated streams.
2064 */
2065 if (relayd && relayd_hang_up) {
2066 ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
2067 lttng_consumer_cleanup_relayd(relayd);
2068 /* Skip splice error so the consumer does not fail */
2069 goto end;
2070 }
2071
2072 splice_error:
2073 /* send the appropriate error description to sessiond */
2074 switch (ret) {
2075 case EINVAL:
2076 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
2077 break;
2078 case ENOMEM:
2079 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
2080 break;
2081 case ESPIPE:
2082 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
2083 break;
2084 }
2085
2086 end:
2087 if (relayd && stream->metadata_flag) {
2088 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
2089 }
2090
2091 rcu_read_unlock();
2092 return written;
2093 }
2094
2095 /*
2096 * Sample the snapshot positions for a specific fd
2097 *
2098 * Returns 0 on success, < 0 on error
2099 */
2100 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
2101 {
2102 switch (consumer_data.type) {
2103 case LTTNG_CONSUMER_KERNEL:
2104 return lttng_kconsumer_sample_snapshot_positions(stream);
2105 case LTTNG_CONSUMER32_UST:
2106 case LTTNG_CONSUMER64_UST:
2107 return lttng_ustconsumer_sample_snapshot_positions(stream);
2108 default:
2109 ERR("Unknown consumer_data type");
2110 assert(0);
2111 return -ENOSYS;
2112 }
2113 }
2114 /*
2115 * Take a snapshot for a specific fd
2116 *
2117 * Returns 0 on success, < 0 on error
2118 */
2119 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
2120 {
2121 switch (consumer_data.type) {
2122 case LTTNG_CONSUMER_KERNEL:
2123 return lttng_kconsumer_take_snapshot(stream);
2124 case LTTNG_CONSUMER32_UST:
2125 case LTTNG_CONSUMER64_UST:
2126 return lttng_ustconsumer_take_snapshot(stream);
2127 default:
2128 ERR("Unknown consumer_data type");
2129 assert(0);
2130 return -ENOSYS;
2131 }
2132 }
2133
2134 /*
2135 * Get the produced position
2136 *
2137 * Returns 0 on success, < 0 on error
2138 */
2139 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
2140 unsigned long *pos)
2141 {
2142 switch (consumer_data.type) {
2143 case LTTNG_CONSUMER_KERNEL:
2144 return lttng_kconsumer_get_produced_snapshot(stream, pos);
2145 case LTTNG_CONSUMER32_UST:
2146 case LTTNG_CONSUMER64_UST:
2147 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
2148 default:
2149 ERR("Unknown consumer_data type");
2150 assert(0);
2151 return -ENOSYS;
2152 }
2153 }
2154
2155 /*
2156 * Get the consumed position (free-running counter position in bytes).
2157 *
2158 * Returns 0 on success, < 0 on error
2159 */
2160 int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
2161 unsigned long *pos)
2162 {
2163 switch (consumer_data.type) {
2164 case LTTNG_CONSUMER_KERNEL:
2165 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2166 case LTTNG_CONSUMER32_UST:
2167 case LTTNG_CONSUMER64_UST:
2168 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2169 default:
2170 ERR("Unknown consumer_data type");
2171 assert(0);
2172 return -ENOSYS;
2173 }
2174 }
2175
2176 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
2177 int sock, struct pollfd *consumer_sockpoll)
2178 {
2179 switch (consumer_data.type) {
2180 case LTTNG_CONSUMER_KERNEL:
2181 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2182 case LTTNG_CONSUMER32_UST:
2183 case LTTNG_CONSUMER64_UST:
2184 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2185 default:
2186 ERR("Unknown consumer_data type");
2187 assert(0);
2188 return -ENOSYS;
2189 }
2190 }
2191
2192 static
2193 void lttng_consumer_close_all_metadata(void)
2194 {
2195 switch (consumer_data.type) {
2196 case LTTNG_CONSUMER_KERNEL:
2197 /*
2198 * The Kernel consumer has a different metadata scheme so we don't
2199 * close anything because the stream will be closed by the session
2200 * daemon.
2201 */
2202 break;
2203 case LTTNG_CONSUMER32_UST:
2204 case LTTNG_CONSUMER64_UST:
2205 /*
2206 * Close all metadata streams. The metadata hash table is passed and
2207 * this call iterates over it by closing all wakeup fd. This is safe
2208 * because at this point we are sure that the metadata producer is
2209 * either dead or blocked.
2210 */
2211 lttng_ustconsumer_close_all_metadata(metadata_ht);
2212 break;
2213 default:
2214 ERR("Unknown consumer_data type");
2215 assert(0);
2216 }
2217 }
2218
/*
 * Clean up a metadata stream and free its memory.
 *
 * Removes the stream from the given hash table, closes it (including any
 * relayd side), destroys its tracer buffers, drops the channel reference
 * it held (possibly deleting the channel), and finally frees the stream.
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	struct lttng_consumer_channel *channel = NULL;
	/* Set when this call drops the channel's last reference. */
	bool free_channel = false;

	assert(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	assert(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	/*
	 * Locks are taken from coarsest to finest: consumer_data, then
	 * channel, then stream, then metadata cache.
	 */
	pthread_mutex_lock(&consumer_data.lock);
	/*
	 * Note that this assumes that a stream's channel is never changed and
	 * that the stream's lock doesn't need to be taken to sample its
	 * channel.
	 */
	channel = stream->chan;
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&stream->lock);
	if (channel->metadata_cache) {
		/* Only applicable to userspace consumers. */
		pthread_mutex_lock(&channel->metadata_cache->lock);
	}

	/* Remove any reference to that stream. */
	consumer_stream_delete(stream, ht);

	/* Close down everything including the relayd if one. */
	consumer_stream_close(stream);
	/* Destroy tracer buffers of the stream. */
	consumer_stream_destroy_buffers(stream);

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&channel->refcount, 1)
			&& !uatomic_read(&channel->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_channel = true;
	}
	stream->chan = NULL;

	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for a NULL
	 * pointer value.
	 */
	channel->metadata_stream = NULL;

	if (channel->metadata_cache) {
		pthread_mutex_unlock(&channel->metadata_cache->lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	/* Channel deletion is deferred until all locks are released. */
	if (free_channel) {
		consumer_del_channel(channel);
	}

	/*
	 * Drop the stream's trace chunk reference before freeing the stream;
	 * the stream must not be touched after consumer_stream_free().
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	stream->trace_chunk = NULL;
	consumer_stream_free(stream);
}
2289
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Registers the stream in the metadata hash table as well as in the
 * per-channel-id and per-session-id index tables, under the consumer_data,
 * channel, channel timer and stream locks (taken in that order).
 */
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	assert(stream);
	assert(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	rcu_read_lock();

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	assert(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		/*
		 * NOTE(review): no refcount increment is visible in this
		 * function; presumably the barrier orders the stream's prior
		 * initialization before the decrement — confirm intent.
		 */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	lttng_ht_add_unique_u64(ht, &stream->node);

	/* Index the stream by its channel id for per-channel iteration. */
	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	rcu_read_unlock();

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
2357
2358 /*
2359 * Delete data stream that are flagged for deletion (endpoint_status).
2360 */
2361 static void validate_endpoint_status_data_stream(void)
2362 {
2363 struct lttng_ht_iter iter;
2364 struct lttng_consumer_stream *stream;
2365
2366 DBG("Consumer delete flagged data stream");
2367
2368 rcu_read_lock();
2369 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
2370 /* Validate delete flag of the stream */
2371 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2372 continue;
2373 }
2374 /* Delete it right now */
2375 consumer_del_stream(stream, data_ht);
2376 }
2377 rcu_read_unlock();
2378 }
2379
2380 /*
2381 * Delete metadata stream that are flagged for deletion (endpoint_status).
2382 */
2383 static void validate_endpoint_status_metadata_stream(
2384 struct lttng_poll_event *pollset)
2385 {
2386 struct lttng_ht_iter iter;
2387 struct lttng_consumer_stream *stream;
2388
2389 DBG("Consumer delete flagged metadata stream");
2390
2391 assert(pollset);
2392
2393 rcu_read_lock();
2394 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
2395 /* Validate delete flag of the stream */
2396 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2397 continue;
2398 }
2399 /*
2400 * Remove from pollset so the metadata thread can continue without
2401 * blocking on a deleted stream.
2402 */
2403 lttng_poll_del(pollset, stream->wait_fd);
2404
2405 /* Delete it right now */
2406 consumer_del_metadata_stream(stream, metadata_ht);
2407 }
2408 rcu_read_unlock();
2409 }
2410
2411 /*
2412 * Thread polls on metadata file descriptor and write them on disk or on the
2413 * network.
2414 */
2415 void *consumer_thread_metadata_poll(void *data)
2416 {
2417 int ret, i, pollfd, err = -1;
2418 uint32_t revents, nb_fd;
2419 struct lttng_consumer_stream *stream = NULL;
2420 struct lttng_ht_iter iter;
2421 struct lttng_ht_node_u64 *node;
2422 struct lttng_poll_event events;
2423 struct lttng_consumer_local_data *ctx = data;
2424 ssize_t len;
2425
2426 rcu_register_thread();
2427
2428 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2429
2430 if (testpoint(consumerd_thread_metadata)) {
2431 goto error_testpoint;
2432 }
2433
2434 health_code_update();
2435
2436 DBG("Thread metadata poll started");
2437
2438 /* Size is set to 1 for the consumer_metadata pipe */
2439 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2440 if (ret < 0) {
2441 ERR("Poll set creation failed");
2442 goto end_poll;
2443 }
2444
2445 ret = lttng_poll_add(&events,
2446 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
2447 if (ret < 0) {
2448 goto end;
2449 }
2450
2451 /* Main loop */
2452 DBG("Metadata main loop started");
2453
2454 while (1) {
2455 restart:
2456 health_code_update();
2457 health_poll_entry();
2458 DBG("Metadata poll wait");
2459 ret = lttng_poll_wait(&events, -1);
2460 DBG("Metadata poll return from wait with %d fd(s)",
2461 LTTNG_POLL_GETNB(&events));
2462 health_poll_exit();
2463 DBG("Metadata event caught in thread");
2464 if (ret < 0) {
2465 if (errno == EINTR) {
2466 ERR("Poll EINTR caught");
2467 goto restart;
2468 }
2469 if (LTTNG_POLL_GETNB(&events) == 0) {
2470 err = 0; /* All is OK */
2471 }
2472 goto end;
2473 }
2474
2475 nb_fd = ret;
2476
2477 /* From here, the event is a metadata wait fd */
2478 for (i = 0; i < nb_fd; i++) {
2479 health_code_update();
2480
2481 revents = LTTNG_POLL_GETEV(&events, i);
2482 pollfd = LTTNG_POLL_GETFD(&events, i);
2483
2484 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
2485 if (revents & LPOLLIN) {
2486 ssize_t pipe_len;
2487
2488 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
2489 &stream, sizeof(stream));
2490 if (pipe_len < sizeof(stream)) {
2491 if (pipe_len < 0) {
2492 PERROR("read metadata stream");
2493 }
2494 /*
2495 * Remove the pipe from the poll set and continue the loop
2496 * since their might be data to consume.
2497 */
2498 lttng_poll_del(&events,
2499 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2500 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2501 continue;
2502 }
2503
2504 /* A NULL stream means that the state has changed. */
2505 if (stream == NULL) {
2506 /* Check for deleted streams. */
2507 validate_endpoint_status_metadata_stream(&events);
2508 goto restart;
2509 }
2510
2511 DBG("Adding metadata stream %d to poll set",
2512 stream->wait_fd);
2513
2514 /* Add metadata stream to the global poll events list */
2515 lttng_poll_add(&events, stream->wait_fd,
2516 LPOLLIN | LPOLLPRI | LPOLLHUP);
2517 } else if (revents & (LPOLLERR | LPOLLHUP)) {
2518 DBG("Metadata thread pipe hung up");
2519 /*
2520 * Remove the pipe from the poll set and continue the loop
2521 * since their might be data to consume.
2522 */
2523 lttng_poll_del(&events,
2524 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2525 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2526 continue;
2527 } else {
2528 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2529 goto end;
2530 }
2531
2532 /* Handle other stream */
2533 continue;
2534 }
2535
2536 rcu_read_lock();
2537 {
2538 uint64_t tmp_id = (uint64_t) pollfd;
2539
2540 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2541 }
2542 node = lttng_ht_iter_get_node_u64(&iter);
2543 assert(node);
2544
2545 stream = caa_container_of(node, struct lttng_consumer_stream,
2546 node);
2547
2548 if (revents & (LPOLLIN | LPOLLPRI)) {
2549 /* Get the data out of the metadata file descriptor */
2550 DBG("Metadata available on fd %d", pollfd);
2551 assert(stream->wait_fd == pollfd);
2552
2553 do {
2554 health_code_update();
2555
2556 len = ctx->on_buffer_ready(stream, ctx);
2557 /*
2558 * We don't check the return value here since if we get
2559 * a negative len, it means an error occurred thus we
2560 * simply remove it from the poll set and free the
2561 * stream.
2562 */
2563 } while (len > 0);
2564
2565 /* It's ok to have an unavailable sub-buffer */
2566 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2567 /* Clean up stream from consumer and free it. */
2568 lttng_poll_del(&events, stream->wait_fd);
2569 consumer_del_metadata_stream(stream, metadata_ht);
2570 }
2571 } else if (revents & (LPOLLERR | LPOLLHUP)) {
2572 DBG("Metadata fd %d is hup|err.", pollfd);
2573 if (!stream->hangup_flush_done
2574 && (consumer_data.type == LTTNG_CONSUMER32_UST
2575 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2576 DBG("Attempting to flush and consume the UST buffers");
2577 lttng_ustconsumer_on_stream_hangup(stream);
2578
2579 /* We just flushed the stream now read it. */
2580 do {
2581 health_code_update();
2582
2583 len = ctx->on_buffer_ready(stream, ctx);
2584 /*
2585 * We don't check the return value here since if we get
2586 * a negative len, it means an error occurred thus we
2587 * simply remove it from the poll set and free the
2588 * stream.
2589 */
2590 } while (len > 0);
2591 }
2592
2593 lttng_poll_del(&events, stream->wait_fd);
2594 /*
2595 * This call update the channel states, closes file descriptors
2596 * and securely free the stream.
2597 */
2598 consumer_del_metadata_stream(stream, metadata_ht);
2599 } else {
2600 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2601 rcu_read_unlock();
2602 goto end;
2603 }
2604 /* Release RCU lock for the stream looked up */
2605 rcu_read_unlock();
2606 }
2607 }
2608
2609 /* All is OK */
2610 err = 0;
2611 end:
2612 DBG("Metadata poll thread exiting");
2613
2614 lttng_poll_clean(&events);
2615 end_poll:
2616 error_testpoint:
2617 if (err) {
2618 health_error();
2619 ERR("Health error occurred in %s", __func__);
2620 }
2621 health_unregister(health_consumerd);
2622 rcu_unregister_thread();
2623 return NULL;
2624 }
2625
2626 /*
2627 * This thread polls the fds in the set to consume the data and write
2628 * it to tracefile if necessary.
2629 */
2630 void *consumer_thread_data_poll(void *data)
2631 {
2632 int num_rdy, num_hup, high_prio, ret, i, err = -1;
2633 struct pollfd *pollfd = NULL;
2634 /* local view of the streams */
2635 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
2636 /* local view of consumer_data.fds_count */
2637 int nb_fd = 0;
2638 /* 2 for the consumer_data_pipe and wake up pipe */
2639 const int nb_pipes_fd = 2;
2640 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2641 int nb_inactive_fd = 0;
2642 struct lttng_consumer_local_data *ctx = data;
2643 ssize_t len;
2644
2645 rcu_register_thread();
2646
2647 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2648
2649 if (testpoint(consumerd_thread_data)) {
2650 goto error_testpoint;
2651 }
2652
2653 health_code_update();
2654
2655 local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
2656 if (local_stream == NULL) {
2657 PERROR("local_stream malloc");
2658 goto end;
2659 }
2660
2661 while (1) {
2662 health_code_update();
2663
2664 high_prio = 0;
2665 num_hup = 0;
2666
2667 /*
2668 * the fds set has been updated, we need to update our
2669 * local array as well
2670 */
2671 pthread_mutex_lock(&consumer_data.lock);
2672 if (consumer_data.need_update) {
2673 free(pollfd);
2674 pollfd = NULL;
2675
2676 free(local_stream);
2677 local_stream = NULL;
2678
2679 /* Allocate for all fds */
2680 pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
2681 if (pollfd == NULL) {
2682 PERROR("pollfd malloc");
2683 pthread_mutex_unlock(&consumer_data.lock);
2684 goto end;
2685 }
2686
2687 local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
2688 sizeof(struct lttng_consumer_stream *));
2689 if (local_stream == NULL) {
2690 PERROR("local_stream malloc");
2691 pthread_mutex_unlock(&consumer_data.lock);
2692 goto end;
2693 }
2694 ret = update_poll_array(ctx, &pollfd, local_stream,
2695 data_ht, &nb_inactive_fd);
2696 if (ret < 0) {
2697 ERR("Error in allocating pollfd or local_outfds");
2698 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2699 pthread_mutex_unlock(&consumer_data.lock);
2700 goto end;
2701 }
2702 nb_fd = ret;
2703 consumer_data.need_update = 0;
2704 }
2705 pthread_mutex_unlock(&consumer_data.lock);
2706
2707 /* No FDs and consumer_quit, consumer_cleanup the thread */
2708 if (nb_fd == 0 && nb_inactive_fd == 0 &&
2709 CMM_LOAD_SHARED(consumer_quit) == 1) {
2710 err = 0; /* All is OK */
2711 goto end;
2712 }
2713 /* poll on the array of fds */
2714 restart:
2715 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
2716 if (testpoint(consumerd_thread_data_poll)) {
2717 goto end;
2718 }
2719 health_poll_entry();
2720 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
2721 health_poll_exit();
2722 DBG("poll num_rdy : %d", num_rdy);
2723 if (num_rdy == -1) {
2724 /*
2725 * Restart interrupted system call.
2726 */
2727 if (errno == EINTR) {
2728 goto restart;
2729 }
2730 PERROR("Poll error");
2731 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2732 goto end;
2733 } else if (num_rdy == 0) {
2734 DBG("Polling thread timed out");
2735 goto end;
2736 }
2737
2738 if (caa_unlikely(data_consumption_paused)) {
2739 DBG("Data consumption paused, sleeping...");
2740 sleep(1);
2741 goto restart;
2742 }
2743
2744 /*
2745 * If the consumer_data_pipe triggered poll go directly to the
2746 * beginning of the loop to update the array. We want to prioritize
2747 * array update over low-priority reads.
2748 */
2749 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
2750 ssize_t pipe_readlen;
2751
2752 DBG("consumer_data_pipe wake up");
2753 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2754 &new_stream, sizeof(new_stream));
2755 if (pipe_readlen < sizeof(new_stream)) {
2756 PERROR("Consumer data pipe");
2757 /* Continue so we can at least handle the current stream(s). */
2758 continue;
2759 }
2760
2761 /*
2762 * If the stream is NULL, just ignore it. It's also possible that
2763 * the sessiond poll thread changed the consumer_quit state and is
2764 * waking us up to test it.
2765 */
2766 if (new_stream == NULL) {
2767 validate_endpoint_status_data_stream();
2768 continue;
2769 }
2770
2771 /* Continue to update the local streams and handle prio ones */
2772 continue;
2773 }
2774
2775 /* Handle wakeup pipe. */
2776 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2777 char dummy;
2778 ssize_t pipe_readlen;
2779
2780 pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
2781 sizeof(dummy));
2782 if (pipe_readlen < 0) {
2783 PERROR("Consumer data wakeup pipe");
2784 }
2785 /* We've been awakened to handle stream(s). */
2786 ctx->has_wakeup = 0;
2787 }
2788
2789 /* Take care of high priority channels first. */
2790 for (i = 0; i < nb_fd; i++) {
2791 health_code_update();
2792
2793 if (local_stream[i] == NULL) {
2794 continue;
2795 }
2796 if (pollfd[i].revents & POLLPRI) {
2797 DBG("Urgent read on fd %d", pollfd[i].fd);
2798 high_prio = 1;
2799 len = ctx->on_buffer_ready(local_stream[i], ctx);
2800 /* it's ok to have an unavailable sub-buffer */
2801 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2802 /* Clean the stream and free it. */
2803 consumer_del_stream(local_stream[i], data_ht);
2804 local_stream[i] = NULL;
2805 } else if (len > 0) {
2806 local_stream[i]->data_read = 1;
2807 }
2808 }
2809 }
2810
2811 /*
2812 * If we read high prio channel in this loop, try again
2813 * for more high prio data.
2814 */
2815 if (high_prio) {
2816 continue;
2817 }
2818
2819 /* Take care of low priority channels. */
2820 for (i = 0; i < nb_fd; i++) {
2821 health_code_update();
2822
2823 if (local_stream[i] == NULL) {
2824 continue;
2825 }
2826 if ((pollfd[i].revents & POLLIN) ||
2827 local_stream[i]->hangup_flush_done ||
2828 local_stream[i]->has_data) {
2829 DBG("Normal read on fd %d", pollfd[i].fd);
2830 len = ctx->on_buffer_ready(local_stream[i], ctx);
2831 /* it's ok to have an unavailable sub-buffer */
2832 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2833 /* Clean the stream and free it. */
2834 consumer_del_stream(local_stream[i], data_ht);
2835 local_stream[i] = NULL;
2836 } else if (len > 0) {
2837 local_stream[i]->data_read = 1;
2838 }
2839 }
2840 }
2841
2842 /* Handle hangup and errors */
2843 for (i = 0; i < nb_fd; i++) {
2844 health_code_update();
2845
2846 if (local_stream[i] == NULL) {
2847 continue;
2848 }
2849 if (!local_stream[i]->hangup_flush_done
2850 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2851 && (consumer_data.type == LTTNG_CONSUMER32_UST
2852 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2853 DBG("fd %d is hup|err|nval. Attempting flush and read.",
2854 pollfd[i].fd);
2855 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2856 /* Attempt read again, for the data we just flushed. */
2857 local_stream[i]->data_read = 1;
2858 }
2859 /*
2860 * If the poll flag is HUP/ERR/NVAL and we have
2861 * read no data in this pass, we can remove the
2862 * stream from its hash table.
2863 */
2864 if ((pollfd[i].revents & POLLHUP)) {
2865 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2866 if (!local_stream[i]->data_read) {
2867 consumer_del_stream(local_stream[i], data_ht);
2868 local_stream[i] = NULL;
2869 num_hup++;
2870 }
2871 } else if (pollfd[i].revents & POLLERR) {
2872 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2873 if (!local_stream[i]->data_read) {
2874 consumer_del_stream(local_stream[i], data_ht);
2875 local_stream[i] = NULL;
2876 num_hup++;
2877 }
2878 } else if (pollfd[i].revents & POLLNVAL) {
2879 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2880 if (!local_stream[i]->data_read) {
2881 consumer_del_stream(local_stream[i], data_ht);
2882 local_stream[i] = NULL;
2883 num_hup++;
2884 }
2885 }
2886 if (local_stream[i] != NULL) {
2887 local_stream[i]->data_read = 0;
2888 }
2889 }
2890 }
2891 /* All is OK */
2892 err = 0;
2893 end:
2894 DBG("polling thread exiting");
2895 free(pollfd);
2896 free(local_stream);
2897
2898 /*
2899 * Close the write side of the pipe so epoll_wait() in
2900 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2901 * read side of the pipe. If we close them both, epoll_wait strangely does
2902 * not return and could create a endless wait period if the pipe is the
2903 * only tracked fd in the poll set. The thread will take care of closing
2904 * the read side.
2905 */
2906 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
2907
2908 error_testpoint:
2909 if (err) {
2910 health_error();
2911 ERR("Health error occurred in %s", __func__);
2912 }
2913 health_unregister(health_consumerd);
2914
2915 rcu_unregister_thread();
2916 return NULL;
2917 }
2918
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 */
static
void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	/* Streams are indexed by their channel key in this global table. */
	ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key,
			&iter.iter, stream, node_channel_id.node) {
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Skip streams already unlinked by a concurrent deletion. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		switch (consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			/* Nothing to close for the kernel consumer (see header). */
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			if (stream->metadata_flag) {
				/* Safe and protected by the stream lock. */
				lttng_ustconsumer_close_metadata(stream->chan);
			} else {
				/*
				 * Note: a mutex is taken internally within
				 * liblttng-ust-ctl to protect timer wakeup_fd
				 * use from concurrent close.
				 */
				lttng_ustconsumer_close_stream_wakeup(stream);
			}
			break;
		default:
			ERR("Unknown consumer_data type");
			assert(0);
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	rcu_read_unlock();
}
2971
2972 static void destroy_channel_ht(struct lttng_ht *ht)
2973 {
2974 struct lttng_ht_iter iter;
2975 struct lttng_consumer_channel *channel;
2976 int ret;
2977
2978 if (ht == NULL) {
2979 return;
2980 }
2981
2982 rcu_read_lock();
2983 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2984 ret = lttng_ht_del(ht, &iter);
2985 assert(ret != 0);
2986 }
2987 rcu_read_unlock();
2988
2989 lttng_ht_destroy(ht);
2990 }
2991
2992 /*
2993 * This thread polls the channel fds to detect when they are being
2994 * closed. It closes all related streams if the channel is detected as
2995 * closed. It is currently only used as a shim layer for UST because the
2996 * consumerd needs to keep the per-stream wakeup end of pipes open for
2997 * periodical flush.
2998 */
2999 void *consumer_thread_channel_poll(void *data)
3000 {
3001 int ret, i, pollfd, err = -1;
3002 uint32_t revents, nb_fd;
3003 struct lttng_consumer_channel *chan = NULL;
3004 struct lttng_ht_iter iter;
3005 struct lttng_ht_node_u64 *node;
3006 struct lttng_poll_event events;
3007 struct lttng_consumer_local_data *ctx = data;
3008 struct lttng_ht *channel_ht;
3009
3010 rcu_register_thread();
3011
3012 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
3013
3014 if (testpoint(consumerd_thread_channel)) {
3015 goto error_testpoint;
3016 }
3017
3018 health_code_update();
3019
3020 channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3021 if (!channel_ht) {
3022 /* ENOMEM at this point. Better to bail out. */
3023 goto end_ht;
3024 }
3025
3026 DBG("Thread channel poll started");
3027
3028 /* Size is set to 1 for the consumer_channel pipe */
3029 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
3030 if (ret < 0) {
3031 ERR("Poll set creation failed");
3032 goto end_poll;
3033 }
3034
3035 ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
3036 if (ret < 0) {
3037 goto end;
3038 }
3039
3040 /* Main loop */
3041 DBG("Channel main loop started");
3042
3043 while (1) {
3044 restart:
3045 health_code_update();
3046 DBG("Channel poll wait");
3047 health_poll_entry();
3048 ret = lttng_poll_wait(&events, -1);
3049 DBG("Channel poll return from wait with %d fd(s)",
3050 LTTNG_POLL_GETNB(&events));
3051 health_poll_exit();
3052 DBG("Channel event caught in thread");
3053 if (ret < 0) {
3054 if (errno == EINTR) {
3055 ERR("Poll EINTR caught");
3056 goto restart;
3057 }
3058 if (LTTNG_POLL_GETNB(&events) == 0) {
3059 err = 0; /* All is OK */
3060 }
3061 goto end;
3062 }
3063
3064 nb_fd = ret;
3065
3066 /* From here, the event is a channel wait fd */
3067 for (i = 0; i < nb_fd; i++) {
3068 health_code_update();
3069
3070 revents = LTTNG_POLL_GETEV(&events, i);
3071 pollfd = LTTNG_POLL_GETFD(&events, i);
3072
3073 if (pollfd == ctx->consumer_channel_pipe[0]) {
3074 if (revents & LPOLLIN) {
3075 enum consumer_channel_action action;
3076 uint64_t key;
3077
3078 ret = read_channel_pipe(ctx, &chan, &key, &action);
3079 if (ret <= 0) {
3080 if (ret < 0) {
3081 ERR("Error reading channel pipe");
3082 }
3083 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3084 continue;
3085 }
3086
3087 switch (action) {
3088 case CONSUMER_CHANNEL_ADD:
3089 DBG("Adding channel %d to poll set",
3090 chan->wait_fd);
3091
3092 lttng_ht_node_init_u64(&chan->wait_fd_node,
3093 chan->wait_fd);
3094 rcu_read_lock();
3095 lttng_ht_add_unique_u64(channel_ht,
3096 &chan->wait_fd_node);
3097 rcu_read_unlock();
3098 /* Add channel to the global poll events list */
3099 lttng_poll_add(&events, chan->wait_fd,
3100 LPOLLERR | LPOLLHUP);
3101 break;
3102 case CONSUMER_CHANNEL_DEL:
3103 {
3104 /*
3105 * This command should never be called if the channel
3106 * has streams monitored by either the data or metadata
3107 * thread. The consumer only notify this thread with a
3108 * channel del. command if it receives a destroy
3109 * channel command from the session daemon that send it
3110 * if a command prior to the GET_CHANNEL failed.
3111 */
3112
3113 rcu_read_lock();
3114 chan = consumer_find_channel(key);
3115 if (!chan) {
3116 rcu_read_unlock();
3117 ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
3118 break;
3119 }
3120 lttng_poll_del(&events, chan->wait_fd);
3121 iter.iter.node = &chan->wait_fd_node.node;
3122 ret = lttng_ht_del(channel_ht, &iter);
3123 assert(ret == 0);
3124
3125 switch (consumer_data.type) {
3126 case LTTNG_CONSUMER_KERNEL:
3127 break;
3128 case LTTNG_CONSUMER32_UST:
3129 case LTTNG_CONSUMER64_UST:
3130 health_code_update();
3131 /* Destroy streams that might have been left in the stream list. */
3132 clean_channel_stream_list(chan);
3133 break;
3134 default:
3135 ERR("Unknown consumer_data type");
3136 assert(0);
3137 }
3138
3139 /*
3140 * Release our own refcount. Force channel deletion even if
3141 * streams were not initialized.
3142 */
3143 if (!uatomic_sub_return(&chan->refcount, 1)) {
3144 consumer_del_channel(chan);
3145 }
3146 rcu_read_unlock();
3147 goto restart;
3148 }
3149 case CONSUMER_CHANNEL_QUIT:
3150 /*
3151 * Remove the pipe from the poll set and continue the loop
3152 * since their might be data to consume.
3153 */
3154 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3155 continue;
3156 default:
3157 ERR("Unknown action");
3158 break;
3159 }
3160 } else if (revents & (LPOLLERR | LPOLLHUP)) {
3161 DBG("Channel thread pipe hung up");
3162 /*
3163 * Remove the pipe from the poll set and continue the loop
3164 * since their might be data to consume.
3165 */
3166 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
3167 continue;
3168 } else {
3169 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
3170 goto end;
3171 }
3172
3173 /* Handle other stream */
3174 continue;
3175 }
3176
3177 rcu_read_lock();
3178 {
3179 uint64_t tmp_id = (uint64_t) pollfd;
3180
3181 lttng_ht_lookup(channel_ht, &tmp_id, &iter);
3182 }
3183 node = lttng_ht_iter_get_node_u64(&iter);
3184 assert(node);
3185
3186 chan = caa_container_of(node, struct lttng_consumer_channel,
3187 wait_fd_node);
3188
3189 /* Check for error event */
3190 if (revents & (LPOLLERR | LPOLLHUP)) {
3191 DBG("Channel fd %d is hup|err.", pollfd);
3192
3193 lttng_poll_del(&events, chan->wait_fd);
3194 ret = lttng_ht_del(channel_ht, &iter);
3195 assert(ret == 0);
3196
3197 /*
3198 * This will close the wait fd for each stream associated to
3199 * this channel AND monitored by the data/metadata thread thus
3200 * will be clean by the right thread.
3201 */
3202 consumer_close_channel_streams(chan);
3203
3204 /* Release our own refcount */
3205 if (!uatomic_sub_return(&chan->refcount, 1)
3206 && !uatomic_read(&chan->nb_init_stream_left)) {
3207 consumer_del_channel(chan);
3208 }
3209 } else {
3210 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
3211 rcu_read_unlock();
3212 goto end;
3213 }
3214
3215 /* Release RCU lock for the channel looked up */
3216 rcu_read_unlock();
3217 }
3218 }
3219
3220 /* All is OK */
3221 err = 0;
3222 end:
3223 lttng_poll_clean(&events);
3224 end_poll:
3225 destroy_channel_ht(channel_ht);
3226 end_ht:
3227 error_testpoint:
3228 DBG("Channel poll thread exiting");
3229 if (err) {
3230 health_error();
3231 ERR("Health error occurred in %s", __func__);