consumer: implement clear channel
[lttng-tools.git] / src / common / consumer / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31 #include <signal.h>
32
33 #include <bin/lttng-consumerd/health-consumerd.h>
34 #include <common/common.h>
35 #include <common/utils.h>
36 #include <common/time.h>
37 #include <common/compat/poll.h>
38 #include <common/compat/endian.h>
39 #include <common/index/index.h>
40 #include <common/kernel-ctl/kernel-ctl.h>
41 #include <common/sessiond-comm/relayd.h>
42 #include <common/sessiond-comm/sessiond-comm.h>
43 #include <common/kernel-consumer/kernel-consumer.h>
44 #include <common/relayd/relayd.h>
45 #include <common/ust-consumer/ust-consumer.h>
46 #include <common/consumer/consumer-timer.h>
47 #include <common/consumer/consumer.h>
48 #include <common/consumer/consumer-stream.h>
49 #include <common/consumer/consumer-testpoint.h>
50 #include <common/align.h>
51 #include <common/consumer/consumer-metadata-cache.h>
52 #include <common/trace-chunk.h>
53 #include <common/trace-chunk-registry.h>
54 #include <common/string-utils/format.h>
55 #include <common/dynamic-array.h>
56
/*
 * Global consumer state (stream count, "need update" flag, consumer type).
 * Shared by all consumer threads; mutable fields are protected by
 * consumer_data.lock (locked by the functions below that modify it).
 */
struct lttng_consumer_global_data consumer_data = {
	.stream_count = 0,
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};
62
/* Commands understood by the channel management thread. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};

/*
 * Message exchanged over ctx->consumer_channel_pipe. Depending on the
 * action, only one of 'chan' or 'key' is meaningful.
 */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan; /* add */
	uint64_t key; /* del */
};
74
/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;

/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by consumer_thread_receive_fds when it notices that all fds have
 * hung up. Also updated by the signal handler (consumer_should_exit()).
 * Read by the polling threads.
 */
int consumer_quit;

/*
 * Global hash tables containing respectively the metadata and data streams.
 * The stream elements in these hts should only be updated by the metadata
 * poll thread for the metadata and the data poll thread for the data.
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
93
94 static const char *get_consumer_domain(void)
95 {
96 switch (consumer_data.type) {
97 case LTTNG_CONSUMER_KERNEL:
98 return DEFAULT_KERNEL_TRACE_DIR;
99 case LTTNG_CONSUMER64_UST:
100 /* Fall-through. */
101 case LTTNG_CONSUMER32_UST:
102 return DEFAULT_UST_TRACE_DIR;
103 default:
104 abort();
105 }
106 }
107
108 /*
109 * Notify a thread lttng pipe to poll back again. This usually means that some
110 * global state has changed so we just send back the thread in a poll wait
111 * call.
112 */
113 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
114 {
115 struct lttng_consumer_stream *null_stream = NULL;
116
117 assert(pipe);
118
119 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
120 }
121
/*
 * Signal the health-management thread to quit by writing a single byte to
 * its quit pipe. Failures are only logged; there is no recovery path.
 */
static void notify_health_quit_pipe(int *pipe)
{
	const char quit_byte = '4';

	if (lttng_write(pipe[1], &quit_byte, 1) < 1) {
		PERROR("write consumer health quit");
	}
}
131
132 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
133 struct lttng_consumer_channel *chan,
134 uint64_t key,
135 enum consumer_channel_action action)
136 {
137 struct consumer_channel_msg msg;
138 ssize_t ret;
139
140 memset(&msg, 0, sizeof(msg));
141
142 msg.action = action;
143 msg.chan = chan;
144 msg.key = key;
145 ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
146 if (ret < sizeof(msg)) {
147 PERROR("notify_channel_pipe write error");
148 }
149 }
150
/*
 * Ask the channel management thread to delete the channel identified by
 * 'key'. Only the key travels through the pipe; the channel pointer is NULL.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
		uint64_t key)
{
	notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
156
157 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
158 struct lttng_consumer_channel **chan,
159 uint64_t *key,
160 enum consumer_channel_action *action)
161 {
162 struct consumer_channel_msg msg;
163 ssize_t ret;
164
165 ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
166 if (ret < sizeof(msg)) {
167 ret = -1;
168 goto error;
169 }
170 *action = msg.action;
171 *chan = msg.chan;
172 *key = msg.key;
173 error:
174 return (int) ret;
175 }
176
/*
 * Clean up the stream list of a channel. Those streams are not yet globally
 * visible, so no global hash table manipulation is needed.
 */
static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
{
	struct lttng_consumer_stream *stream, *stmp;

	assert(channel);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {
		cds_list_del(&stream->send_node);
		/*
		 * Once a stream is added to this list, the buffers were created so we
		 * have a guarantee that this call will succeed. Setting the monitor
		 * mode to 0 so we don't lock nor try to delete the stream from the
		 * global hash table.
		 */
		stream->monitor = 0;
		consumer_stream_destroy(stream, NULL);
	}
}
201
202 /*
203 * Find a stream. The consumer_data.lock must be locked during this
204 * call.
205 */
206 static struct lttng_consumer_stream *find_stream(uint64_t key,
207 struct lttng_ht *ht)
208 {
209 struct lttng_ht_iter iter;
210 struct lttng_ht_node_u64 *node;
211 struct lttng_consumer_stream *stream = NULL;
212
213 assert(ht);
214
215 /* -1ULL keys are lookup failures */
216 if (key == (uint64_t) -1ULL) {
217 return NULL;
218 }
219
220 rcu_read_lock();
221
222 lttng_ht_lookup(ht, &key, &iter);
223 node = lttng_ht_iter_get_node_u64(&iter);
224 if (node != NULL) {
225 stream = caa_container_of(node, struct lttng_consumer_stream, node);
226 }
227
228 rcu_read_unlock();
229
230 return stream;
231 }
232
233 static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
234 {
235 struct lttng_consumer_stream *stream;
236
237 rcu_read_lock();
238 stream = find_stream(key, ht);
239 if (stream) {
240 stream->key = (uint64_t) -1ULL;
241 /*
242 * We don't want the lookup to match, but we still need
243 * to iterate on this stream when iterating over the hash table. Just
244 * change the node key.
245 */
246 stream->node.key = (uint64_t) -1ULL;
247 }
248 rcu_read_unlock();
249 }
250
251 /*
252 * Return a channel object for the given key.
253 *
254 * RCU read side lock MUST be acquired before calling this function and
255 * protects the channel ptr.
256 */
257 struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
258 {
259 struct lttng_ht_iter iter;
260 struct lttng_ht_node_u64 *node;
261 struct lttng_consumer_channel *channel = NULL;
262
263 /* -1ULL keys are lookup failures */
264 if (key == (uint64_t) -1ULL) {
265 return NULL;
266 }
267
268 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
269 node = lttng_ht_iter_get_node_u64(&iter);
270 if (node != NULL) {
271 channel = caa_container_of(node, struct lttng_consumer_channel, node);
272 }
273
274 return channel;
275 }
276
277 /*
278 * There is a possibility that the consumer does not have enough time between
279 * the close of the channel on the session daemon and the cleanup in here thus
280 * once we have a channel add with an existing key, we know for sure that this
281 * channel will eventually get cleaned up by all streams being closed.
282 *
283 * This function just nullifies the already existing channel key.
284 */
285 static void steal_channel_key(uint64_t key)
286 {
287 struct lttng_consumer_channel *channel;
288
289 rcu_read_lock();
290 channel = consumer_find_channel(key);
291 if (channel) {
292 channel->key = (uint64_t) -1ULL;
293 /*
294 * We don't want the lookup to match, but we still need to iterate on
295 * this channel when iterating over the hash table. Just change the
296 * node key.
297 */
298 channel->node.key = (uint64_t) -1ULL;
299 }
300 rcu_read_unlock();
301 }
302
/*
 * RCU callback freeing a channel once no reader can still reference it.
 * Releases domain-specific (UST) resources before freeing the object itself.
 */
static void free_channel_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node =
		caa_container_of(head, struct lttng_ht_node_u64, head);
	struct lttng_consumer_channel *channel =
		caa_container_of(node, struct lttng_consumer_channel, node);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/* Nothing domain-specific to free for the kernel consumer. */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_free_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	free(channel);
}
323
/*
 * RCU protected relayd socket pair free.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
	struct lttng_ht_node_u64 *node =
		caa_container_of(head, struct lttng_ht_node_u64, head);
	struct consumer_relayd_sock_pair *relayd =
		caa_container_of(node, struct consumer_relayd_sock_pair, node);

	/*
	 * Close all sockets. This is done in the call RCU since we don't want the
	 * socket fds to be reassigned thus potentially creating bad state of the
	 * relayd object.
	 *
	 * We do not have to lock the control socket mutex here since at this stage
	 * there is no one referencing this relayd object.
	 */
	(void) relayd_close(&relayd->control_sock);
	(void) relayd_close(&relayd->data_sock);

	pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
	free(relayd);
}
348
/*
 * Destroy and free a relayd socket pair object: remove it from the relayd
 * hash table, then close its sockets and free it via an RCU callback.
 * Safe to call with NULL or with an object already being destroyed.
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == NULL) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd is being or is destroyed */
		return;
	}

	/* RCU free() call */
	call_rcu(&relayd->node.head, free_relayd_rcu);
}
373
/*
 * Remove a channel from the global list protected by a mutex. This function
 * is also responsible for freeing its data structures.
 *
 * Lock ordering: consumer_data.lock is taken before channel->lock.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_iter iter;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Destroy streams that might have been left in the stream list. */
	clean_channel_stream_list(channel);

	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}
	if (channel->monitor_timer_enabled == 1) {
		consumer_timer_monitor_stop(channel);
	}

	/* Domain-specific teardown. */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Drop the channel's reference on its current trace chunk. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = NULL;

	if (channel->is_published) {
		int ret;

		/* Remove from both global channel hash tables. */
		rcu_read_lock();
		iter.iter.node = &channel->node.node;
		ret = lttng_ht_del(consumer_data.channel_ht, &iter);
		assert(!ret);

		iter.iter.node = &channel->channels_by_session_id_ht_node.node;
		ret = lttng_ht_del(consumer_data.channels_by_session_id_ht,
				&iter);
		assert(!ret);
		rcu_read_unlock();
	}

	/* Flag as deleted; actual free is deferred to an RCU callback. */
	channel->is_deleted = true;
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
434
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table.
 */
static void cleanup_relayd_ht(void)
{
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	rcu_read_lock();

	cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
			node.node) {
		consumer_destroy_relayd(relayd);
	}

	rcu_read_unlock();

	lttng_ht_destroy(consumer_data.relayd_ht);
}
455
/*
 * Update the end point status of all streams having the given network
 * sequence index (relayd index).
 *
 * The status is set atomically without holding the stream mutex, which is
 * fine because the write/read race is handled with a pipe wakeup for each
 * thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
		enum consumer_endpoint_status status)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

	rcu_read_lock();

	/* Let's begin with metadata */
	cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
		}
	}

	/* Follow up by the data streams */
	cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
		if (stream->net_seq_idx == net_seq_idx) {
			uatomic_set(&stream->endpoint_status, status);
			DBG("Delete flag set to data stream %d", stream->wait_fd);
		}
	}
	rcu_read_unlock();
}
490
/*
 * Cleanup a relayd object by flagging every associated stream for deletion,
 * destroying the object (removing it from the relayd hash table, closing the
 * sockets and freeing the memory in an RCU call).
 *
 * The data and metadata poll threads are then notified that the streams'
 * state has changed.
 */
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
	uint64_t netidx;

	assert(relayd);

	DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);

	/* Save the net sequence index before destroying the object */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and free
	 * the object in a RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Set inactive endpoint to all streams */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
	notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
528
/*
 * Flag a relayd socket pair for destruction. Destroy it immediately if the
 * refcount already reaches zero; otherwise destruction happens when the last
 * reference is dropped.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	assert(relayd);

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}
547
/*
 * Completely destroy a stream from every visible data structure and, if one
 * is given, from the provided hash table.
 *
 * Once this call returns, the stream object is no longer usable nor visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
559
/*
 * XXX naming of del vs destroy is all mixed up.
 */
/* Destroy a data stream, removing it from the global data stream ht. */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}
567
/* Destroy a metadata stream, removing it from the global metadata stream ht. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
572
/*
 * Snapshot channel attributes into the stream's read-only copy so the stream
 * can consult them without taking the channel lock. Currently only the
 * tracefile size is copied.
 */
void consumer_stream_update_channel_attributes(
		struct lttng_consumer_stream *stream,
		struct lttng_consumer_channel *channel)
{
	stream->channel_read_only_attributes.tracefile_size =
			channel->tracefile_size;
}
580
581 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
582 uint64_t stream_key,
583 const char *channel_name,
584 uint64_t relayd_id,
585 uint64_t session_id,
586 struct lttng_trace_chunk *trace_chunk,
587 int cpu,
588 int *alloc_ret,
589 enum consumer_channel_type type,
590 unsigned int monitor)
591 {
592 int ret;
593 struct lttng_consumer_stream *stream;
594
595 stream = zmalloc(sizeof(*stream));
596 if (stream == NULL) {
597 PERROR("malloc struct lttng_consumer_stream");
598 ret = -ENOMEM;
599 goto end;
600 }
601
602 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
603 ERR("Failed to acquire trace chunk reference during the creation of a stream");
604 ret = -1;
605 goto error;
606 }
607
608 rcu_read_lock();
609 stream->key = stream_key;
610 stream->trace_chunk = trace_chunk;
611 stream->out_fd = -1;
612 stream->out_fd_offset = 0;
613 stream->output_written = 0;
614 stream->net_seq_idx = relayd_id;
615 stream->session_id = session_id;
616 stream->monitor = monitor;
617 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
618 stream->index_file = NULL;
619 stream->last_sequence_number = -1ULL;
620 stream->rotate_position = -1ULL;
621 pthread_mutex_init(&stream->lock, NULL);
622 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
623
624 /* If channel is the metadata, flag this stream as metadata. */
625 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
626 stream->metadata_flag = 1;
627 /* Metadata is flat out. */
628 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
629 /* Live rendez-vous point. */
630 pthread_cond_init(&stream->metadata_rdv, NULL);
631 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
632 } else {
633 /* Format stream name to <channel_name>_<cpu_number> */
634 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
635 channel_name, cpu);
636 if (ret < 0) {
637 PERROR("snprintf stream name");
638 goto error;
639 }
640 }
641
642 /* Key is always the wait_fd for streams. */
643 lttng_ht_node_init_u64(&stream->node, stream->key);
644
645 /* Init node per channel id key */
646 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
647
648 /* Init session id node with the stream session id */
649 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
650
651 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
652 " relayd_id %" PRIu64 ", session_id %" PRIu64,
653 stream->name, stream->key, channel_key,
654 stream->net_seq_idx, stream->session_id);
655
656 rcu_read_unlock();
657 return stream;
658
659 error:
660 rcu_read_unlock();
661 lttng_trace_chunk_put(stream->trace_chunk);
662 free(stream);
663 end:
664 if (alloc_ret) {
665 *alloc_ret = ret;
666 }
667 return NULL;
668 }
669
/*
 * Add a stream to the global list protected by a mutex.
 *
 * Lock ordering: consumer_data.lock, then chan->lock, then chan->timer_lock,
 * then stream->lock, then the RCU read-side lock.
 */
void consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;

	assert(stream);
	assert(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	rcu_read_lock();

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

	rcu_read_unlock();
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
726
/*
 * Add relayd socket to global consumer data hashtable. RCU read side lock
 * MUST be acquired before calling this.
 *
 * If an entry with the same net sequence index already exists, the new
 * relayd is NOT added and 0 is still returned (idempotent behavior).
 */
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	assert(relayd);

	lttng_ht_lookup(consumer_data.relayd_ht,
			&relayd->net_seq_idx, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		/* Already present; leave the existing entry untouched. */
		goto end;
	}
	lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);

end:
	return ret;
}
750
751 /*
752 * Allocate and return a consumer relayd socket.
753 */
754 static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
755 uint64_t net_seq_idx)
756 {
757 struct consumer_relayd_sock_pair *obj = NULL;
758
759 /* net sequence index of -1 is a failure */
760 if (net_seq_idx == (uint64_t) -1ULL) {
761 goto error;
762 }
763
764 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
765 if (obj == NULL) {
766 PERROR("zmalloc relayd sock");
767 goto error;
768 }
769
770 obj->net_seq_idx = net_seq_idx;
771 obj->refcount = 0;
772 obj->destroy_flag = 0;
773 obj->control_sock.sock.fd = -1;
774 obj->data_sock.sock.fd = -1;
775 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
776 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
777
778 error:
779 return obj;
780 }
781
782 /*
783 * Find a relayd socket pair in the global consumer data.
784 *
785 * Return the object if found else NULL.
786 * RCU read-side lock must be held across this call and while using the
787 * returned object.
788 */
789 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
790 {
791 struct lttng_ht_iter iter;
792 struct lttng_ht_node_u64 *node;
793 struct consumer_relayd_sock_pair *relayd = NULL;
794
795 /* Negative keys are lookup failures */
796 if (key == (uint64_t) -1ULL) {
797 goto error;
798 }
799
800 lttng_ht_lookup(consumer_data.relayd_ht, &key,
801 &iter);
802 node = lttng_ht_iter_get_node_u64(&iter);
803 if (node != NULL) {
804 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
805 }
806
807 error:
808 return relayd;
809 }
810
/*
 * Find the relayd associated to the stream's net sequence index and announce
 * the stream to it (adding it on the relayd side under 'path').
 *
 * Returns 0 on success, < 0 on error (unknown relayd or failed command, in
 * which case the relayd is cleaned up).
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
		char *path)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	assert(stream->net_seq_idx != -1ULL);
	assert(path);

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_add_stream(&relayd->control_sock, stream->name,
				get_consumer_domain(), path, &stream->relayd_stream_id,
				stream->chan->tracefile_size,
				stream->chan->tracefile_count,
				stream->trace_chunk);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}

		/* Pin the relayd while this stream references it. */
		uatomic_inc(&relayd->refcount);
		stream->sent_to_relayd = 1;
	} else {
		ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
				stream->key, stream->net_seq_idx);
		ret = -1;
		goto end;
	}

	DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
			stream->name, stream->key, stream->net_seq_idx);

end:
	rcu_read_unlock();
	return ret;
}
860
/*
 * Find the relayd for the given net sequence index and send it the
 * "streams sent" message indicating all streams have been announced.
 *
 * Returns 0 on success, < 0 on error (unknown relayd or failed command, in
 * which case the relayd is cleaned up).
 */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
{
	int ret = 0;
	struct consumer_relayd_sock_pair *relayd;

	assert(net_seq_idx != -1ULL);

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd != NULL) {
		/* Add stream on the relayd */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_streams_sent(&relayd->control_sock);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
			lttng_consumer_cleanup_relayd(relayd);
			goto end;
		}
	} else {
		ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
				net_seq_idx);
		ret = -1;
		goto end;
	}

	ret = 0;
	DBG("All streams sent relayd id %" PRIu64, net_seq_idx);

end:
	rcu_read_unlock();
	return ret;
}
900
/*
 * Find the relayd associated to the stream's net sequence index and close
 * the stream on the relayd side. No-op if the relayd is unknown.
 */
void close_relayd_stream(struct lttng_consumer_stream *stream)
{
	struct consumer_relayd_sock_pair *relayd;

	/* The stream is not metadata. Get relayd reference if exists. */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd) {
		consumer_stream_relayd_close(stream, relayd);
	}
	rcu_read_unlock();
}
916
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Return destination file descriptor (control socket for metadata, data
 * socket otherwise) or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
		size_t data_size, unsigned long padding,
		struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	assert(stream);
	assert(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);

		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * incremented. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be subtracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
				sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
977
/*
 * Trigger a dump of the metadata content. Following/during the successful
 * completion of this call, the metadata poll thread will start receiving
 * metadata packets to consume.
 *
 * The caller must hold the channel and stream locks.
 */
static
int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream)
{
	int ret;

	ASSERT_LOCKED(stream->chan->lock);
	ASSERT_LOCKED(stream->lock);
	assert(stream->metadata_flag);
	assert(stream->chan->trace_chunk);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Reset the position of what has been read from the
		 * metadata cache to 0 so we can dump it again.
		 */
		ret = kernctl_metadata_cache_dump(stream->wait_fd);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Reset the position pushed from the metadata cache so it
		 * will write from the beginning on the next push.
		 */
		stream->ust_metadata_pushed = 0;
		ret = consumer_metadata_wakeup_pipe(stream->chan);
		break;
	default:
		ERR("Unknown consumer_data type");
		abort();
	}
	if (ret < 0) {
		ERR("Failed to dump the metadata cache");
	}
	return ret;
}
1021
/*
 * Replace the channel's current trace chunk by 'new_trace_chunk' (which may
 * be NULL), acquiring a reference on the new chunk and releasing the
 * reference on the old one. No-op on a logically deleted channel.
 *
 * Always returns 0.
 */
static
int lttng_consumer_channel_set_trace_chunk(
		struct lttng_consumer_channel *channel,
		struct lttng_trace_chunk *new_trace_chunk)
{
	pthread_mutex_lock(&channel->lock);
	if (channel->is_deleted) {
		/*
		 * The channel has been logically deleted and should no longer
		 * be used. It has released its reference to its current trace
		 * chunk and should not acquire a new one.
		 *
		 * Return success as there is nothing for the caller to do.
		 */
		goto end;
	}

	/*
	 * The acquisition of the reference cannot fail (barring
	 * a severe internal error) since a reference to the published
	 * chunk is already held by the caller.
	 */
	if (new_trace_chunk) {
		const bool acquired_reference = lttng_trace_chunk_get(
				new_trace_chunk);

		assert(acquired_reference);
	}

	/* lttng_trace_chunk_put() accepts NULL; old chunk may be unset. */
	lttng_trace_chunk_put(channel->trace_chunk);
	channel->trace_chunk = new_trace_chunk;
end:
	pthread_mutex_unlock(&channel->lock);
	return 0;
}
1057
1058 /*
1059 * Allocate and return a new lttng_consumer_channel object using the given key
1060 * to initialize the hash table node.
1061 *
1062 * On error, return NULL.
1063 */
1064 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
1065 uint64_t session_id,
1066 const uint64_t *chunk_id,
1067 const char *pathname,
1068 const char *name,
1069 uint64_t relayd_id,
1070 enum lttng_event_output output,
1071 uint64_t tracefile_size,
1072 uint64_t tracefile_count,
1073 uint64_t session_id_per_pid,
1074 unsigned int monitor,
1075 unsigned int live_timer_interval,
1076 const char *root_shm_path,
1077 const char *shm_path)
1078 {
1079 struct lttng_consumer_channel *channel = NULL;
1080 struct lttng_trace_chunk *trace_chunk = NULL;
1081
1082 if (chunk_id) {
1083 trace_chunk = lttng_trace_chunk_registry_find_chunk(
1084 consumer_data.chunk_registry, session_id,
1085 *chunk_id);
1086 if (!trace_chunk) {
1087 ERR("Failed to find trace chunk reference during creation of channel");
1088 goto end;
1089 }
1090 }
1091
1092 channel = zmalloc(sizeof(*channel));
1093 if (channel == NULL) {
1094 PERROR("malloc struct lttng_consumer_channel");
1095 goto end;
1096 }
1097
1098 channel->key = key;
1099 channel->refcount = 0;
1100 channel->session_id = session_id;
1101 channel->session_id_per_pid = session_id_per_pid;
1102 channel->relayd_id = relayd_id;
1103 channel->tracefile_size = tracefile_size;
1104 channel->tracefile_count = tracefile_count;
1105 channel->monitor = monitor;
1106 channel->live_timer_interval = live_timer_interval;
1107 pthread_mutex_init(&channel->lock, NULL);
1108 pthread_mutex_init(&channel->timer_lock, NULL);
1109
1110 switch (output) {
1111 case LTTNG_EVENT_SPLICE:
1112 channel->output = CONSUMER_CHANNEL_SPLICE;
1113 break;
1114 case LTTNG_EVENT_MMAP:
1115 channel->output = CONSUMER_CHANNEL_MMAP;
1116 break;
1117 default:
1118 assert(0);
1119 free(channel);
1120 channel = NULL;
1121 goto end;
1122 }
1123
1124 /*
1125 * In monitor mode, the streams associated with the channel will be put in
1126 * a special list ONLY owned by this channel. So, the refcount is set to 1
1127 * here meaning that the channel itself has streams that are referenced.
1128 *
1129 * On a channel deletion, once the channel is no longer visible, the
1130 * refcount is decremented and checked for a zero value to delete it. With
1131 * streams in no monitor mode, it will now be safe to destroy the channel.
1132 */
1133 if (!channel->monitor) {
1134 channel->refcount = 1;
1135 }
1136
1137 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
1138 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
1139
1140 strncpy(channel->name, name, sizeof(channel->name));
1141 channel->name[sizeof(channel->name) - 1] = '\0';
1142
1143 if (root_shm_path) {
1144 strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
1145 channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
1146 }
1147 if (shm_path) {
1148 strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
1149 channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
1150 }
1151
1152 lttng_ht_node_init_u64(&channel->node, channel->key);
1153 lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
1154 channel->session_id);
1155
1156 channel->wait_fd = -1;
1157 CDS_INIT_LIST_HEAD(&channel->streams.head);
1158
1159 if (trace_chunk) {
1160 int ret = lttng_consumer_channel_set_trace_chunk(channel,
1161 trace_chunk);
1162 if (ret) {
1163 goto error;
1164 }
1165 }
1166
1167 DBG("Allocated channel (key %" PRIu64 ")", channel->key);
1168
1169 end:
1170 lttng_trace_chunk_put(trace_chunk);
1171 return channel;
1172 error:
1173 consumer_del_channel(channel);
1174 channel = NULL;
1175 goto end;
1176 }
1177
1178 /*
1179 * Add a channel to the global list protected by a mutex.
1180 *
1181 * Always return 0 indicating success.
1182 */
1183 int consumer_add_channel(struct lttng_consumer_channel *channel,
1184 struct lttng_consumer_local_data *ctx)
1185 {
1186 pthread_mutex_lock(&consumer_data.lock);
1187 pthread_mutex_lock(&channel->lock);
1188 pthread_mutex_lock(&channel->timer_lock);
1189
1190 /*
1191 * This gives us a guarantee that the channel we are about to add to the
1192 * channel hash table will be unique. See this function comment on the why
1193 * we need to steel the channel key at this stage.
1194 */
1195 steal_channel_key(channel->key);
1196
1197 rcu_read_lock();
1198 lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
1199 lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
1200 &channel->channels_by_session_id_ht_node);
1201 rcu_read_unlock();
1202 channel->is_published = true;
1203
1204 pthread_mutex_unlock(&channel->timer_lock);
1205 pthread_mutex_unlock(&channel->lock);
1206 pthread_mutex_unlock(&consumer_data.lock);
1207
1208 if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
1209 notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
1210 }
1211
1212 return 0;
1213 }
1214
1215 /*
1216 * Allocate the pollfd structure and the local view of the out fds to avoid
1217 * doing a lookup in the linked list and concurrency issues when writing is
1218 * needed. Called with consumer_data.lock held.
1219 *
1220 * Returns the number of fds in the structures.
1221 */
1222 static int update_poll_array(struct lttng_consumer_local_data *ctx,
1223 struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
1224 struct lttng_ht *ht, int *nb_inactive_fd)
1225 {
1226 int i = 0;
1227 struct lttng_ht_iter iter;
1228 struct lttng_consumer_stream *stream;
1229
1230 assert(ctx);
1231 assert(ht);
1232 assert(pollfd);
1233 assert(local_stream);
1234
1235 DBG("Updating poll fd array");
1236 *nb_inactive_fd = 0;
1237 rcu_read_lock();
1238 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1239 /*
1240 * Only active streams with an active end point can be added to the
1241 * poll set and local stream storage of the thread.
1242 *
1243 * There is a potential race here for endpoint_status to be updated
1244 * just after the check. However, this is OK since the stream(s) will
1245 * be deleted once the thread is notified that the end point state has
1246 * changed where this function will be called back again.
1247 *
1248 * We track the number of inactive FDs because they still need to be
1249 * closed by the polling thread after a wakeup on the data_pipe or
1250 * metadata_pipe.
1251 */
1252 if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
1253 (*nb_inactive_fd)++;
1254 continue;
1255 }
1256 /*
1257 * This clobbers way too much the debug output. Uncomment that if you
1258 * need it for debugging purposes.
1259 */
1260 (*pollfd)[i].fd = stream->wait_fd;
1261 (*pollfd)[i].events = POLLIN | POLLPRI;
1262 local_stream[i] = stream;
1263 i++;
1264 }
1265 rcu_read_unlock();
1266
1267 /*
1268 * Insert the consumer_data_pipe at the end of the array and don't
1269 * increment i so nb_fd is the number of real FD.
1270 */
1271 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
1272 (*pollfd)[i].events = POLLIN | POLLPRI;
1273
1274 (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
1275 (*pollfd)[i + 1].events = POLLIN | POLLPRI;
1276 return i;
1277 }
1278
1279 /*
1280 * Poll on the should_quit pipe and the command socket return -1 on
1281 * error, 1 if should exit, 0 if data is available on the command socket
1282 */
1283 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
1284 {
1285 int num_rdy;
1286
1287 restart:
1288 num_rdy = poll(consumer_sockpoll, 2, -1);
1289 if (num_rdy == -1) {
1290 /*
1291 * Restart interrupted system call.
1292 */
1293 if (errno == EINTR) {
1294 goto restart;
1295 }
1296 PERROR("Poll error");
1297 return -1;
1298 }
1299 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
1300 DBG("consumer_should_quit wake up");
1301 return 1;
1302 }
1303 return 0;
1304 }
1305
1306 /*
1307 * Set the error socket.
1308 */
1309 void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
1310 int sock)
1311 {
1312 ctx->consumer_error_socket = sock;
1313 }
1314
1315 /*
1316 * Set the command socket path.
1317 */
1318 void lttng_consumer_set_command_sock_path(
1319 struct lttng_consumer_local_data *ctx, char *sock)
1320 {
1321 ctx->consumer_command_sock_path = sock;
1322 }
1323
1324 /*
1325 * Send return code to the session daemon.
1326 * If the socket is not defined, we return 0, it is not a fatal error
1327 */
1328 int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
1329 {
1330 if (ctx->consumer_error_socket > 0) {
1331 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
1332 sizeof(enum lttcomm_sessiond_command));
1333 }
1334
1335 return 0;
1336 }
1337
1338 /*
1339 * Close all the tracefiles and stream fds and MUST be called when all
1340 * instances are destroyed i.e. when all threads were joined and are ended.
1341 */
1342 void lttng_consumer_cleanup(void)
1343 {
1344 struct lttng_ht_iter iter;
1345 struct lttng_consumer_channel *channel;
1346 unsigned int trace_chunks_left;
1347
1348 rcu_read_lock();
1349
1350 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
1351 node.node) {
1352 consumer_del_channel(channel);
1353 }
1354
1355 rcu_read_unlock();
1356
1357 lttng_ht_destroy(consumer_data.channel_ht);
1358 lttng_ht_destroy(consumer_data.channels_by_session_id_ht);
1359
1360 cleanup_relayd_ht();
1361
1362 lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
1363
1364 /*
1365 * This HT contains streams that are freed by either the metadata thread or
1366 * the data thread so we do *nothing* on the hash table and simply destroy
1367 * it.
1368 */
1369 lttng_ht_destroy(consumer_data.stream_list_ht);
1370
1371 /*
1372 * Trace chunks in the registry may still exist if the session
1373 * daemon has encountered an internal error and could not
1374 * tear down its sessions and/or trace chunks properly.
1375 *
1376 * Release the session daemon's implicit reference to any remaining
1377 * trace chunk and print an error if any trace chunk was found. Note
1378 * that there are _no_ legitimate cases for trace chunks to be left,
1379 * it is a leak. However, it can happen following a crash of the
1380 * session daemon and not emptying the registry would cause an assertion
1381 * to hit.
1382 */
1383 trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk(
1384 consumer_data.chunk_registry);
1385 if (trace_chunks_left) {
1386 ERR("%u trace chunks are leaked by lttng-consumerd. "
1387 "This can be caused by an internal error of the session daemon.",
1388 trace_chunks_left);
1389 }
1390 /* Run all callbacks freeing each chunk. */
1391 rcu_barrier();
1392 lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
1393 }
1394
1395 /*
1396 * Called from signal handler.
1397 */
1398 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1399 {
1400 ssize_t ret;
1401
1402 CMM_STORE_SHARED(consumer_quit, 1);
1403 ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
1404 if (ret < 1) {
1405 PERROR("write consumer quit");
1406 }
1407
1408 DBG("Consumer flag that it should quit");
1409 }
1410
1411
1412 /*
1413 * Flush pending writes to trace output disk file.
1414 */
1415 static
1416 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1417 off_t orig_offset)
1418 {
1419 int ret;
1420 int outfd = stream->out_fd;
1421
1422 /*
1423 * This does a blocking write-and-wait on any page that belongs to the
1424 * subbuffer prior to the one we just wrote.
1425 * Don't care about error values, as these are just hints and ways to
1426 * limit the amount of page cache used.
1427 */
1428 if (orig_offset < stream->max_sb_size) {
1429 return;
1430 }
1431 lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
1432 stream->max_sb_size,
1433 SYNC_FILE_RANGE_WAIT_BEFORE
1434 | SYNC_FILE_RANGE_WRITE
1435 | SYNC_FILE_RANGE_WAIT_AFTER);
1436 /*
1437 * Give hints to the kernel about how we access the file:
1438 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1439 * we write it.
1440 *
1441 * We need to call fadvise again after the file grows because the
1442 * kernel does not seem to apply fadvise to non-existing parts of the
1443 * file.
1444 *
1445 * Call fadvise _after_ having waited for the page writeback to
1446 * complete because the dirty page writeback semantic is not well
1447 * defined. So it can be expected to lead to lower throughput in
1448 * streaming.
1449 */
1450 ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
1451 stream->max_sb_size, POSIX_FADV_DONTNEED);
1452 if (ret && ret != -ENOSYS) {
1453 errno = ret;
1454 PERROR("posix_fadvise on fd %i", outfd);
1455 }
1456 }
1457
1458 /*
1459 * Initialise the necessary environnement :
1460 * - create a new context
1461 * - create the poll_pipe
1462 * - create the should_quit pipe (for signal handler)
1463 * - create the thread pipe (for splice)
1464 *
1465 * Takes a function pointer as argument, this function is called when data is
1466 * available on a buffer. This function is responsible to do the
1467 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1468 * buffer configuration and then kernctl_put_next_subbuf at the end.
1469 *
1470 * Returns a pointer to the new context or NULL on error.
1471 */
1472 struct lttng_consumer_local_data *lttng_consumer_create(
1473 enum lttng_consumer_type type,
1474 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1475 struct lttng_consumer_local_data *ctx),
1476 int (*recv_channel)(struct lttng_consumer_channel *channel),
1477 int (*recv_stream)(struct lttng_consumer_stream *stream),
1478 int (*update_stream)(uint64_t stream_key, uint32_t state))
1479 {
1480 int ret;
1481 struct lttng_consumer_local_data *ctx;
1482
1483 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1484 consumer_data.type == type);
1485 consumer_data.type = type;
1486
1487 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
1488 if (ctx == NULL) {
1489 PERROR("allocating context");
1490 goto error;
1491 }
1492
1493 ctx->consumer_error_socket = -1;
1494 ctx->consumer_metadata_socket = -1;
1495 pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
1496 /* assign the callbacks */
1497 ctx->on_buffer_ready = buffer_ready;
1498 ctx->on_recv_channel = recv_channel;
1499 ctx->on_recv_stream = recv_stream;
1500 ctx->on_update_stream = update_stream;
1501
1502 ctx->consumer_data_pipe = lttng_pipe_open(0);
1503 if (!ctx->consumer_data_pipe) {
1504 goto error_poll_pipe;
1505 }
1506
1507 ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
1508 if (!ctx->consumer_wakeup_pipe) {
1509 goto error_wakeup_pipe;
1510 }
1511
1512 ret = pipe(ctx->consumer_should_quit);
1513 if (ret < 0) {
1514 PERROR("Error creating recv pipe");
1515 goto error_quit_pipe;
1516 }
1517
1518 ret = pipe(ctx->consumer_channel_pipe);
1519 if (ret < 0) {
1520 PERROR("Error creating channel pipe");
1521 goto error_channel_pipe;
1522 }
1523
1524 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1525 if (!ctx->consumer_metadata_pipe) {
1526 goto error_metadata_pipe;
1527 }
1528
1529 ctx->channel_monitor_pipe = -1;
1530
1531 return ctx;
1532
1533 error_metadata_pipe:
1534 utils_close_pipe(ctx->consumer_channel_pipe);
1535 error_channel_pipe:
1536 utils_close_pipe(ctx->consumer_should_quit);
1537 error_quit_pipe:
1538 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1539 error_wakeup_pipe:
1540 lttng_pipe_destroy(ctx->consumer_data_pipe);
1541 error_poll_pipe:
1542 free(ctx);
1543 error:
1544 return NULL;
1545 }
1546
1547 /*
1548 * Iterate over all streams of the hashtable and free them properly.
1549 */
1550 static void destroy_data_stream_ht(struct lttng_ht *ht)
1551 {
1552 struct lttng_ht_iter iter;
1553 struct lttng_consumer_stream *stream;
1554
1555 if (ht == NULL) {
1556 return;
1557 }
1558
1559 rcu_read_lock();
1560 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1561 /*
1562 * Ignore return value since we are currently cleaning up so any error
1563 * can't be handled.
1564 */
1565 (void) consumer_del_stream(stream, ht);
1566 }
1567 rcu_read_unlock();
1568
1569 lttng_ht_destroy(ht);
1570 }
1571
1572 /*
1573 * Iterate over all streams of the metadata hashtable and free them
1574 * properly.
1575 */
1576 static void destroy_metadata_stream_ht(struct lttng_ht *ht)
1577 {
1578 struct lttng_ht_iter iter;
1579 struct lttng_consumer_stream *stream;
1580
1581 if (ht == NULL) {
1582 return;
1583 }
1584
1585 rcu_read_lock();
1586 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1587 /*
1588 * Ignore return value since we are currently cleaning up so any error
1589 * can't be handled.
1590 */
1591 (void) consumer_del_metadata_stream(stream, ht);
1592 }
1593 rcu_read_unlock();
1594
1595 lttng_ht_destroy(ht);
1596 }
1597
1598 /*
1599 * Close all fds associated with the instance and free the context.
1600 */
1601 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1602 {
1603 int ret;
1604
1605 DBG("Consumer destroying it. Closing everything.");
1606
1607 if (!ctx) {
1608 return;
1609 }
1610
1611 destroy_data_stream_ht(data_ht);
1612 destroy_metadata_stream_ht(metadata_ht);
1613
1614 ret = close(ctx->consumer_error_socket);
1615 if (ret) {
1616 PERROR("close");
1617 }
1618 ret = close(ctx->consumer_metadata_socket);
1619 if (ret) {
1620 PERROR("close");
1621 }
1622 utils_close_pipe(ctx->consumer_channel_pipe);
1623 lttng_pipe_destroy(ctx->consumer_data_pipe);
1624 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1625 lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
1626 utils_close_pipe(ctx->consumer_should_quit);
1627
1628 unlink(ctx->consumer_command_sock_path);
1629 free(ctx);
1630 }
1631
1632 /*
1633 * Write the metadata stream id on the specified file descriptor.
1634 */
1635 static int write_relayd_metadata_id(int fd,
1636 struct lttng_consumer_stream *stream,
1637 unsigned long padding)
1638 {
1639 ssize_t ret;
1640 struct lttcomm_relayd_metadata_payload hdr;
1641
1642 hdr.stream_id = htobe64(stream->relayd_stream_id);
1643 hdr.padding_size = htobe32(padding);
1644 ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
1645 if (ret < sizeof(hdr)) {
1646 /*
1647 * This error means that the fd's end is closed so ignore the PERROR
1648 * not to clubber the error output since this can happen in a normal
1649 * code path.
1650 */
1651 if (errno != EPIPE) {
1652 PERROR("write metadata stream id");
1653 }
1654 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
1655 /*
1656 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1657 * handle writting the missing part so report that as an error and
1658 * don't lie to the caller.
1659 */
1660 ret = -1;
1661 goto end;
1662 }
1663 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1664 stream->relayd_stream_id, padding);
1665
1666 end:
1667 return (int) ret;
1668 }
1669
1670 /*
1671 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1672 * core function for writing trace buffers to either the local filesystem or
1673 * the network.
1674 *
1675 * It must be called with the stream and the channel lock held.
1676 *
1677 * Careful review MUST be put if any changes occur!
1678 *
1679 * Returns the number of bytes written
1680 */
1681 ssize_t lttng_consumer_on_read_subbuffer_mmap(
1682 struct lttng_consumer_local_data *ctx,
1683 struct lttng_consumer_stream *stream, unsigned long len,
1684 unsigned long padding,
1685 struct ctf_packet_index *index)
1686 {
1687 unsigned long mmap_offset;
1688 void *mmap_base;
1689 ssize_t ret = 0;
1690 off_t orig_offset = stream->out_fd_offset;
1691 /* Default is on the disk */
1692 int outfd = stream->out_fd;
1693 struct consumer_relayd_sock_pair *relayd = NULL;
1694 unsigned int relayd_hang_up = 0;
1695
1696 /* RCU lock for the relayd pointer */
1697 rcu_read_lock();
1698 assert(stream->net_seq_idx != (uint64_t) -1ULL ||
1699 stream->trace_chunk);
1700
1701 /* Flag that the current stream if set for network streaming. */
1702 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1703 relayd = consumer_find_relayd(stream->net_seq_idx);
1704 if (relayd == NULL) {
1705 ret = -EPIPE;
1706 goto end;
1707 }
1708 }
1709
1710 /* get the offset inside the fd to mmap */
1711 switch (consumer_data.type) {
1712 case LTTNG_CONSUMER_KERNEL:
1713 mmap_base = stream->mmap_base;
1714 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
1715 if (ret < 0) {
1716 PERROR("tracer ctl get_mmap_read_offset");
1717 goto end;
1718 }
1719 break;
1720 case LTTNG_CONSUMER32_UST:
1721 case LTTNG_CONSUMER64_UST:
1722 mmap_base = lttng_ustctl_get_mmap_base(stream);
1723 if (!mmap_base) {
1724 ERR("read mmap get mmap base for stream %s", stream->name);
1725 ret = -EPERM;
1726 goto end;
1727 }
1728 ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
1729 if (ret != 0) {
1730 PERROR("tracer ctl get_mmap_read_offset");
1731 ret = -EINVAL;
1732 goto end;
1733 }
1734 break;
1735 default:
1736 ERR("Unknown consumer_data type");
1737 assert(0);
1738 }
1739
1740 /* Handle stream on the relayd if the output is on the network */
1741 if (relayd) {
1742 unsigned long netlen = len;
1743
1744 /*
1745 * Lock the control socket for the complete duration of the function
1746 * since from this point on we will use the socket.
1747 */
1748 if (stream->metadata_flag) {
1749 /* Metadata requires the control socket. */
1750 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1751 if (stream->reset_metadata_flag) {
1752 ret = relayd_reset_metadata(&relayd->control_sock,
1753 stream->relayd_stream_id,
1754 stream->metadata_version);
1755 if (ret < 0) {
1756 relayd_hang_up = 1;
1757 goto write_error;
1758 }
1759 stream->reset_metadata_flag = 0;
1760 }
1761 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
1762 }
1763
1764 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
1765 if (ret < 0) {
1766 relayd_hang_up = 1;
1767 goto write_error;
1768 }
1769 /* Use the returned socket. */
1770 outfd = ret;
1771
1772 /* Write metadata stream id before payload */
1773 if (stream->metadata_flag) {
1774 ret = write_relayd_metadata_id(outfd, stream, padding);
1775 if (ret < 0) {
1776 relayd_hang_up = 1;
1777 goto write_error;
1778 }
1779 }
1780 } else {
1781 /* No streaming, we have to set the len with the full padding */
1782 len += padding;
1783
1784 if (stream->metadata_flag && stream->reset_metadata_flag) {
1785 ret = utils_truncate_stream_file(stream->out_fd, 0);
1786 if (ret < 0) {
1787 ERR("Reset metadata file");
1788 goto end;
1789 }
1790 stream->reset_metadata_flag = 0;
1791 }
1792
1793 /*
1794 * Check if we need to change the tracefile before writing the packet.
1795 */
1796 if (stream->chan->tracefile_size > 0 &&
1797 (stream->tracefile_size_current + len) >
1798 stream->chan->tracefile_size) {
1799 ret = consumer_stream_rotate_output_files(stream);
1800 if (ret) {
1801 goto end;
1802 }
1803 outfd = stream->out_fd;
1804 orig_offset = 0;
1805 }
1806 stream->tracefile_size_current += len;
1807 if (index) {
1808 index->offset = htobe64(stream->out_fd_offset);
1809 }
1810 }
1811
1812 /*
1813 * This call guarantee that len or less is returned. It's impossible to
1814 * receive a ret value that is bigger than len.
1815 */
1816 ret = lttng_write(outfd, mmap_base + mmap_offset, len);
1817 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
1818 if (ret < 0 || ((size_t) ret != len)) {
1819 /*
1820 * Report error to caller if nothing was written else at least send the
1821 * amount written.
1822 */
1823 if (ret < 0) {
1824 ret = -errno;
1825 }
1826 relayd_hang_up = 1;
1827
1828 /* Socket operation failed. We consider the relayd dead */
1829 if (errno == EPIPE) {
1830 /*
1831 * This is possible if the fd is closed on the other side
1832 * (outfd) or any write problem. It can be verbose a bit for a
1833 * normal execution if for instance the relayd is stopped
1834 * abruptly. This can happen so set this to a DBG statement.
1835 */
1836 DBG("Consumer mmap write detected relayd hang up");
1837 } else {
1838 /* Unhandled error, print it and stop function right now. */
1839 PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
1840 }
1841 goto write_error;
1842 }
1843 stream->output_written += ret;
1844
1845 /* This call is useless on a socket so better save a syscall. */
1846 if (!relayd) {
1847 /* This won't block, but will start writeout asynchronously */
1848 lttng_sync_file_range(outfd, stream->out_fd_offset, len,
1849 SYNC_FILE_RANGE_WRITE);
1850 stream->out_fd_offset += len;
1851 lttng_consumer_sync_trace_file(stream, orig_offset);
1852 }
1853
1854 write_error:
1855 /*
1856 * This is a special case that the relayd has closed its socket. Let's
1857 * cleanup the relayd object and all associated streams.
1858 */
1859 if (relayd && relayd_hang_up) {
1860 ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
1861 lttng_consumer_cleanup_relayd(relayd);
1862 }
1863
1864 end:
1865 /* Unlock only if ctrl socket used */
1866 if (relayd && stream->metadata_flag) {
1867 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1868 }
1869
1870 rcu_read_unlock();
1871 return ret;
1872 }
1873
1874 /*
1875 * Splice the data from the ring buffer to the tracefile.
1876 *
1877 * It must be called with the stream lock held.
1878 *
1879 * Returns the number of bytes spliced.
1880 */
1881 ssize_t lttng_consumer_on_read_subbuffer_splice(
1882 struct lttng_consumer_local_data *ctx,
1883 struct lttng_consumer_stream *stream, unsigned long len,
1884 unsigned long padding,
1885 struct ctf_packet_index *index)
1886 {
1887 ssize_t ret = 0, written = 0, ret_splice = 0;
1888 loff_t offset = 0;
1889 off_t orig_offset = stream->out_fd_offset;
1890 int fd = stream->wait_fd;
1891 /* Default is on the disk */
1892 int outfd = stream->out_fd;
1893 struct consumer_relayd_sock_pair *relayd = NULL;
1894 int *splice_pipe;
1895 unsigned int relayd_hang_up = 0;
1896
1897 switch (consumer_data.type) {
1898 case LTTNG_CONSUMER_KERNEL:
1899 break;
1900 case LTTNG_CONSUMER32_UST:
1901 case LTTNG_CONSUMER64_UST:
1902 /* Not supported for user space tracing */
1903 return -ENOSYS;
1904 default:
1905 ERR("Unknown consumer_data type");
1906 assert(0);
1907 }
1908
1909 /* RCU lock for the relayd pointer */
1910 rcu_read_lock();
1911
1912 /* Flag that the current stream if set for network streaming. */
1913 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1914 relayd = consumer_find_relayd(stream->net_seq_idx);
1915 if (relayd == NULL) {
1916 written = -ret;
1917 goto end;
1918 }
1919 }
1920 splice_pipe = stream->splice_pipe;
1921
1922 /* Write metadata stream id before payload */
1923 if (relayd) {
1924 unsigned long total_len = len;
1925
1926 if (stream->metadata_flag) {
1927 /*
1928 * Lock the control socket for the complete duration of the function
1929 * since from this point on we will use the socket.
1930 */
1931 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1932
1933 if (stream->reset_metadata_flag) {
1934 ret = relayd_reset_metadata(&relayd->control_sock,
1935 stream->relayd_stream_id,
1936 stream->metadata_version);
1937 if (ret < 0) {
1938 relayd_hang_up = 1;
1939 goto write_error;
1940 }
1941 stream->reset_metadata_flag = 0;
1942 }
1943 ret = write_relayd_metadata_id(splice_pipe[1], stream,
1944 padding);
1945 if (ret < 0) {
1946 written = ret;
1947 relayd_hang_up = 1;
1948 goto write_error;
1949 }
1950
1951 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1952 }
1953
1954 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1955 if (ret < 0) {
1956 written = ret;
1957 relayd_hang_up = 1;
1958 goto write_error;
1959 }
1960 /* Use the returned socket. */
1961 outfd = ret;
1962 } else {
1963 /* No streaming, we have to set the len with the full padding */
1964 len += padding;
1965
1966 if (stream->metadata_flag && stream->reset_metadata_flag) {
1967 ret = utils_truncate_stream_file(stream->out_fd, 0);
1968 if (ret < 0) {
1969 ERR("Reset metadata file");
1970 goto end;
1971 }
1972 stream->reset_metadata_flag = 0;
1973 }
1974 /*
1975 * Check if we need to change the tracefile before writing the packet.
1976 */
1977 if (stream->chan->tracefile_size > 0 &&
1978 (stream->tracefile_size_current + len) >
1979 stream->chan->tracefile_size) {
1980 ret = consumer_stream_rotate_output_files(stream);
1981 if (ret < 0) {
1982 written = ret;
1983 goto end;
1984 }
1985 outfd = stream->out_fd;
1986 orig_offset = 0;
1987 }
1988 stream->tracefile_size_current += len;
1989 index->offset = htobe64(stream->out_fd_offset);
1990 }
1991
1992 while (len > 0) {
1993 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1994 (unsigned long)offset, len, fd, splice_pipe[1]);
1995 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
1996 SPLICE_F_MOVE | SPLICE_F_MORE);
1997 DBG("splice chan to pipe, ret %zd", ret_splice);
1998 if (ret_splice < 0) {
1999 ret = errno;
2000 written = -ret;
2001 PERROR("Error in relay splice");
2002 goto splice_error;
2003 }
2004
2005 /* Handle stream on the relayd if the output is on the network */
2006 if (relayd && stream->metadata_flag) {
2007 size_t metadata_payload_size =
2008 sizeof(struct lttcomm_relayd_metadata_payload);
2009
2010 /* Update counter to fit the spliced data */
2011 ret_splice += metadata_payload_size;
2012 len += metadata_payload_size;
2013 /*
2014 * We do this so the return value can match the len passed as
2015 * argument to this function.
2016 */
2017 written -= metadata_payload_size;
2018 }
2019
2020 /* Splice data out */
2021 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
2022 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
2023 DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
2024 outfd, ret_splice);
2025 if (ret_splice < 0) {
2026 ret = errno;
2027 written = -ret;
2028 relayd_hang_up = 1;
2029 goto write_error;
2030 } else if (ret_splice > len) {
2031 /*
2032 * We don't expect this code path to be executed but you never know
2033 * so this is an extra protection agains a buggy splice().
2034 */
2035 ret = errno;
2036 written += ret_splice;
2037 PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
2038 len);
2039 goto splice_error;
2040 } else {
2041 /* All good, update current len and continue. */
2042 len -= ret_splice;
2043 }
2044
2045 /* This call is useless on a socket so better save a syscall. */
2046 if (!relayd) {
2047 /* This won't block, but will start writeout asynchronously */
2048 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
2049 SYNC_FILE_RANGE_WRITE);
2050 stream->out_fd_offset += ret_splice;
2051 }
2052 stream->output_written += ret_splice;
2053 written += ret_splice;
2054 }
2055 if (!relayd) {
2056 lttng_consumer_sync_trace_file(stream, orig_offset);
2057 }
2058 goto end;
2059
2060 write_error:
2061 /*
2062 * This is a special case that the relayd has closed its socket. Let's
2063 * cleanup the relayd object and all associated streams.
2064 */
2065 if (relayd && relayd_hang_up) {
2066 ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
2067 lttng_consumer_cleanup_relayd(relayd);
2068 /* Skip splice error so the consumer does not fail */
2069 goto end;
2070 }
2071
2072 splice_error:
2073 /* send the appropriate error description to sessiond */
2074 switch (ret) {
2075 case EINVAL:
2076 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
2077 break;
2078 case ENOMEM:
2079 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
2080 break;
2081 case ESPIPE:
2082 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
2083 break;
2084 }
2085
2086 end:
2087 if (relayd && stream->metadata_flag) {
2088 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
2089 }
2090
2091 rcu_read_unlock();
2092 return written;
2093 }
2094
2095 /*
2096 * Sample the snapshot positions for a specific fd
2097 *
2098 * Returns 0 on success, < 0 on error
2099 */
2100 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
2101 {
2102 switch (consumer_data.type) {
2103 case LTTNG_CONSUMER_KERNEL:
2104 return lttng_kconsumer_sample_snapshot_positions(stream);
2105 case LTTNG_CONSUMER32_UST:
2106 case LTTNG_CONSUMER64_UST:
2107 return lttng_ustconsumer_sample_snapshot_positions(stream);
2108 default:
2109 ERR("Unknown consumer_data type");
2110 assert(0);
2111 return -ENOSYS;
2112 }
2113 }
2114 /*
2115 * Take a snapshot for a specific fd
2116 *
2117 * Returns 0 on success, < 0 on error
2118 */
2119 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
2120 {
2121 switch (consumer_data.type) {
2122 case LTTNG_CONSUMER_KERNEL:
2123 return lttng_kconsumer_take_snapshot(stream);
2124 case LTTNG_CONSUMER32_UST:
2125 case LTTNG_CONSUMER64_UST:
2126 return lttng_ustconsumer_take_snapshot(stream);
2127 default:
2128 ERR("Unknown consumer_data type");
2129 assert(0);
2130 return -ENOSYS;
2131 }
2132 }
2133
2134 /*
2135 * Get the produced position
2136 *
2137 * Returns 0 on success, < 0 on error
2138 */
2139 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
2140 unsigned long *pos)
2141 {
2142 switch (consumer_data.type) {
2143 case LTTNG_CONSUMER_KERNEL:
2144 return lttng_kconsumer_get_produced_snapshot(stream, pos);
2145 case LTTNG_CONSUMER32_UST:
2146 case LTTNG_CONSUMER64_UST:
2147 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
2148 default:
2149 ERR("Unknown consumer_data type");
2150 assert(0);
2151 return -ENOSYS;
2152 }
2153 }
2154
2155 /*
2156 * Get the consumed position (free-running counter position in bytes).
2157 *
2158 * Returns 0 on success, < 0 on error
2159 */
2160 int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
2161 unsigned long *pos)
2162 {
2163 switch (consumer_data.type) {
2164 case LTTNG_CONSUMER_KERNEL:
2165 return lttng_kconsumer_get_consumed_snapshot(stream, pos);
2166 case LTTNG_CONSUMER32_UST:
2167 case LTTNG_CONSUMER64_UST:
2168 return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
2169 default:
2170 ERR("Unknown consumer_data type");
2171 assert(0);
2172 return -ENOSYS;
2173 }
2174 }
2175
2176 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
2177 int sock, struct pollfd *consumer_sockpoll)
2178 {
2179 switch (consumer_data.type) {
2180 case LTTNG_CONSUMER_KERNEL:
2181 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2182 case LTTNG_CONSUMER32_UST:
2183 case LTTNG_CONSUMER64_UST:
2184 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
2185 default:
2186 ERR("Unknown consumer_data type");
2187 assert(0);
2188 return -ENOSYS;
2189 }
2190 }
2191
2192 static
2193 void lttng_consumer_close_all_metadata(void)
2194 {
2195 switch (consumer_data.type) {
2196 case LTTNG_CONSUMER_KERNEL:
2197 /*
2198 * The Kernel consumer has a different metadata scheme so we don't
2199 * close anything because the stream will be closed by the session
2200 * daemon.
2201 */
2202 break;
2203 case LTTNG_CONSUMER32_UST:
2204 case LTTNG_CONSUMER64_UST:
2205 /*
2206 * Close all metadata streams. The metadata hash table is passed and
2207 * this call iterates over it by closing all wakeup fd. This is safe
2208 * because at this point we are sure that the metadata producer is
2209 * either dead or blocked.
2210 */
2211 lttng_ustconsumer_close_all_metadata(metadata_ht);
2212 break;
2213 default:
2214 ERR("Unknown consumer_data type");
2215 assert(0);
2216 }
2217 }
2218
/*
 * Clean up a metadata stream and free its memory.
 *
 * Must only be called on a metadata stream (asserted below); data streams
 * are removed through consumer_del_stream(). Acquires, in order: the global
 * consumer_data lock, the channel lock, the stream lock and (userspace
 * consumers only) the metadata cache lock. May also delete the owning
 * channel when this stream drops its last reference.
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	struct lttng_consumer_channel *channel = NULL;
	/* Set when this stream drops the channel's last reference. */
	bool free_channel = false;

	assert(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	assert(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	pthread_mutex_lock(&consumer_data.lock);
	/*
	 * Note that this assumes that a stream's channel is never changed and
	 * that the stream's lock doesn't need to be taken to sample its
	 * channel.
	 */
	channel = stream->chan;
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&stream->lock);
	if (channel->metadata_cache) {
		/* Only applicable to userspace consumers. */
		pthread_mutex_lock(&channel->metadata_cache->lock);
	}

	/* Remove any reference to that stream. */
	consumer_stream_delete(stream, ht);

	/* Close down everything including the relayd if one. */
	consumer_stream_close(stream);
	/* Destroy tracer buffers of the stream. */
	consumer_stream_destroy_buffers(stream);

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&channel->refcount, 1)
			&& !uatomic_read(&channel->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_channel = true;
	}
	stream->chan = NULL;

	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for a NULL
	 * pointer value.
	 */
	channel->metadata_stream = NULL;

	if (channel->metadata_cache) {
		pthread_mutex_unlock(&channel->metadata_cache->lock);
	}
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	/* Channel deletion is performed outside of the lock hierarchy above. */
	if (free_channel) {
		consumer_del_channel(channel);
	}

	/*
	 * Release the stream's reference on its trace chunk before freeing
	 * the stream itself.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	stream->trace_chunk = NULL;
	consumer_stream_free(stream);
}
2289
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Registers the stream in the metadata, per-channel-id and per-session-id
 * hash tables, and consumes one unit of the channel's nb_init_stream_left
 * count if any remain.
 */
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	assert(stream);
	assert(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	/* Lock ordering: consumer_data, channel, channel timer, stream. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	rcu_read_lock();

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	assert(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Keyed by stream key; uniqueness was asserted by the lookup above. */
	lttng_ht_add_unique_u64(ht, &stream->node);

	/* Secondary index: lookup of all streams belonging to a channel id. */
	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	rcu_read_unlock();

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
2357
2358 /*
2359 * Delete data stream that are flagged for deletion (endpoint_status).
2360 */
2361 static void validate_endpoint_status_data_stream(void)
2362 {
2363 struct lttng_ht_iter iter;
2364 struct lttng_consumer_stream *stream;
2365
2366 DBG("Consumer delete flagged data stream");
2367
2368 rcu_read_lock();
2369 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
2370 /* Validate delete flag of the stream */
2371 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2372 continue;
2373 }
2374 /* Delete it right now */
2375 consumer_del_stream(stream, data_ht);
2376 }
2377 rcu_read_unlock();
2378 }
2379
2380 /*
2381 * Delete metadata stream that are flagged for deletion (endpoint_status).
2382 */
2383 static void validate_endpoint_status_metadata_stream(
2384 struct lttng_poll_event *pollset)
2385 {
2386 struct lttng_ht_iter iter;
2387 struct lttng_consumer_stream *stream;
2388
2389 DBG("Consumer delete flagged metadata stream");
2390
2391 assert(pollset);
2392
2393 rcu_read_lock();
2394 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
2395 /* Validate delete flag of the stream */
2396 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2397 continue;
2398 }
2399 /*
2400 * Remove from pollset so the metadata thread can continue without
2401 * blocking on a deleted stream.
2402 */
2403 lttng_poll_del(pollset, stream->wait_fd);
2404
2405 /* Delete it right now */
2406 consumer_del_metadata_stream(stream, metadata_ht);
2407 }
2408 rcu_read_unlock();
2409 }
2410
2411 /*
2412 * Thread polls on metadata file descriptor and write them on disk or on the
2413 * network.
2414 */
2415 void *consumer_thread_metadata_poll(void *data)
2416 {
2417 int ret, i, pollfd, err = -1;
2418 uint32_t revents, nb_fd;
2419 struct lttng_consumer_stream *stream = NULL;
2420 struct lttng_ht_iter iter;
2421 struct lttng_ht_node_u64 *node;
2422 struct lttng_poll_event events;
2423 struct lttng_consumer_local_data *ctx = data;
2424 ssize_t len;
2425
2426 rcu_register_thread();
2427
2428 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2429
2430 if (testpoint(consumerd_thread_metadata)) {
2431 goto error_testpoint;
2432 }
2433
2434 health_code_update();
2435
2436 DBG("Thread metadata poll started");
2437
2438 /* Size is set to 1 for the consumer_metadata pipe */
2439 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2440 if (ret < 0) {
2441 ERR("Poll set creation failed");
2442 goto end_poll;
2443 }
2444
2445 ret = lttng_poll_add(&events,
2446 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
2447 if (ret < 0) {
2448 goto end;
2449 }
2450
2451 /* Main loop */
2452 DBG("Metadata main loop started");
2453
2454 while (1) {
2455 restart:
2456 health_code_update();
2457 health_poll_entry();
2458 DBG("Metadata poll wait");
2459 ret = lttng_poll_wait(&events, -1);
2460 DBG("Metadata poll return from wait with %d fd(s)",
2461 LTTNG_POLL_GETNB(&events));
2462 health_poll_exit();
2463 DBG("Metadata event caught in thread");
2464 if (ret < 0) {
2465 if (errno == EINTR) {
2466 ERR("Poll EINTR caught");
2467 goto restart;
2468 }
2469 if (LTTNG_POLL_GETNB(&events) == 0) {
2470 err = 0; /* All is OK */
2471 }
2472 goto end;
2473 }
2474
2475 nb_fd = ret;
2476
2477 /* From here, the event is a metadata wait fd */
2478 for (i = 0; i < nb_fd; i++) {
2479 health_code_update();
2480
2481 revents = LTTNG_POLL_GETEV(&events, i);
2482 pollfd = LTTNG_POLL_GETFD(&events, i);
2483
2484 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
2485 if (revents & LPOLLIN) {
2486 ssize_t pipe_len;
2487
2488 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
2489 &stream, sizeof(stream));
2490 if (pipe_len < sizeof(stream)) {
2491 if (pipe_len < 0) {
2492 PERROR("read metadata stream");
2493 }
2494 /*
2495 * Remove the pipe from the poll set and continue the loop
2496 * since their might be data to consume.
2497 */
2498 lttng_poll_del(&events,
2499 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2500 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2501 continue;
2502 }
2503
2504 /* A NULL stream means that the state has changed. */
2505 if (stream == NULL) {
2506 /* Check for deleted streams. */
2507 validate_endpoint_status_metadata_stream(&events);
2508 goto restart;
2509 }
2510
2511 DBG("Adding metadata stream %d to poll set",
2512 stream->wait_fd);
2513
2514 /* Add metadata stream to the global poll events list */
2515 lttng_poll_add(&events, stream->wait_fd,
2516 LPOLLIN | LPOLLPRI | LPOLLHUP);
2517 } else if (revents & (LPOLLERR | LPOLLHUP)) {
2518 DBG("Metadata thread pipe hung up");
2519 /*
2520 * Remove the pipe from the poll set and continue the loop
2521 * since their might be data to consume.
2522 */
2523 lttng_poll_del(&events,
2524 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2525 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2526 continue;
2527 } else {
2528 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2529 goto end;
2530 }
2531
2532 /* Handle other stream */
2533 continue;
2534 }
2535
2536 rcu_read_lock();
2537 {
2538 uint64_t tmp_id = (uint64_t) pollfd;
2539
2540 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2541 }
2542 node = lttng_ht_iter_get_node_u64(&iter);
2543 assert(node);
2544
2545 stream = caa_container_of(node, struct lttng_consumer_stream,
2546 node);
2547
2548 if (revents & (LPOLLIN | LPOLLPRI)) {
2549 /* Get the data out of the metadata file descriptor */
2550 DBG("Metadata available on fd %d", pollfd);
2551 assert(stream->wait_fd == pollfd);
2552
2553 do {
2554 health_code_update();
2555
2556 len = ctx->on_buffer_ready(stream, ctx);
2557 /*
2558 * We don't check the return value here since if we get
2559 * a negative len, it means an error occurred thus we
2560 * simply remove it from the poll set and free the
2561 * stream.
2562 */
2563 } while (len > 0);
2564
2565 /* It's ok to have an unavailable sub-buffer */
2566 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2567 /* Clean up stream from consumer and free it. */
2568 lttng_poll_del(&events, stream->wait_fd);
2569 consumer_del_metadata_stream(stream, metadata_ht);
2570 }
2571 } else if (revents & (LPOLLERR | LPOLLHUP)) {
2572 DBG("Metadata fd %d is hup|err.", pollfd);
2573 if (!stream->hangup_flush_done
2574 && (consumer_data.type == LTTNG_CONSUMER32_UST
2575 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2576 DBG("Attempting to flush and consume the UST buffers");
2577 lttng_ustconsumer_on_stream_hangup(stream);
2578
2579 /* We just flushed the stream now read it. */
2580 do {
2581 health_code_update();
2582
2583 len = ctx->on_buffer_ready(stream, ctx);
2584 /*
2585 * We don't check the return value here since if we get
2586 * a negative len, it means an error occurred thus we
2587 * simply remove it from the poll set and free the
2588 * stream.
2589 */
2590 } while (len > 0);
2591 }
2592
2593 lttng_poll_del(&events, stream->wait_fd);
2594 /*
2595 * This call update the channel states, closes file descriptors
2596 * and securely free the stream.
2597 */
2598 consumer_del_metadata_stream(stream, metadata_ht);
2599 } else {
2600 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2601 rcu_read_unlock();
2602 goto end;
2603 }
2604 /* Release RCU lock for the stream looked up */
2605 rcu_read_unlock();
2606 }
2607 }
2608
2609 /* All is OK */
2610 err = 0;
2611 end:
2612 DBG("Metadata poll thread exiting");
2613
2614 lttng_poll_clean(&events);
2615 end_poll:
2616 error_testpoint:
2617 if (err) {
2618 health_error();
2619 ERR("Health error occurred in %s", __func__);
2620 }
2621 health_unregister(health_consumerd);
2622 rcu_unregister_thread();
2623 return NULL;
2624 }
2625
2626 /*
2627 * This thread polls the fds in the set to consume the data and write
2628 * it to tracefile if necessary.
2629 */
2630 void *consumer_thread_data_poll(void *data)
2631 {
2632 int num_rdy, num_hup, high_prio, ret, i, err = -1;
2633 struct pollfd *pollfd = NULL;
2634 /* local view of the streams */
2635 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
2636 /* local view of consumer_data.fds_count */
2637 int nb_fd = 0;
2638 /* 2 for the consumer_data_pipe and wake up pipe */
2639 const int nb_pipes_fd = 2;
2640 /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
2641 int nb_inactive_fd = 0;
2642 struct lttng_consumer_local_data *ctx = data;
2643 ssize_t len;
2644
2645 rcu_register_thread();
2646
2647 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2648
2649 if (testpoint(consumerd_thread_data)) {
2650 goto error_testpoint;
2651 }
2652
2653 health_code_update();
2654
2655 local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
2656 if (local_stream == NULL) {
2657 PERROR("local_stream malloc");
2658 goto end;
2659 }
2660
2661 while (1) {
2662 health_code_update();
2663
2664 high_prio = 0;
2665 num_hup = 0;
2666
2667 /*
2668 * the fds set has been updated, we need to update our
2669 * local array as well
2670 */
2671 pthread_mutex_lock(&consumer_data.lock);
2672 if (consumer_data.need_update) {
2673 free(pollfd);
2674 pollfd = NULL;
2675
2676 free(local_stream);
2677 local_stream = NULL;
2678
2679 /* Allocate for all fds */
2680 pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
2681 if (pollfd == NULL) {
2682 PERROR("pollfd malloc");
2683 pthread_mutex_unlock(&consumer_data.lock);
2684 goto end;
2685 }
2686
2687 local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
2688 sizeof(struct lttng_consumer_stream *));
2689 if (local_stream == NULL) {
2690 PERROR("local_stream malloc");
2691 pthread_mutex_unlock(&consumer_data.lock);
2692 goto end;
2693 }
2694 ret = update_poll_array(ctx, &pollfd, local_stream,
2695 data_ht, &nb_inactive_fd);
2696 if (ret < 0) {
2697 ERR("Error in allocating pollfd or local_outfds");
2698 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2699 pthread_mutex_unlock(&consumer_data.lock);
2700 goto end;
2701 }
2702 nb_fd = ret;
2703 consumer_data.need_update = 0;
2704 }
2705 pthread_mutex_unlock(&consumer_data.lock);
2706
2707 /* No FDs and consumer_quit, consumer_cleanup the thread */
2708 if (nb_fd == 0 && nb_inactive_fd == 0 &&
2709 CMM_LOAD_SHARED(consumer_quit) == 1) {
2710 err = 0; /* All is OK */
2711 goto end;
2712 }
2713 /* poll on the array of fds */
2714 restart:
2715 DBG("polling on %d fd", nb_fd + nb_pipes_fd);
2716 if (testpoint(consumerd_thread_data_poll)) {
2717 goto end;
2718 }
2719 health_poll_entry();
2720 num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
2721 health_poll_exit();
2722 DBG("poll num_rdy : %d", num_rdy);
2723 if (num_rdy == -1) {
2724 /*
2725 * Restart interrupted system call.
2726 */
2727 if (errno == EINTR) {
2728 goto restart;
2729 }
2730 PERROR("Poll error");
2731 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2732 goto end;
2733 } else if (num_rdy == 0) {
2734 DBG("Polling thread timed out");
2735 goto end;
2736 }
2737
2738 if (caa_unlikely(data_consumption_paused)) {
2739 DBG("Data consumption paused, sleeping...");
2740 sleep(1);
2741 goto restart;
2742 }
2743
2744 /*
2745 * If the consumer_data_pipe triggered poll go directly to the
2746 * beginning of the loop to update the array. We want to prioritize
2747 * array update over low-priority reads.
2748 */
2749 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
2750 ssize_t pipe_readlen;
2751
2752 DBG("consumer_data_pipe wake up");
2753 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2754 &new_stream, sizeof(new_stream));
2755 if (pipe_readlen < sizeof(new_stream)) {
2756 PERROR("Consumer data pipe");
2757 /* Continue so we can at least handle the current stream(s). */
2758 continue;
2759 }
2760
2761 /*
2762 * If the stream is NULL, just ignore it. It's also possible that
2763 * the sessiond poll thread changed the consumer_quit state and is
2764 * waking us up to test it.
2765 */
2766 if (new_stream == NULL) {
2767 validate_endpoint_status_data_stream();
2768 continue;
2769 }
2770
2771 /* Continue to update the local streams and handle prio ones */
2772 continue;
2773 }
2774
2775 /* Handle wakeup pipe. */
2776 if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
2777 char dummy;
2778 ssize_t pipe_readlen;
2779
2780 pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
2781 sizeof(dummy));
2782 if (pipe_readlen < 0) {
2783 PERROR("Consumer data wakeup pipe");
2784 }
2785 /* We've been awakened to handle stream(s). */
2786 ctx->has_wakeup = 0;
2787 }
2788
2789 /* Take care of high priority channels first. */
2790 for (i = 0; i < nb_fd; i++) {
2791 health_code_update();
2792
2793 if (local_stream[i] == NULL) {
2794 continue;
2795 }
2796 if (pollfd[i].revents & POLLPRI) {
2797 DBG("Urgent read on fd %d", pollfd[i].fd);
2798 high_prio = 1;
2799 len = ctx->on_buffer_ready(local_stream[i], ctx);
2800 /* it's ok to have an unavailable sub-buffer */
2801 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2802 /* Clean the stream and free it. */
2803 consumer_del_stream(local_stream[i], data_ht);
2804 local_stream[i] = NULL;
2805 } else if (len > 0) {
2806 local_stream[i]->data_read = 1;
2807 }
2808 }
2809 }
2810
2811 /*
2812 * If we read high prio channel in this loop, try again
2813 * for more high prio data.
2814 */
2815 if (high_prio) {
2816 continue;
2817 }
2818
2819 /* Take care of low priority channels. */
2820 for (i = 0; i < nb_fd; i++) {
2821 health_code_update();
2822
2823 if (local_stream[i] == NULL) {
2824 continue;
2825 }
2826 if ((pollfd[i].revents & POLLIN) ||
2827 local_stream[i]->hangup_flush_done ||
2828 local_stream[i]->has_data) {
2829 DBG("Normal read on fd %d", pollfd[i].fd);
2830 len = ctx->on_buffer_ready(local_stream[i], ctx);
2831 /* it's ok to have an unavailable sub-buffer */
2832 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2833 /* Clean the stream and free it. */
2834 consumer_del_stream(local_stream[i], data_ht);
2835 local_stream[i] = NULL;
2836 } else if (len > 0) {
2837 local_stream[i]->data_read = 1;
2838 }
2839 }
2840 }
2841
2842 /* Handle hangup and errors */
2843 for (i = 0; i < nb_fd; i++) {
2844 health_code_update();
2845
2846 if (local_stream[i] == NULL) {
2847 continue;
2848 }
2849 if (!local_stream[i]->hangup_flush_done
2850 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2851 && (consumer_data.type == LTTNG_CONSUMER32_UST
2852 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2853 DBG("fd %d is hup|err|nval. Attempting flush and read.",
2854 pollfd[i].fd);
2855 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2856 /* Attempt read again, for the data we just flushed. */
2857 local_stream[i]->data_read = 1;
2858 }
2859 /*
2860 * If the poll flag is HUP/ERR/NVAL and we have
2861 * read no data in this pass, we can remove the
2862 * stream from its hash table.
2863 */
2864 if ((pollfd[i].revents & POLLHUP)) {
2865 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2866 if (!local_stream[i]->data_read) {
2867 consumer_del_stream(local_stream[i], data_ht);
2868 local_stream[i] = NULL;
2869 num_hup++;
2870 }
2871 } else if (pollfd[i].revents & POLLERR) {
2872 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2873 if (!local_stream[i]->data_read) {
2874 consumer_del_stream(local_stream[i], data_ht);
2875 local_stream[i] = NULL;
2876 num_hup++;
2877 }
2878 } else if (pollfd[i].revents & POLLNVAL) {
2879 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2880 if (!local_stream[i]->data_read) {
2881 consumer_del_stream(local_stream[i], data_ht);
2882 local_stream[i] = NULL;
2883 num_hup++;
2884 }
2885 }
2886 if (local_stream[i] != NULL) {
2887 local_stream[i]->data_read = 0;
2888 }
2889 }
2890 }
2891 /* All is OK */
2892 err = 0;
2893 end:
2894 DBG("polling thread exiting");
2895 free(pollfd);
2896 free(local_stream);
2897
2898 /*
2899 * Close the write side of the pipe so epoll_wait() in
2900 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2901 * read side of the pipe. If we close them both, epoll_wait strangely does
2902 * not return and could create a endless wait period if the pipe is the
2903 * only tracked fd in the poll set. The thread will take care of closing
2904 * the read side.
2905 */
2906 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
2907
2908 error_testpoint:
2909 if (err) {
2910 health_error();
2911 ERR("Health error occurred in %s", __func__);
2912 }
2913 health_unregister(health_consumerd);
2914
2915 rcu_unregister_thread();
2916 return NULL;
2917 }
2918
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 *
 * Iterates under RCU over the per-channel-id stream index; each stream is
 * handled under its own lock. Nothing is done for kernel streams here.
 */
static
void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	/* Streams are indexed by their channel key in this hash table. */
	ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key,
			&iter.iter, stream, node_channel_id.node) {
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Skip streams already removed from the hash table concurrently. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		switch (consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			/* Nothing to do for the kernel consumer. */
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			if (stream->metadata_flag) {
				/* Safe and protected by the stream lock. */
				lttng_ustconsumer_close_metadata(stream->chan);
			} else {
				/*
				 * Note: a mutex is taken internally within
				 * liblttng-ust-ctl to protect timer wakeup_fd
				 * use from concurrent close.
				 */
				lttng_ustconsumer_close_stream_wakeup(stream);
			}
			break;
		default:
			ERR("Unknown consumer_data type");
			assert(0);
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	rcu_read_unlock();
}
2971
2972 static void destroy_channel_ht(struct lttng_ht *ht)
2973 {
2974 struct lttng_ht_iter iter;
2975 struct lttng_consumer_channel *channel;
2976 int ret;
2977
2978 if (ht == NULL) {
2979 return;
2980 }
2981
2982 rcu_read_lock();
2983 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2984 ret = lttng_ht_del(ht, &iter);
2985 assert(ret != 0);
2986 }
2987 rcu_read_unlock();
2988
2989 lttng_ht_destroy(ht);
2990 }
2991
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 */
void *consumer_thread_channel_poll(void *data)
{
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = NULL;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	if (testpoint(consumerd_thread_channel)) {
		goto error_testpoint;
	}

	health_code_update();

	/* Local table of the channels currently watched by this thread. */
	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (1) {
restart:
		health_code_update();
		DBG("Channel poll wait");
		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel poll return from wait with %d fd(s)",
				LTTNG_POLL_GETNB(&events));
		health_poll_exit();
		DBG("Channel event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR caught");
				goto restart;
			}
			if (LTTNG_POLL_GETNB(&events) == 0) {
				err = 0;	/* All is OK */
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						if (ret < 0) {
							ERR("Error reading channel pipe");
						}
						lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
						DBG("Adding channel %d to poll set",
							chan->wait_fd);

						lttng_ht_node_init_u64(&chan->wait_fd_node,
							chan->wait_fd);
						rcu_read_lock();
						lttng_ht_add_unique_u64(channel_ht,
								&chan->wait_fd_node);
						rcu_read_unlock();
						/* Add channel to the global poll events list */
						lttng_poll_add(&events, chan->wait_fd,
								LPOLLERR | LPOLLHUP);
						break;
					case CONSUMER_CHANNEL_DEL:
					{
						/*
						 * This command should never be called if the channel
						 * has streams monitored by either the data or metadata
						 * thread. The consumer only notify this thread with a
						 * channel del. command if it receives a destroy
						 * channel command from the session daemon that send it
						 * if a command prior to the GET_CHANNEL failed.
						 */

						rcu_read_lock();
						chan = consumer_find_channel(key);
						if (!chan) {
							rcu_read_unlock();
							ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						assert(ret == 0);

						switch (consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							health_code_update();
							/* Destroy streams that might have been left in the stream list. */
							clean_channel_stream_list(chan);
							break;
						default:
							ERR("Unknown consumer_data type");
							assert(0);
						}

						/*
						 * Release our own refcount. Force channel deletion even if
						 * streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						rcu_read_unlock();
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue the loop
						 * since there might be data to consume.
						 */
						lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				} else if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since there might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else {
					ERR("Unexpected poll events %u for sock %d", revents, pollfd);
					goto end;
				}

				/* Handle other stream */
				continue;
			}

			rcu_read_lock();
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			assert(node);

			chan = caa_container_of(node, struct lttng_consumer_channel,
					wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				assert(ret == 0);

				/*
				 * This will close the wait fd for each stream associated to
				 * this channel AND monitored by the data/metadata thread thus
				 * will be clean by the right thread.
				 */
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1)
						&& !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				rcu_read_unlock();
				goto end;
			}

			/* Release RCU lock for the channel looked up */
			rcu_read_unlock();
		}
	}

	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
error_testpoint:
	DBG("Channel poll thread exiting");
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return NULL;
}
3237
3238 static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
3239 struct pollfd *sockpoll, int client_socket)
3240 {
3241 int ret;
3242
3243 assert(ctx);
3244 assert(sockpoll);
3245
3246 ret = lttng_consumer_poll_socket(sockpoll);
3247 if (ret) {
3248 goto error;
3249 }
3250 DBG("Metadata connection on client_socket");
3251
3252 /* Blocking call, waiting for transmission */
3253 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
3254 if (ctx->consumer_metadata_socket < 0) {
3255 WARN("On accept metadata");
3256 ret = -1;
3257 goto error;
3258 }
3259 ret = 0;
3260
3261 error:
3262 return ret;
3263 }
3264
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 */
void *consumer_thread_sessiond_poll(void *data)
{
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * Structure to poll for incoming data on the communication socket;
	 * avoids making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	if (testpoint(consumerd_thread_sessiond)) {
		goto error_testpoint;
	}

	health_code_update();

	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	/* Remove a possibly stale socket file before re-creating it. */
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* Prepare the FDs to poll: the client socket and the should_quit pipe. */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret) {
		if (ret > 0) {
			/* should exit */
			err = 0;
		}
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* Update the polling structure to poll on the established socket. */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	while (1) {
		health_code_update();

		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			if (ret > 0) {
				/* should exit */
				err = 0;
			}
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (CMM_LOAD_SHARED(consumer_quit)) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0; /* All is OK */
			goto end;
		}
		DBG("received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_all_metadata();

	/*
	 * When all fds have hung up, the polling thread
	 * can exit cleanly.
	 */
	CMM_STORE_SHARED(consumer_quit, 1);

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	/* Ask the channel thread to quit as well. */
	notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);

	notify_health_quit_pipe(health_quit_pipe);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return NULL;
}
3442
3443 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
3444 struct lttng_consumer_local_data *ctx)
3445 {
3446 ssize_t ret;
3447
3448 pthread_mutex_lock(&stream->chan->lock);
3449 pthread_mutex_lock(&stream->lock);
3450 if (stream->metadata_flag) {
3451 pthread_mutex_lock(&stream->metadata_rdv_lock);
3452 }
3453
3454 switch (consumer_data.type) {
3455 case LTTNG_CONSUMER_KERNEL:
3456 ret = lttng_kconsumer_read_subbuffer(stream, ctx);
3457 break;
3458 case LTTNG_CONSUMER32_UST:
3459 case LTTNG_CONSUMER64_UST:
3460 ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
3461 break;
3462 default:
3463 ERR("Unknown consumer_data type");
3464 assert(0);
3465 ret = -ENOSYS;
3466 break;
3467 }
3468
3469 if (stream->metadata_flag) {
3470 pthread_cond_broadcast(&stream->metadata_rdv);
3471 pthread_mutex_unlock(&stream->metadata_rdv_lock);
3472 }
3473 pthread_mutex_unlock(&stream->lock);
3474 pthread_mutex_unlock(&stream->chan->lock);
3475
3476 return ret;
3477 }
3478
3479 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3480 {
3481 switch (consumer_data.type) {
3482 case LTTNG_CONSUMER_KERNEL:
3483 return lttng_kconsumer_on_recv_stream(stream);
3484 case LTTNG_CONSUMER32_UST:
3485 case LTTNG_CONSUMER64_UST:
3486 return lttng_ustconsumer_on_recv_stream(stream);
3487 default:
3488 ERR("Unknown consumer_data type");
3489 assert(0);
3490 return -ENOSYS;
3491 }
3492 }
3493
3494 /*
3495 * Allocate and set consumer data hash tables.
3496 */
3497 int lttng_consumer_init(void)
3498 {
3499 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3500 if (!consumer_data.channel_ht) {
3501 goto error;
3502 }
3503
3504 consumer_data.channels_by_session_id_ht =
3505 lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3506 if (!consumer_data.channels_by_session_id_ht) {
3507 goto error;
3508 }
3509
3510 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3511 if (!consumer_data.relayd_ht) {
3512 goto error;
3513 }
3514
3515 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3516 if (!consumer_data.stream_list_ht) {
3517 goto error;
3518 }
3519
3520 consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3521 if (!consumer_data.stream_per_chan_id_ht) {
3522 goto error;
3523 }
3524
3525 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3526 if (!data_ht) {
3527 goto error;
3528 }
3529
3530 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3531 if (!metadata_ht) {
3532 goto error;
3533 }
3534
3535 consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
3536 if (!consumer_data.chunk_registry) {
3537 goto error;
3538 }
3539
3540 return 0;
3541
3542 error:
3543 return -1;
3544 }
3545
/*
 * Process the ADD_RELAYD command received by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire an RCU read side lock before calling it.
 *
 * On error, the session daemon is notified through the status message; this
 * function itself returns nothing.
 */
void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
		struct lttng_consumer_local_data *ctx, int sock,
		struct pollfd *consumer_sockpoll,
		struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
		uint64_t relayd_session_id)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = NULL;

	assert(ctx);
	assert(relayd_sock);

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == NULL) {
		/* The control socket is always set up first for a given index. */
		assert(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == NULL) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status
		 * message so we can notify the session daemon and continue our
		 * work without killing everything.
		 */
	} else {
		/*
		 * A relayd key should never be found for a control socket.
		 */
		assert(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	ret = lttng_consumer_poll_socket(consumer_sockpoll);
	if (ret) {
		/* Needing to exit in the middle of a command: error. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		fd = -1;	/* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->control_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->control_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor */
		relayd->control_sock.sock.fd = fd;
		/* Assign version values. */
		relayd->control_sock.major = relayd_sock->major;
		relayd->control_sock.minor = relayd_sock->minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->data_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->data_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor */
		relayd->data_sock.sock.fd = fd;
		/* Assign version values. */
		relayd->data_sock.major = relayd_sock->major;
		relayd->data_sock.minor = relayd_sock->minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
			sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
			relayd->net_seq_idx, fd);
	/*
	 * We gave the ownership of the fd to the relayd structure. Set the
	 * fd to -1 so we don't call close() on it in the error path below.
	 */
	fd = -1;

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	relayd->ctx = ctx;
	add_relayd(relayd);

	/* All good! */
	return;

error:
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	if (relayd_created) {
		free(relayd);
	}
}
3731
3732 /*
3733 * Search for a relayd associated to the session id and return the reference.
3734 *
3735 * A rcu read side lock MUST be acquire before calling this function and locked
3736 * until the relayd object is no longer necessary.
3737 */
3738 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3739 {
3740 struct lttng_ht_iter iter;
3741 struct consumer_relayd_sock_pair *relayd = NULL;
3742
3743 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3744 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
3745 node.node) {
3746 /*
3747 * Check by sessiond id which is unique here where the relayd session
3748 * id might not be when having multiple relayd.
3749 */
3750 if (relayd->sessiond_session_id == id) {
3751 /* Found the relayd. There can be only one per id. */
3752 goto found;
3753 }
3754 }
3755
3756 return NULL;
3757
3758 found:
3759 return relayd;
3760 }
3761
3762 /*
3763 * Check if for a given session id there is still data needed to be extract
3764 * from the buffers.
3765 *
3766 * Return 1 if data is pending or else 0 meaning ready to be read.
3767 */
3768 int consumer_data_pending(uint64_t id)
3769 {
3770 int ret;
3771 struct lttng_ht_iter iter;
3772 struct lttng_ht *ht;
3773 struct lttng_consumer_stream *stream;
3774 struct consumer_relayd_sock_pair *relayd = NULL;
3775 int (*data_pending)(struct lttng_consumer_stream *);
3776
3777 DBG("Consumer data pending command on session id %" PRIu64, id);
3778
3779 rcu_read_lock();
3780 pthread_mutex_lock(&consumer_data.lock);
3781
3782 switch (consumer_data.type) {
3783 case LTTNG_CONSUMER_KERNEL:
3784 data_pending = lttng_kconsumer_data_pending;
3785 break;
3786 case LTTNG_CONSUMER32_UST:
3787 case LTTNG_CONSUMER64_UST:
3788 data_pending = lttng_ustconsumer_data_pending;
3789 break;
3790 default:
3791 ERR("Unknown consumer data type");
3792 assert(0);
3793 }
3794
3795 /* Ease our life a bit */
3796 ht = consumer_data.stream_list_ht;
3797
3798 cds_lfht_for_each_entry_duplicate(ht->ht,
3799 ht->hash_fct(&id, lttng_ht_seed),
3800 ht->match_fct, &id,
3801 &iter.iter, stream, node_session_id.node) {
3802 pthread_mutex_lock(&stream->lock);
3803
3804 /*
3805 * A removed node from the hash table indicates that the stream has
3806 * been deleted thus having a guarantee that the buffers are closed
3807 * on the consumer side. However, data can still be transmitted
3808 * over the network so don't skip the relayd check.
3809 */
3810 ret = cds_lfht_is_node_deleted(&stream->node.node);
3811 if (!ret) {
3812 /* Check the stream if there is data in the buffers. */
3813 ret = data_pending(stream);
3814 if (ret == 1) {
3815 pthread_mutex_unlock(&stream->lock);
3816 goto data_pending;
3817 }
3818 }
3819
3820 pthread_mutex_unlock(&stream->lock);
3821 }
3822
3823 relayd = find_relayd_by_session_id(id);
3824 if (relayd) {
3825 unsigned int is_data_inflight = 0;
3826
3827 /* Send init command for data pending. */
3828 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
3829 ret = relayd_begin_data_pending(&relayd->control_sock,
3830 relayd->relayd_session_id);
3831 if (ret < 0) {
3832 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3833 /* Communication error thus the relayd so no data pending. */
3834 goto data_not_pending;
3835 }
3836
3837 cds_lfht_for_each_entry_duplicate(ht->ht,
3838 ht->hash_fct(&id, lttng_ht_seed),
3839 ht->match_fct, &id,
3840 &iter.iter, stream, node_session_id.node) {
3841 if (stream->metadata_flag) {
3842 ret = relayd_quiescent_control(&relayd->control_sock,
3843 stream->relayd_stream_id);
3844 } else {
3845 ret = relayd_data_pending(&relayd->control_sock,
3846 stream->relayd_stream_id,
3847 stream->next_net_seq_num - 1);
3848 }
3849
3850 if (ret == 1) {
3851 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3852 goto data_pending;
3853 } else if (ret < 0) {
3854 ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
3855 lttng_consumer_cleanup_relayd(relayd);
3856 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3857 goto data_not_pending;
3858 }
3859 }
3860
3861 /* Send end command for data pending. */
3862 ret = relayd_end_data_pending(&relayd->control_sock,
3863 relayd->relayd_session_id, &is_data_inflight);
3864 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
3865 if (ret < 0) {
3866 ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
3867 lttng_consumer_cleanup_relayd(relayd);
3868 goto data_not_pending;
3869 }
3870 if (is_data_inflight) {
3871 goto data_pending;
3872 }
3873 }
3874
3875 /*
3876 * Finding _no_ node in the hash table and no inflight data means that the
3877 * stream(s) have been removed thus data is guaranteed to be available for
3878 * analysis from the trace files.
3879 */
3880
3881 data_not_pending:
3882 /* Data is available to be read by a viewer. */
3883 pthread_mutex_unlock(&consumer_data.lock);
3884 rcu_read_unlock();
3885 return 0;
3886
3887 data_pending:
3888 /* Data is still being extracted from buffers. */
3889 pthread_mutex_unlock(&consumer_data.lock);
3890 rcu_read_unlock();
3891 return 1;
3892 }
3893
3894 /*
3895 * Send a ret code status message to the sessiond daemon.
3896 *
3897 * Return the sendmsg() return value.
3898 */
3899 int consumer_send_status_msg(int sock, int ret_code)
3900 {
3901 struct lttcomm_consumer_status_msg msg;
3902
3903 memset(&msg, 0, sizeof(msg));
3904 msg.ret_code = ret_code;
3905
3906 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3907 }
3908
3909 /*
3910 * Send a channel status message to the sessiond daemon.
3911 *
3912 * Return the sendmsg() return value.
3913 */
3914 int consumer_send_status_channel(int sock,
3915 struct lttng_consumer_channel *channel)
3916 {
3917 struct lttcomm_consumer_status_channel msg;
3918
3919 assert(sock >= 0);
3920
3921 memset(&msg, 0, sizeof(msg));
3922 if (!channel) {
3923 msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
3924 } else {
3925 msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3926 msg.key = channel->key;
3927 msg.stream_count = channel->streams.count;
3928 }
3929
3930 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3931 }
3932
/*
 * Compute the position from which consumption should start so that at most
 * nb_packets_per_stream packets (of max_sb_size bytes each) are grabbed,
 * counted back from the produced position.
 *
 * Returns consumed_pos ("grab everything") when no packet limit is set or
 * when the computed window starts before the consumed position.
 */
unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
		unsigned long produced_pos, uint64_t nb_packets_per_stream,
		uint64_t max_sb_size)
{
	unsigned long window_start;

	if (nb_packets_per_stream == 0) {
		return consumed_pos;	/* Grab everything */
	}

	/* Align down to the start of the packet containing produced_pos... */
	window_start = produced_pos - offset_align_floor(produced_pos, max_sb_size);
	/* ...then step back by the requested number of packets. */
	window_start -= max_sb_size * nb_packets_per_stream;
	if ((long) (window_start - consumed_pos) < 0) {
		return consumed_pos;	/* Grab everything */
	}

	return window_start;
}
3949
3950 static
3951 int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
3952 {
3953 int ret = 0;
3954
3955 switch (consumer_data.type) {
3956 case LTTNG_CONSUMER_KERNEL:
3957 if (producer_active) {
3958 ret = kernctl_buffer_flush(stream->wait_fd);
3959 if (ret < 0) {
3960 ERR("Failed to flush kernel stream");
3961 goto end;
3962 }
3963 } else {
3964 ret = kernctl_buffer_flush_empty(stream->wait_fd);
3965 if (ret < 0) {
3966 ERR("Failed to flush kernel stream");
3967 goto end;
3968 }
3969 }
3970 break;
3971 case LTTNG_CONSUMER32_UST:
3972 case LTTNG_CONSUMER64_UST:
3973 lttng_ustconsumer_flush_buffer(stream, producer_active);
3974 break;
3975 default:
3976 ERR("Unknown consumer_data type");
3977 abort();
3978 }
3979
3980 end:
3981 return ret;
3982 }
3983
/*
 * Sample the rotate position for all the streams of a channel. If a stream
 * is already at the rotate position (produced == consumed), we flag it as
 * ready for rotation. The rotation of ready streams occurs after we have
 * replied to the session daemon that we have finished sampling the positions.
 * Must be called with RCU read-side lock held to ensure existence of channel.
 *
 * Returns 0 on success, < 0 on error
 */
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
		uint64_t key, uint64_t relayd_id, uint32_t metadata,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
	struct lttng_dynamic_array stream_rotation_positions;
	uint64_t next_chunk_id, stream_count = 0;
	enum lttng_trace_chunk_status chunk_status;
	/* A relayd id of -1ULL indicates a locally-written trace. */
	const bool is_local_trace = relayd_id == -1ULL;
	struct consumer_relayd_sock_pair *relayd = NULL;
	bool rotating_to_new_chunk = true;

	DBG("Consumer sample rotate position for channel %" PRIu64, key);

	lttng_dynamic_array_init(&stream_rotation_positions,
			sizeof(struct relayd_stream_rotation_position), NULL);

	rcu_read_lock();

	pthread_mutex_lock(&channel->lock);
	assert(channel->trace_chunk);
	chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
			&next_chunk_id);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret = -1;
		goto end_unlock_channel;
	}

	/* Walk all streams belonging to this channel. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key, &iter.iter,
			stream, node_channel_id.node) {
		unsigned long produced_pos = 0, consumed_pos = 0;

		health_code_update();

		/*
		 * Lock stream because we are about to change its state.
		 */
		pthread_mutex_lock(&stream->lock);

		if (stream->trace_chunk == stream->chan->trace_chunk) {
			rotating_to_new_chunk = false;
		}

		/*
		 * Do not flush an empty packet when rotating from a NULL trace
		 * chunk. The stream has no means to output data, and the prior
		 * rotation which rotated to NULL performed that side-effect already.
		 */
		if (stream->trace_chunk) {
			/*
			 * For metadata stream, do an active flush, which does not
			 * produce empty packets. For data streams, empty-flush;
			 * ensures we have at least one packet in each stream per trace
			 * chunk, even if no data was produced.
			 */
			ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
			if (ret < 0) {
				ERR("Failed to flush stream %" PRIu64 " during channel rotation",
						stream->key);
				goto end_unlock_stream;
			}
		}

		/* Sample the produced/consumed positions of the stream. */
		ret = lttng_consumer_take_snapshot(stream);
		if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
			ERR("Failed to sample snapshot position during channel rotation");
			goto end_unlock_stream;
		}
		if (!ret) {
			ret = lttng_consumer_get_produced_snapshot(stream,
					&produced_pos);
			if (ret < 0) {
				ERR("Failed to sample produced position during channel rotation");
				goto end_unlock_stream;
			}

			ret = lttng_consumer_get_consumed_snapshot(stream,
					&consumed_pos);
			if (ret < 0) {
				ERR("Failed to sample consumed position during channel rotation");
				goto end_unlock_stream;
			}
		}
		/*
		 * Align produced position on the start-of-packet boundary of the first
		 * packet going into the next trace chunk.
		 */
		produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
		if (consumed_pos == produced_pos) {
			/* Nothing left to consume: ready to rotate right away. */
			stream->rotate_ready = true;
		}
		/*
		 * The rotation position is based on the packet_seq_num of the
		 * packet following the last packet that was consumed for this
		 * stream, incremented by the offset between produced and
		 * consumed positions. This rotation position is a lower bound
		 * (inclusive) at which the next trace chunk starts. Since it
		 * is a lower bound, it is OK if the packet_seq_num does not
		 * correspond exactly to the same packet identified by the
		 * consumed_pos, which can happen in overwrite mode.
		 */
		if (stream->sequence_number_unavailable) {
			/*
			 * Rotation should never be performed on a session which
			 * interacts with a pre-2.8 lttng-modules, which does
			 * not implement packet sequence number.
			 */
			ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
					stream->key);
			ret = -1;
			goto end_unlock_stream;
		}
		stream->rotate_position = stream->last_sequence_number + 1 +
				((produced_pos - consumed_pos) / stream->max_sb_size);

		if (!is_local_trace) {
			/*
			 * The relay daemon control protocol expects a rotation
			 * position as "the sequence number of the first packet
			 * _after_ the current trace chunk".
			 */
			const struct relayd_stream_rotation_position position = {
				.stream_id = stream->relayd_stream_id,
				.rotate_at_seq_num = stream->rotate_position,
			};

			ret = lttng_dynamic_array_add_element(
					&stream_rotation_positions,
					&position);
			if (ret) {
				ERR("Failed to allocate stream rotation position");
				goto end_unlock_stream;
			}
			stream_count++;
		}
		pthread_mutex_unlock(&stream->lock);
	}
	stream = NULL;
	pthread_mutex_unlock(&channel->lock);

	if (is_local_trace) {
		/* No relay daemon involved: sampling is all that was needed. */
		ret = 0;
		goto end;
	}

	relayd = consumer_find_relayd(relayd_id);
	if (!relayd) {
		ERR("Failed to find relayd %" PRIu64, relayd_id);
		ret = -1;
		goto end;
	}

	/* Push the sampled rotation positions to the relay daemon. */
	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
	ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
			rotating_to_new_chunk ? &next_chunk_id : NULL,
			(const struct relayd_stream_rotation_position *)
					stream_rotation_positions.buffer.data);
	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	if (ret < 0) {
		ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
				relayd->net_seq_idx);
		lttng_consumer_cleanup_relayd(relayd);
		goto end;
	}

	ret = 0;
	goto end;

end_unlock_stream:
	pthread_mutex_unlock(&stream->lock);
end_unlock_channel:
	pthread_mutex_unlock(&channel->lock);
end:
	rcu_read_unlock();
	lttng_dynamic_array_reset(&stream_rotation_positions);
	return ret;
}
4175
4176 static
4177 int consumer_clear_buffer(struct lttng_consumer_stream *stream)
4178 {
4179 int ret = 0;
4180 unsigned long consumed_pos_before, consumed_pos_after;
4181
4182 ret = lttng_consumer_sample_snapshot_positions(stream);
4183 if (ret < 0) {
4184 ERR("Taking snapshot positions");
4185 goto end;
4186 }
4187
4188 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
4189 if (ret < 0) {
4190 ERR("Consumed snapshot position");
4191 goto end;
4192 }
4193
4194 switch (consumer_data.type) {
4195 case LTTNG_CONSUMER_KERNEL:
4196 ret = kernctl_buffer_clear(stream->wait_fd);
4197 if (ret < 0) {
4198 ERR("Failed to flush kernel stream");
4199 goto end;
4200 }
4201 break;
4202 case LTTNG_CONSUMER32_UST:
4203 case LTTNG_CONSUMER64_UST:
4204 lttng_ustconsumer_clear_buffer(stream);
4205 break;
4206 default:
4207 ERR("Unknown consumer_data type");
4208 abort();
4209 }
4210
4211 ret = lttng_consumer_sample_snapshot_positions(stream);
4212 if (ret < 0) {
4213 ERR("Taking snapshot positions");
4214 goto end;
4215 }
4216 ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
4217 if (ret < 0) {
4218 ERR("Consumed snapshot position");
4219 goto end;
4220 }
4221 DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
4222 end:
4223 return ret;
4224 }
4225
4226 static
4227 int consumer_clear_stream(struct lttng_consumer_stream *stream)
4228 {
4229 int ret;
4230
4231 ret = consumer_flush_buffer(stream, 1);
4232 if (ret < 0) {
4233 ERR("Failed to flush stream %" PRIu64 " during channel clear",
4234 stream->key);
4235 ret = LTTCOMM_CONSUMERD_FATAL;
4236 goto error;
4237 }
4238
4239 ret = consumer_clear_buffer(stream);
4240 if (ret < 0) {
4241 ERR("Failed to clear stream %" PRIu64 " during channel clear",
4242 stream->key);
4243 ret = LTTCOMM_CONSUMERD_FATAL;
4244 goto error;
4245 }
4246
4247 ret = LTTCOMM_CONSUMERD_SUCCESS;
4248 error:
4249 return ret;
4250 }
4251
4252 static
4253 int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
4254 {
4255 int ret;
4256 struct lttng_consumer_stream *stream;
4257
4258 rcu_read_lock();
4259 pthread_mutex_lock(&channel->lock);
4260 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
4261 health_code_update();
4262 pthread_mutex_lock(&stream->lock);
4263 ret = consumer_clear_stream(stream);
4264 if (ret) {
4265 goto error_unlock;
4266 }
4267 pthread_mutex_unlock(&stream->lock);
4268 }
4269 pthread_mutex_unlock(&channel->lock);
4270 rcu_read_unlock();
4271 return 0;
4272
4273 error_unlock:
4274 pthread_mutex_unlock(&stream->lock);
4275 pthread_mutex_unlock(&channel->lock);
4276 rcu_read_unlock();
4277 if (ret) {
4278 goto error;
4279 }
4280 ret = LTTCOMM_CONSUMERD_SUCCESS;
4281 error:
4282 return ret;
4283 }
4284
4285 /*
4286 * Check if a stream is ready to be rotated after extracting it.
4287 *
4288 * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
4289 * error. Stream lock must be held.
4290 */
4291 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
4292 {
4293 if (stream->rotate_ready) {
4294 return 1;
4295 }
4296
4297 /*
4298 * If packet seq num is unavailable, it means we are interacting
4299 * with a pre-2.8 lttng-modules which does not implement the
4300 * sequence number. Rotation should never be used by sessiond in this
4301 * scenario.
4302 */
4303 if (stream->sequence_number_unavailable) {
4304 ERR("Internal error: rotation used on stream %" PRIu64
4305 " with unavailable sequence number",
4306 stream->key);
4307 return -1;
4308 }
4309
4310 if (stream->rotate_position == -1ULL ||
4311 stream->last_sequence_number == -1ULL) {
4312 return 0;
4313 }
4314
4315 /*
4316 * Rotate position not reached yet. The stream rotate position is
4317 * the position of the next packet belonging to the next trace chunk,
4318 * but consumerd considers rotation ready when reaching the last
4319 * packet of the current chunk, hence the "rotate_position - 1".
4320 */
4321 if (stream->last_sequence_number >= stream->rotate_position - 1) {
4322 return 1;
4323 }
4324
4325 return 0;
4326 }
4327
4328 /*
4329 * Reset the state for a stream after a rotation occurred.
4330 */
4331 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
4332 {
4333 stream->rotate_position = -1ULL;
4334 stream->rotate_ready = false;
4335 }
4336
4337 /*
4338 * Perform the rotation a local stream file.
4339 */
4340 static
4341 int rotate_local_stream(struct lttng_consumer_local_data *ctx,
4342 struct lttng_consumer_stream *stream)
4343 {
4344 int ret = 0;
4345
4346 DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
4347 stream->key,
4348 stream->chan->key);
4349 stream->tracefile_size_current = 0;
4350 stream->tracefile_count_current = 0;
4351
4352 if (stream->out_fd >= 0) {
4353 ret = close(stream->out_fd);
4354 if (ret) {
4355 PERROR("Failed to close stream out_fd of channel \"%s\"",
4356 stream->chan->name);
4357 }
4358 stream->out_fd = -1;
4359 }
4360
4361 if (stream->index_file) {
4362 lttng_index_file_put(stream->index_file);
4363 stream->index_file = NULL;
4364 }
4365
4366 if (!stream->trace_chunk) {
4367 goto end;
4368 }
4369
4370 ret = consumer_stream_create_output_files(stream, true);
4371 end:
4372 return ret;
4373 }
4374
/*
 * Performs the stream rotation for the rotate session feature if needed.
 * It must be called with the channel and stream locks held.
 *
 * Return 0 on success, a negative number on error.
 */
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream)
{
	int ret;

	DBG("Consumer rotate stream %" PRIu64, stream->key);

	/*
	 * Update the stream's 'current' chunk to the session's (channel)
	 * now-current chunk.
	 *
	 * NOTE: after the put below, stream->trace_chunk is only ever
	 * compared by address; it is never dereferenced again before being
	 * reassigned.
	 */
	lttng_trace_chunk_put(stream->trace_chunk);
	if (stream->chan->trace_chunk == stream->trace_chunk) {
		/*
		 * A channel can be rotated and not have a "next" chunk
		 * to transition to. In that case, the channel's "current chunk"
		 * has not been closed yet, but it has not been updated to
		 * a "next" trace chunk either. Hence, the stream, like its
		 * parent channel, becomes part of no chunk and can't output
		 * anything until a new trace chunk is created.
		 */
		stream->trace_chunk = NULL;
	} else if (stream->chan->trace_chunk &&
			!lttng_trace_chunk_get(stream->chan->trace_chunk)) {
		/* The channel's chunk exists but its refcount hit zero. */
		ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
		ret = -1;
		goto error;
	} else {
		/*
		 * Update the stream's trace chunk to its parent channel's
		 * current trace chunk.
		 */
		stream->trace_chunk = stream->chan->trace_chunk;
	}

	/*
	 * A net_seq_idx of -1ULL denotes a stream written locally rather
	 * than sent to a relay daemon (hence rotate_local_stream).
	 */
	if (stream->net_seq_idx == (uint64_t) -1ULL) {
		ret = rotate_local_stream(ctx, stream);
		if (ret < 0) {
			ERR("Failed to rotate stream, ret = %i", ret);
			goto error;
		}
	}

	if (stream->metadata_flag && stream->trace_chunk) {
		/*
		 * If the stream has transitioned to a new trace
		 * chunk, the metadata should be re-dumped to the
		 * newest chunk.
		 *
		 * However, it is possible for a stream to transition to
		 * a "no-chunk" state. This can happen if a rotation
		 * occurs on an inactive session. In such cases, the metadata
		 * regeneration will happen when the next trace chunk is
		 * created.
		 */
		ret = consumer_metadata_stream_dump(stream);
		if (ret) {
			goto error;
		}
	}
	/* Clear rotate_position/rotate_ready for the next rotation. */
	lttng_consumer_reset_stream_rotate_state(stream);

	ret = 0;

error:
	return ret;
}
4448
4449 /*
4450 * Rotate all the ready streams now.
4451 *
4452 * This is especially important for low throughput streams that have already
4453 * been consumed, we cannot wait for their next packet to perform the
4454 * rotation.
4455 * Need to be called with RCU read-side lock held to ensure existence of
4456 * channel.
4457 *
4458 * Returns 0 on success, < 0 on error
4459 */
4460 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
4461 uint64_t key, struct lttng_consumer_local_data *ctx)
4462 {
4463 int ret;
4464 struct lttng_consumer_stream *stream;
4465 struct lttng_ht_iter iter;
4466 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
4467
4468 rcu_read_lock();
4469
4470 DBG("Consumer rotate ready streams in channel %" PRIu64, key);
4471
4472 cds_lfht_for_each_entry_duplicate(ht->ht,
4473 ht->hash_fct(&channel->key, lttng_ht_seed),
4474 ht->match_fct, &channel->key, &iter.iter,
4475 stream, node_channel_id.node) {
4476 health_code_update();
4477
4478 pthread_mutex_lock(&stream->chan->lock);
4479 pthread_mutex_lock(&stream->lock);
4480
4481 if (!stream->rotate_ready) {
4482 pthread_mutex_unlock(&stream->lock);
4483 pthread_mutex_unlock(&stream->chan->lock);
4484 continue;
4485 }
4486 DBG("Consumer rotate ready stream %" PRIu64, stream->key);
4487
4488 ret = lttng_consumer_rotate_stream(ctx, stream);
4489 pthread_mutex_unlock(&stream->lock);
4490 pthread_mutex_unlock(&stream->chan->lock);
4491 if (ret) {
4492 goto end;
4493 }
4494 }
4495
4496 ret = 0;
4497
4498 end:
4499 rcu_read_unlock();
4500 return ret;
4501 }
4502
4503 enum lttcomm_return_code lttng_consumer_init_command(
4504 struct lttng_consumer_local_data *ctx,
4505 const lttng_uuid sessiond_uuid)
4506 {
4507 enum lttcomm_return_code ret;
4508 char uuid_str[LTTNG_UUID_STR_LEN];
4509
4510 if (ctx->sessiond_uuid.is_set) {
4511 ret = LTTCOMM_CONSUMERD_ALREADY_SET;
4512 goto end;
4513 }
4514
4515 ctx->sessiond_uuid.is_set = true;
4516 memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
4517 ret = LTTCOMM_CONSUMERD_SUCCESS;
4518 lttng_uuid_to_str(sessiond_uuid, uuid_str);
4519 DBG("Received session daemon UUID: %s", uuid_str);
4520 end:
4521 return ret;
4522 }
4523
/*
 * Create a trace chunk, publish it in the consumer daemon's chunk
 * registry, and attach it to every channel of the target session. If the
 * trace is sent to a relay daemon (relayd_id non-NULL), the chunk is also
 * created on the relayd; any failure past publication triggers a
 * roll-back (close) of the chunk.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, or
 * LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED on any failure.
 */
enum lttcomm_return_code lttng_consumer_create_trace_chunk(
		const uint64_t *relayd_id, uint64_t session_id,
		uint64_t chunk_id,
		time_t chunk_creation_timestamp,
		const char *chunk_override_name,
		const struct lttng_credentials *credentials,
		struct lttng_directory_handle *chunk_directory_handle)
{
	int ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
	enum lttng_trace_chunk_status chunk_status;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	char creation_timestamp_buffer[ISO8601_STR_LEN];
	const char *relayd_id_str = "(none)";
	const char *creation_timestamp_str;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	if (relayd_id) {
		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
				"%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}

	/* Local protocol error. */
	assert(chunk_creation_timestamp);
	ret = time_to_iso8601_str(chunk_creation_timestamp,
			creation_timestamp_buffer,
			sizeof(creation_timestamp_buffer));
	creation_timestamp_str = !ret ? creation_timestamp_buffer :
			"(formatting error)";

	DBG("Consumer create trace chunk command: relay_id = %s"
			", session_id = %" PRIu64 ", chunk_id = %" PRIu64
			", chunk_override_name = %s"
			", chunk_creation_timestamp = %s",
			relayd_id_str, session_id, chunk_id,
			chunk_override_name ? : "(none)",
			creation_timestamp_str);

	/*
	 * The trace chunk registry, as used by the consumer daemon, implicitly
	 * owns the trace chunks. This is only needed in the consumer since
	 * the consumer has no notion of a session beyond session IDs being
	 * used to identify other objects.
	 *
	 * The lttng_trace_chunk_registry_publish() call below provides a
	 * reference which is not released; it implicitly becomes the session
	 * daemon's reference to the chunk in the consumer daemon.
	 *
	 * The lifetime of trace chunks in the consumer daemon is managed by
	 * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
	 * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
	 */
	created_chunk = lttng_trace_chunk_create(chunk_id,
			chunk_creation_timestamp);
	if (!created_chunk) {
		ERR("Failed to create trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	if (chunk_override_name) {
		chunk_status = lttng_trace_chunk_override_name(created_chunk,
				chunk_override_name);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	if (chunk_directory_handle) {
		chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
				credentials);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk credentials");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
		/*
		 * The consumer daemon has no ownership of the chunk output
		 * directory.
		 */
		chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
				chunk_directory_handle);
		chunk_directory_handle = NULL;
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ERR("Failed to set trace chunk's directory handle");
			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error;
		}
	}

	published_chunk = lttng_trace_chunk_registry_publish_chunk(
			consumer_data.chunk_registry, session_id,
			created_chunk);
	/* The registry now holds the chunk; drop the creation reference. */
	lttng_trace_chunk_put(created_chunk);
	created_chunk = NULL;
	if (!published_chunk) {
		ERR("Failed to publish trace chunk");
		ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
		goto error;
	}

	/* Attach the published chunk to every channel of the session. */
	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
			consumer_data.channels_by_session_id_ht->hash_fct(
				&session_id, lttng_ht_seed),
			consumer_data.channels_by_session_id_ht->match_fct,
			&session_id, &iter.iter, channel,
			channels_by_session_id_ht_node.node) {
		ret = lttng_consumer_channel_set_trace_chunk(channel,
				published_chunk);
		if (ret) {
			/*
			 * Roll-back the creation of this chunk.
			 *
			 * This is important since the session daemon will
			 * assume that the creation of this chunk failed and
			 * will never ask for it to be closed, resulting
			 * in a leak and an inconsistent state for some
			 * channels.
			 */
			enum lttcomm_return_code close_ret;
			/*
			 * NOTE(review): 'path' appears to be an output
			 * parameter of lttng_consumer_close_trace_chunk and
			 * is passed uninitialized — confirm.
			 */
			char path[LTTNG_PATH_MAX];

			DBG("Failed to set new trace chunk on existing channels, rolling back");
			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
					session_id, chunk_id,
					chunk_creation_timestamp, NULL,
					path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
						session_id, chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			break;
		}
	}

	if (relayd_id) {
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_create_trace_chunk(
					&relayd->control_sock, published_chunk);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
		}

		/*
		 * 'ret' is only read when relayd was found (short-circuit),
		 * so it is never used uninitialized here.
		 */
		if (!relayd || ret) {
			enum lttcomm_return_code close_ret;
			char path[LTTNG_PATH_MAX];

			close_ret = lttng_consumer_close_trace_chunk(relayd_id,
					session_id,
					chunk_id,
					chunk_creation_timestamp,
					NULL, path);
			if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
				ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
						session_id,
						chunk_id);
			}

			ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
	rcu_read_unlock();
error:
	/* Release the reference returned by the "publish" operation. */
	lttng_trace_chunk_put(published_chunk);
	lttng_trace_chunk_put(created_chunk);
	return ret_code;
}
4711
/*
 * Close a trace chunk: set its close timestamp (and, optionally, its
 * close command), detach it from every channel still referencing it, and
 * close it on the relay daemon for network sessions.
 *
 * relayd_id may be NULL for a local trace; close_command may be NULL.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS, LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK
 * if the chunk is not in the registry, or
 * LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED on failure.
 */
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
		const uint64_t *relayd_id, uint64_t session_id,
		uint64_t chunk_id, time_t chunk_close_timestamp,
		const enum lttng_trace_chunk_command_type *close_command,
		char *path)
{
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_trace_chunk *chunk;
	char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
	const char *relayd_id_str = "(none)";
	const char *close_command_name = "none";
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;
	enum lttng_trace_chunk_status chunk_status;

	if (relayd_id) {
		int ret;

		/* Only used for logging purposes. */
		ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
				"%" PRIu64, *relayd_id);
		if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
			relayd_id_str = relayd_id_buffer;
		} else {
			relayd_id_str = "(formatting error)";
		}
	}
	if (close_command) {
		close_command_name = lttng_trace_chunk_command_type_get_name(
				*close_command);
	}

	DBG("Consumer close trace chunk command: relayd_id = %s"
			", session_id = %" PRIu64 ", chunk_id = %" PRIu64
			", close command = %s",
			relayd_id_str, session_id, chunk_id,
			close_command_name);

	chunk = lttng_trace_chunk_registry_find_chunk(
			consumer_data.chunk_registry, session_id, chunk_id);
	if (!chunk) {
		ERR("Failed to find chunk: session_id = %" PRIu64
				", chunk_id = %" PRIu64,
				session_id, chunk_id);
		ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
		goto end;
	}

	chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
			chunk_close_timestamp);
	if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
		ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		goto end;
	}

	if (close_command) {
		chunk_status = lttng_trace_chunk_set_close_command(
				chunk, *close_command);
		if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto end;
		}
	}

	/*
	 * chunk is now invalid to access as we no longer hold a reference to
	 * it; it is only kept around to compare it (by address) to the
	 * current chunk found in the session's channels.
	 */
	rcu_read_lock();
	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter,
			channel, node.node) {
		int ret;

		/*
		 * Only change the channel's chunk to NULL if it still
		 * references the chunk being closed. The channel may
		 * reference a newer chunk in the case of a session
		 * rotation. When a session rotation occurs, the "next"
		 * chunk is created before the "current" chunk is closed.
		 */
		if (channel->trace_chunk != chunk) {
			continue;
		}
		ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
		if (ret) {
			/*
			 * Attempt to close the chunk on as many channels as
			 * possible.
			 */
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
		}
	}

	if (relayd_id) {
		int ret;
		struct consumer_relayd_sock_pair *relayd;

		relayd = consumer_find_relayd(*relayd_id);
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			ret = relayd_close_trace_chunk(
					&relayd->control_sock, chunk,
					path);
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		} else {
			ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
					*relayd_id);
		}

		/* 'ret' is only read when relayd is non-NULL (short-circuit). */
		if (!relayd || ret) {
			ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
			goto error_unlock;
		}
	}
error_unlock:
	rcu_read_unlock();
end:
	/*
	 * Release the reference returned by the "find" operation and
	 * the session daemon's implicit reference to the chunk.
	 * The double put is intentional; see the comment above about the
	 * session daemon's implicit reference.
	 */
	lttng_trace_chunk_put(chunk);
	lttng_trace_chunk_put(chunk);

	return ret_code;
}
4839
4840 enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
4841 const uint64_t *relayd_id, uint64_t session_id,
4842 uint64_t chunk_id)
4843 {
4844 int ret;
4845 enum lttcomm_return_code ret_code;
4846 char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
4847 const char *relayd_id_str = "(none)";
4848 const bool is_local_trace = !relayd_id;
4849 struct consumer_relayd_sock_pair *relayd = NULL;
4850 bool chunk_exists_local, chunk_exists_remote;
4851
4852 if (relayd_id) {
4853 int ret;
4854
4855 /* Only used for logging purposes. */
4856 ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
4857 "%" PRIu64, *relayd_id);
4858 if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
4859 relayd_id_str = relayd_id_buffer;
4860 } else {
4861 relayd_id_str = "(formatting error)";
4862 }
4863 }
4864
4865 DBG("Consumer trace chunk exists command: relayd_id = %s"
4866 ", chunk_id = %" PRIu64, relayd_id_str,
4867 chunk_id);
4868 ret = lttng_trace_chunk_registry_chunk_exists(
4869 consumer_data.chunk_registry, session_id,
4870 chunk_id, &chunk_exists_local);
4871 if (ret) {
4872 /* Internal error. */
4873 ERR("Failed to query the existence of a trace chunk");
4874 ret_code = LTTCOMM_CONSUMERD_FATAL;
4875 goto end;
4876 }
4877 DBG("Trace chunk %s locally",
4878 chunk_exists_local ? "exists" : "does not exist");
4879 if (chunk_exists_local) {
4880 ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
4881 goto end;
4882 } else if (is_local_trace) {
4883 ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
4884 goto end;
4885 }
4886
4887 rcu_read_lock();
4888 relayd = consumer_find_relayd(*relayd_id);
4889 if (!relayd) {
4890 ERR("Failed to find relayd %" PRIu64, *relayd_id);
4891 ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
4892 goto end_rcu_unlock;
4893 }
4894 DBG("Looking up existence of trace chunk on relay daemon");
4895 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
4896 ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
4897 &chunk_exists_remote);
4898 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
4899 if (ret < 0) {
4900 ERR("Failed to look-up the existence of trace chunk on relay daemon");
4901 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
4902 goto end_rcu_unlock;
4903 }
4904
4905 ret_code = chunk_exists_remote ?
4906 LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
4907 LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
4908 DBG("Trace chunk %s on relay daemon",
4909 chunk_exists_remote ? "exists" : "does not exist");
4910
4911 end_rcu_unlock:
4912 rcu_read_unlock();
4913 end:
4914 return ret_code;
4915 }
4916
/*
 * Clear every stream of a monitored channel.
 *
 * Monitored streams are looked up through the per-channel-id hash table.
 * Each stream's lock is taken to protect against concurrent teardown, and
 * streams whose hash-table node has already been deleted are skipped.
 *
 * Returns LTTCOMM_CONSUMERD_SUCCESS on success, the error code returned
 * by consumer_clear_stream() on failure.
 */
static
int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	int ret;

	ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key,
			&iter.iter, stream, node_channel_id.node) {
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Stream already torn down; nothing to clear. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		ret = consumer_clear_stream(stream);
		if (ret) {
			goto error_unlock;
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	rcu_read_unlock();
	return LTTCOMM_CONSUMERD_SUCCESS;

error_unlock:
	pthread_mutex_unlock(&stream->lock);
	rcu_read_unlock();
	return ret;
}
4954
4955 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
4956 {
4957 int ret;
4958
4959 DBG("Consumer clear channel %" PRIu64, channel->key);
4960
4961 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
4962 /*
4963 * Nothing to do for the metadata channel/stream.
4964 * Snapshot mechanism already take care of the metadata
4965 * handling/generation, and monitored channels only need to
4966 * have their data stream cleared..
4967 */
4968 ret = LTTCOMM_CONSUMERD_SUCCESS;
4969 goto end;
4970 }
4971
4972 if (!channel->monitor) {
4973 ret = consumer_clear_unmonitored_channel(channel);
4974 } else {
4975 ret = consumer_clear_monitored_channel(channel);
4976 }
4977 end:
4978 return ret;
4979 }