consumerd: register threads to health monitoring
[lttng-tools.git] / src / common / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31 #include <signal.h>
32
33 #include <common/common.h>
34 #include <common/utils.h>
35 #include <common/compat/poll.h>
36 #include <common/index/index.h>
37 #include <common/kernel-ctl/kernel-ctl.h>
38 #include <common/sessiond-comm/relayd.h>
39 #include <common/sessiond-comm/sessiond-comm.h>
40 #include <common/kernel-consumer/kernel-consumer.h>
41 #include <common/relayd/relayd.h>
42 #include <common/ust-consumer/ust-consumer.h>
43 #include <common/consumer-timer.h>
44
45 #include "consumer.h"
46 #include "consumer-stream.h"
47 #include "../bin/lttng-consumerd/health-consumerd.h"
48
/*
 * Global consumer state shared by every consumer thread. The consumer type
 * is set once at startup (kernel or 32/64-bit UST); stream_count and
 * need_update are modified with consumer_data.lock held.
 */
struct lttng_consumer_global_data consumer_data = {
	.stream_count = 0,
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};
54
/* Actions carried by messages on the channel management pipe. */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_DEL,
	CONSUMER_CHANNEL_QUIT,
};
60
/*
 * Message exchanged on the channel pipe. Which payload field is meaningful
 * depends on the action: chan for ADD, key for DEL, neither for QUIT.
 */
struct consumer_channel_msg {
	enum consumer_channel_action action;
	struct lttng_consumer_channel *chan;	/* add */
	uint64_t key;				/* del */
};
66
/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by consumer_thread_receive_fds when it notices that all fds have
 * hung up. Also updated by the signal handler (consumer_should_exit()).
 * Read by the polling threads.
 */
volatile int consumer_quit;
74
/*
 * Global hash tables holding respectively the metadata and data streams.
 * A stream element in these tables must only be updated by the metadata
 * poll thread (metadata_ht) or the data poll thread (data_ht).
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
82
83 /*
84 * Notify a thread lttng pipe to poll back again. This usually means that some
85 * global state has changed so we just send back the thread in a poll wait
86 * call.
87 */
88 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
89 {
90 struct lttng_consumer_stream *null_stream = NULL;
91
92 assert(pipe);
93
94 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
95 }
96
97 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
98 struct lttng_consumer_channel *chan,
99 uint64_t key,
100 enum consumer_channel_action action)
101 {
102 struct consumer_channel_msg msg;
103 int ret;
104
105 memset(&msg, 0, sizeof(msg));
106
107 msg.action = action;
108 msg.chan = chan;
109 msg.key = key;
110 do {
111 ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
112 } while (ret < 0 && errno == EINTR);
113 }
114
/*
 * Notify the channel management thread that the channel identified by key
 * should be deleted. Convenience wrapper around notify_channel_pipe().
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
		uint64_t key)
{
	notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
120
121 static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
122 struct lttng_consumer_channel **chan,
123 uint64_t *key,
124 enum consumer_channel_action *action)
125 {
126 struct consumer_channel_msg msg;
127 int ret;
128
129 do {
130 ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
131 } while (ret < 0 && errno == EINTR);
132 if (ret > 0) {
133 *action = msg.action;
134 *chan = msg.chan;
135 *key = msg.key;
136 }
137 return ret;
138 }
139
140 /*
141 * Find a stream. The consumer_data.lock must be locked during this
142 * call.
143 */
144 static struct lttng_consumer_stream *find_stream(uint64_t key,
145 struct lttng_ht *ht)
146 {
147 struct lttng_ht_iter iter;
148 struct lttng_ht_node_u64 *node;
149 struct lttng_consumer_stream *stream = NULL;
150
151 assert(ht);
152
153 /* -1ULL keys are lookup failures */
154 if (key == (uint64_t) -1ULL) {
155 return NULL;
156 }
157
158 rcu_read_lock();
159
160 lttng_ht_lookup(ht, &key, &iter);
161 node = lttng_ht_iter_get_node_u64(&iter);
162 if (node != NULL) {
163 stream = caa_container_of(node, struct lttng_consumer_stream, node);
164 }
165
166 rcu_read_unlock();
167
168 return stream;
169 }
170
171 static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
172 {
173 struct lttng_consumer_stream *stream;
174
175 rcu_read_lock();
176 stream = find_stream(key, ht);
177 if (stream) {
178 stream->key = (uint64_t) -1ULL;
179 /*
180 * We don't want the lookup to match, but we still need
181 * to iterate on this stream when iterating over the hash table. Just
182 * change the node key.
183 */
184 stream->node.key = (uint64_t) -1ULL;
185 }
186 rcu_read_unlock();
187 }
188
189 /*
190 * Return a channel object for the given key.
191 *
192 * RCU read side lock MUST be acquired before calling this function and
193 * protects the channel ptr.
194 */
195 struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
196 {
197 struct lttng_ht_iter iter;
198 struct lttng_ht_node_u64 *node;
199 struct lttng_consumer_channel *channel = NULL;
200
201 /* -1ULL keys are lookup failures */
202 if (key == (uint64_t) -1ULL) {
203 return NULL;
204 }
205
206 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
207 node = lttng_ht_iter_get_node_u64(&iter);
208 if (node != NULL) {
209 channel = caa_container_of(node, struct lttng_consumer_channel, node);
210 }
211
212 return channel;
213 }
214
215 static void free_stream_rcu(struct rcu_head *head)
216 {
217 struct lttng_ht_node_u64 *node =
218 caa_container_of(head, struct lttng_ht_node_u64, head);
219 struct lttng_consumer_stream *stream =
220 caa_container_of(node, struct lttng_consumer_stream, node);
221
222 free(stream);
223 }
224
225 static void free_channel_rcu(struct rcu_head *head)
226 {
227 struct lttng_ht_node_u64 *node =
228 caa_container_of(head, struct lttng_ht_node_u64, head);
229 struct lttng_consumer_channel *channel =
230 caa_container_of(node, struct lttng_consumer_channel, node);
231
232 free(channel);
233 }
234
235 /*
236 * RCU protected relayd socket pair free.
237 */
238 static void free_relayd_rcu(struct rcu_head *head)
239 {
240 struct lttng_ht_node_u64 *node =
241 caa_container_of(head, struct lttng_ht_node_u64, head);
242 struct consumer_relayd_sock_pair *relayd =
243 caa_container_of(node, struct consumer_relayd_sock_pair, node);
244
245 /*
246 * Close all sockets. This is done in the call RCU since we don't want the
247 * socket fds to be reassigned thus potentially creating bad state of the
248 * relayd object.
249 *
250 * We do not have to lock the control socket mutex here since at this stage
251 * there is no one referencing to this relayd object.
252 */
253 (void) relayd_close(&relayd->control_sock);
254 (void) relayd_close(&relayd->data_sock);
255
256 free(relayd);
257 }
258
259 /*
260 * Destroy and free relayd socket pair object.
261 */
262 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
263 {
264 int ret;
265 struct lttng_ht_iter iter;
266
267 if (relayd == NULL) {
268 return;
269 }
270
271 DBG("Consumer destroy and close relayd socket pair");
272
273 iter.iter.node = &relayd->node.node;
274 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
275 if (ret != 0) {
276 /* We assume the relayd is being or is destroyed */
277 return;
278 }
279
280 /* RCU free() call */
281 call_rcu(&relayd->node.head, free_relayd_rcu);
282 }
283
/*
 * Remove a channel from the global list protected by a mutex. This function
 * is also responsible for freeing its data structures: leftover streams on
 * the channel's send list, the live timer, tracer-specific state, and
 * finally the channel memory itself through an RCU-deferred free.
 *
 * Locking order: consumer_data.lock, then channel->lock.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream, *stmp;

	DBG("Consumer delete channel key %" PRIu64, channel->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Delete streams that might have been left in the stream list. */
	cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
			send_node) {
		cds_list_del(&stream->send_node);
		/*
		 * Once a stream is added to this list, the buffers were created so
		 * we have a guarantee that this call will succeed.
		 */
		consumer_stream_destroy(stream, NULL);
	}

	/* Stop the per-channel live timer before tearing the channel down. */
	if (channel->live_timer_enabled == 1) {
		consumer_timer_live_stop(channel);
	}

	/* Tracer-specific teardown; the kernel consumer has none here. */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Unlink from the global channel hash table under RCU protection. */
	rcu_read_lock();
	iter.iter.node = &channel->node.node;
	ret = lttng_ht_del(consumer_data.channel_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	/* Free the channel memory after the current RCU grace period. */
	call_rcu(&channel->node.head, free_channel_rcu);
end:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
}
338
339 /*
340 * Iterate over the relayd hash table and destroy each element. Finally,
341 * destroy the whole hash table.
342 */
343 static void cleanup_relayd_ht(void)
344 {
345 struct lttng_ht_iter iter;
346 struct consumer_relayd_sock_pair *relayd;
347
348 rcu_read_lock();
349
350 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
351 node.node) {
352 consumer_destroy_relayd(relayd);
353 }
354
355 rcu_read_unlock();
356
357 lttng_ht_destroy(consumer_data.relayd_ht);
358 }
359
360 /*
361 * Update the end point status of all streams having the given network sequence
362 * index (relayd index).
363 *
364 * It's atomically set without having the stream mutex locked which is fine
365 * because we handle the write/read race with a pipe wakeup for each thread.
366 */
367 static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
368 enum consumer_endpoint_status status)
369 {
370 struct lttng_ht_iter iter;
371 struct lttng_consumer_stream *stream;
372
373 DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
374
375 rcu_read_lock();
376
377 /* Let's begin with metadata */
378 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
379 if (stream->net_seq_idx == net_seq_idx) {
380 uatomic_set(&stream->endpoint_status, status);
381 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
382 }
383 }
384
385 /* Follow up by the data streams */
386 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
387 if (stream->net_seq_idx == net_seq_idx) {
388 uatomic_set(&stream->endpoint_status, status);
389 DBG("Delete flag set to data stream %d", stream->wait_fd);
390 }
391 }
392 rcu_read_unlock();
393 }
394
/*
 * Cleanup a relayd object by flagging every associated stream for deletion,
 * destroying the object (removing it from the relayd hash table, closing
 * the sockets and freeing the memory in an RCU call).
 *
 * If a local data context is available, notify the threads that the
 * streams' state has changed.
 */
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
		struct lttng_consumer_local_data *ctx)
{
	uint64_t netidx;

	assert(relayd);

	DBG("Cleaning up relayd sockets");

	/* Save the net sequence index before destroying the object. */
	netidx = relayd->net_seq_idx;

	/*
	 * Delete the relayd from the relayd hash table, close the sockets and
	 * free the object in an RCU call.
	 */
	consumer_destroy_relayd(relayd);

	/* Flag every stream bound to this relayd as having a dead endpoint. */
	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

	/*
	 * With a local data context, notify the threads that the streams' state
	 * have changed. The write() action on the pipe acts as an "implicit"
	 * memory barrier ordering the updates of the end point status from the
	 * read of this status which happens AFTER receiving this notify.
	 */
	if (ctx) {
		notify_thread_lttng_pipe(ctx->consumer_data_pipe);
		notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
	}
}
435
436 /*
437 * Flag a relayd socket pair for destruction. Destroy it if the refcount
438 * reaches zero.
439 *
440 * RCU read side lock MUST be aquired before calling this function.
441 */
442 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
443 {
444 assert(relayd);
445
446 /* Set destroy flag for this object */
447 uatomic_set(&relayd->destroy_flag, 1);
448
449 /* Destroy the relayd if refcount is 0 */
450 if (uatomic_read(&relayd->refcount) == 0) {
451 consumer_destroy_relayd(relayd);
452 }
453 }
454
/*
 * Completely destroy a stream, removing it from every visible data
 * structure and from the given hash table if one is provided.
 *
 * Once this call returns, the stream object is no longer usable nor
 * visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	consumer_stream_destroy(stream, ht);
}
466
/*
 * Destroy a data stream, removing it from the global data stream hash
 * table.
 *
 * XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, data_ht);
}
474
/*
 * Destroy a metadata stream, removing it from the global metadata stream
 * hash table.
 */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
	consumer_stream_destroy(stream, metadata_ht);
}
479
480 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
481 uint64_t stream_key,
482 enum lttng_consumer_stream_state state,
483 const char *channel_name,
484 uid_t uid,
485 gid_t gid,
486 uint64_t relayd_id,
487 uint64_t session_id,
488 int cpu,
489 int *alloc_ret,
490 enum consumer_channel_type type,
491 unsigned int monitor)
492 {
493 int ret;
494 struct lttng_consumer_stream *stream;
495
496 stream = zmalloc(sizeof(*stream));
497 if (stream == NULL) {
498 PERROR("malloc struct lttng_consumer_stream");
499 ret = -ENOMEM;
500 goto end;
501 }
502
503 rcu_read_lock();
504
505 stream->key = stream_key;
506 stream->out_fd = -1;
507 stream->out_fd_offset = 0;
508 stream->output_written = 0;
509 stream->state = state;
510 stream->uid = uid;
511 stream->gid = gid;
512 stream->net_seq_idx = relayd_id;
513 stream->session_id = session_id;
514 stream->monitor = monitor;
515 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
516 stream->index_fd = -1;
517 pthread_mutex_init(&stream->lock, NULL);
518
519 /* If channel is the metadata, flag this stream as metadata. */
520 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
521 stream->metadata_flag = 1;
522 /* Metadata is flat out. */
523 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
524 /* Live rendez-vous point. */
525 pthread_cond_init(&stream->metadata_rdv, NULL);
526 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
527 } else {
528 /* Format stream name to <channel_name>_<cpu_number> */
529 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
530 channel_name, cpu);
531 if (ret < 0) {
532 PERROR("snprintf stream name");
533 goto error;
534 }
535 }
536
537 /* Key is always the wait_fd for streams. */
538 lttng_ht_node_init_u64(&stream->node, stream->key);
539
540 /* Init node per channel id key */
541 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
542
543 /* Init session id node with the stream session id */
544 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
545
546 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
547 " relayd_id %" PRIu64 ", session_id %" PRIu64,
548 stream->name, stream->key, channel_key,
549 stream->net_seq_idx, stream->session_id);
550
551 rcu_read_unlock();
552 return stream;
553
554 error:
555 rcu_read_unlock();
556 free(stream);
557 end:
558 if (alloc_ret) {
559 *alloc_ret = ret;
560 }
561 return NULL;
562 }
563
/*
 * Add a stream to the global list protected by a mutex.
 *
 * The stream is inserted into three hash tables: the data stream table
 * (unique key), the per-channel-id table and the per-session-id list
 * table. Always returns 0.
 *
 * Locking order: consumer_data.lock, chan->lock, chan->timer_lock,
 * stream->lock, then the RCU read-side lock.
 */
int consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = data_ht;
	int ret = 0;

	assert(stream);
	assert(ht);

	DBG3("Adding consumer stream %" PRIu64, stream->key);

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);
	rcu_read_lock();

	/* Steal stream identifier to avoid having streams with the same key */
	steal_stream_key(stream->key, ht);

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	/* Update consumer data once the node is inserted. */
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

	rcu_read_unlock();
	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	return ret;
}
623
/* Delete a data stream; wrapper around consumer_del_stream() on data_ht. */
void consumer_del_data_stream(struct lttng_consumer_stream *stream)
{
	consumer_del_stream(stream, data_ht);
}
628
629 /*
630 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
631 * be acquired before calling this.
632 */
633 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
634 {
635 int ret = 0;
636 struct lttng_ht_node_u64 *node;
637 struct lttng_ht_iter iter;
638
639 assert(relayd);
640
641 lttng_ht_lookup(consumer_data.relayd_ht,
642 &relayd->net_seq_idx, &iter);
643 node = lttng_ht_iter_get_node_u64(&iter);
644 if (node != NULL) {
645 goto end;
646 }
647 lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
648
649 end:
650 return ret;
651 }
652
653 /*
654 * Allocate and return a consumer relayd socket.
655 */
656 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
657 uint64_t net_seq_idx)
658 {
659 struct consumer_relayd_sock_pair *obj = NULL;
660
661 /* net sequence index of -1 is a failure */
662 if (net_seq_idx == (uint64_t) -1ULL) {
663 goto error;
664 }
665
666 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
667 if (obj == NULL) {
668 PERROR("zmalloc relayd sock");
669 goto error;
670 }
671
672 obj->net_seq_idx = net_seq_idx;
673 obj->refcount = 0;
674 obj->destroy_flag = 0;
675 obj->control_sock.sock.fd = -1;
676 obj->data_sock.sock.fd = -1;
677 lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
678 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
679
680 error:
681 return obj;
682 }
683
684 /*
685 * Find a relayd socket pair in the global consumer data.
686 *
687 * Return the object if found else NULL.
688 * RCU read-side lock must be held across this call and while using the
689 * returned object.
690 */
691 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
692 {
693 struct lttng_ht_iter iter;
694 struct lttng_ht_node_u64 *node;
695 struct consumer_relayd_sock_pair *relayd = NULL;
696
697 /* Negative keys are lookup failures */
698 if (key == (uint64_t) -1ULL) {
699 goto error;
700 }
701
702 lttng_ht_lookup(consumer_data.relayd_ht, &key,
703 &iter);
704 node = lttng_ht_iter_get_node_u64(&iter);
705 if (node != NULL) {
706 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
707 }
708
709 error:
710 return relayd;
711 }
712
713 /*
714 * Find a relayd and send the stream
715 *
716 * Returns 0 on success, < 0 on error
717 */
718 int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
719 char *path)
720 {
721 int ret = 0;
722 struct consumer_relayd_sock_pair *relayd;
723
724 assert(stream);
725 assert(stream->net_seq_idx != -1ULL);
726 assert(path);
727
728 /* The stream is not metadata. Get relayd reference if exists. */
729 rcu_read_lock();
730 relayd = consumer_find_relayd(stream->net_seq_idx);
731 if (relayd != NULL) {
732 /* Add stream on the relayd */
733 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
734 ret = relayd_add_stream(&relayd->control_sock, stream->name,
735 path, &stream->relayd_stream_id,
736 stream->chan->tracefile_size, stream->chan->tracefile_count);
737 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
738 if (ret < 0) {
739 goto end;
740 }
741
742 uatomic_inc(&relayd->refcount);
743 stream->sent_to_relayd = 1;
744 } else {
745 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
746 stream->key, stream->net_seq_idx);
747 ret = -1;
748 goto end;
749 }
750
751 DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
752 stream->name, stream->key, stream->net_seq_idx);
753
754 end:
755 rcu_read_unlock();
756 return ret;
757 }
758
759 /*
760 * Find a relayd and close the stream
761 */
762 void close_relayd_stream(struct lttng_consumer_stream *stream)
763 {
764 struct consumer_relayd_sock_pair *relayd;
765
766 /* The stream is not metadata. Get relayd reference if exists. */
767 rcu_read_lock();
768 relayd = consumer_find_relayd(stream->net_seq_idx);
769 if (relayd) {
770 consumer_stream_relayd_close(stream, relayd);
771 }
772 rcu_read_unlock();
773 }
774
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * Metadata is announced on the control socket; data gets a data header
 * (stream id, size, padding, sequence number) sent on the data socket.
 *
 * Return destination file descriptor (control socket fd for metadata, data
 * socket fd otherwise) or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
		size_t data_size, unsigned long padding,
		struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	assert(stream);
	assert(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.sock.fd;
	} else {
		/* Set header with stream information */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.padding_size = htobe32(padding);
		/*
		 * Note that net_seq_num below is assigned with the *current* value of
		 * next_net_seq_num and only after that the next_net_seq_num will be
		 * increment. This is why when issuing a command on the relayd using
		 * this next value, 1 should always be substracted in order to compare
		 * the last seen sequence number on the relayd side to the last sent.
		 */
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
				sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		/* Only advance the sequence number once the header was sent. */
		++stream->next_net_seq_num;

		/* Set to go on data socket */
		outfd = relayd->data_sock.sock.fd;
	}

error:
	return outfd;
}
834
835 /*
836 * Allocate and return a new lttng_consumer_channel object using the given key
837 * to initialize the hash table node.
838 *
839 * On error, return NULL.
840 */
841 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
842 uint64_t session_id,
843 const char *pathname,
844 const char *name,
845 uid_t uid,
846 gid_t gid,
847 uint64_t relayd_id,
848 enum lttng_event_output output,
849 uint64_t tracefile_size,
850 uint64_t tracefile_count,
851 uint64_t session_id_per_pid,
852 unsigned int monitor,
853 unsigned int live_timer_interval)
854 {
855 struct lttng_consumer_channel *channel;
856
857 channel = zmalloc(sizeof(*channel));
858 if (channel == NULL) {
859 PERROR("malloc struct lttng_consumer_channel");
860 goto end;
861 }
862
863 channel->key = key;
864 channel->refcount = 0;
865 channel->session_id = session_id;
866 channel->session_id_per_pid = session_id_per_pid;
867 channel->uid = uid;
868 channel->gid = gid;
869 channel->relayd_id = relayd_id;
870 channel->output = output;
871 channel->tracefile_size = tracefile_size;
872 channel->tracefile_count = tracefile_count;
873 channel->monitor = monitor;
874 channel->live_timer_interval = live_timer_interval;
875 pthread_mutex_init(&channel->lock, NULL);
876 pthread_mutex_init(&channel->timer_lock, NULL);
877
878 /*
879 * In monitor mode, the streams associated with the channel will be put in
880 * a special list ONLY owned by this channel. So, the refcount is set to 1
881 * here meaning that the channel itself has streams that are referenced.
882 *
883 * On a channel deletion, once the channel is no longer visible, the
884 * refcount is decremented and checked for a zero value to delete it. With
885 * streams in no monitor mode, it will now be safe to destroy the channel.
886 */
887 if (!channel->monitor) {
888 channel->refcount = 1;
889 }
890
891 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
892 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
893
894 strncpy(channel->name, name, sizeof(channel->name));
895 channel->name[sizeof(channel->name) - 1] = '\0';
896
897 lttng_ht_node_init_u64(&channel->node, channel->key);
898
899 channel->wait_fd = -1;
900
901 CDS_INIT_LIST_HEAD(&channel->streams.head);
902
903 DBG("Allocated channel (key %" PRIu64 ")", channel->key)
904
905 end:
906 return channel;
907 }
908
/*
 * Add a channel to the global list protected by a mutex.
 *
 * On success 0 is returned, else a negative value (-EEXIST if the key is
 * already registered).
 *
 * NOTE(review): when a DATA-type channel with a valid wait_fd is added,
 * ctx is dereferenced by notify_channel_pipe() — callers presumably always
 * pass a non-NULL ctx here; verify against call sites.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_ht_node_u64 *node;
	struct lttng_ht_iter iter;

	/* Locking order: consumer_data.lock, channel->lock, timer_lock, RCU. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	pthread_mutex_lock(&channel->timer_lock);
	rcu_read_lock();

	lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	if (node != NULL) {
		/* Channel already exist. Ignore the insertion */
		ERR("Consumer add channel key %" PRIu64 " already exists!",
			channel->key);
		ret = -EEXIST;
		goto end;
	}

	lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);

end:
	rcu_read_unlock();
	pthread_mutex_unlock(&channel->timer_lock);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	/* Wake the channel thread so it starts monitoring the new channel. */
	if (!ret && channel->wait_fd != -1 &&
			channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
		notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
	}
	return ret;
}
950
951 /*
952 * Allocate the pollfd structure and the local view of the out fds to avoid
953 * doing a lookup in the linked list and concurrency issues when writing is
954 * needed. Called with consumer_data.lock held.
955 *
956 * Returns the number of fds in the structures.
957 */
958 static int update_poll_array(struct lttng_consumer_local_data *ctx,
959 struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
960 struct lttng_ht *ht)
961 {
962 int i = 0;
963 struct lttng_ht_iter iter;
964 struct lttng_consumer_stream *stream;
965
966 assert(ctx);
967 assert(ht);
968 assert(pollfd);
969 assert(local_stream);
970
971 DBG("Updating poll fd array");
972 rcu_read_lock();
973 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
974 /*
975 * Only active streams with an active end point can be added to the
976 * poll set and local stream storage of the thread.
977 *
978 * There is a potential race here for endpoint_status to be updated
979 * just after the check. However, this is OK since the stream(s) will
980 * be deleted once the thread is notified that the end point state has
981 * changed where this function will be called back again.
982 */
983 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
984 stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
985 continue;
986 }
987 /*
988 * This clobbers way too much the debug output. Uncomment that if you
989 * need it for debugging purposes.
990 *
991 * DBG("Active FD %d", stream->wait_fd);
992 */
993 (*pollfd)[i].fd = stream->wait_fd;
994 (*pollfd)[i].events = POLLIN | POLLPRI;
995 local_stream[i] = stream;
996 i++;
997 }
998 rcu_read_unlock();
999
1000 /*
1001 * Insert the consumer_data_pipe at the end of the array and don't
1002 * increment i so nb_fd is the number of real FD.
1003 */
1004 (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
1005 (*pollfd)[i].events = POLLIN | POLLPRI;
1006 return i;
1007 }
1008
/*
 * Poll on the should_quit pipe and the command socket.
 *
 * Return -1 on error or if the thread should exit, 0 if data is available
 * on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	for (;;) {
		num_rdy = poll(consumer_sockpoll, 2, -1);
		if (num_rdy >= 0) {
			break;
		}
		/* Restart interrupted system call. */
		if (errno == EINTR) {
			continue;
		}
		PERROR("Poll error");
		return -1;
	}

	/* Index 0 is the should_quit pipe: any activity means "exit". */
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return -1;
	}

	return 0;
}
1038
/*
 * Set the error socket used to report failures to the session daemon.
 */
void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
		int sock)
{
	ctx->consumer_error_socket = sock;
}
1047
/*
 * Set the command socket path. The string is stored by reference, not
 * copied; the caller keeps ownership of it.
 */
void lttng_consumer_set_command_sock_path(
		struct lttng_consumer_local_data *ctx, char *sock)
{
	ctx->consumer_command_sock_path = sock;
}
1056
/*
 * Send return code to the session daemon.
 * If the socket is not defined, we return 0, it is not a fatal error.
 *
 * NOTE(review): cmd is an int but sizeof(enum lttcomm_sessiond_command)
 * bytes are sent — this assumes the enum has the same size as int on this
 * platform; confirm against the sessiond-comm protocol definition.
 */
int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
	if (ctx->consumer_error_socket > 0) {
		return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}
1070
/*
 * Close all the tracefiles and stream fds and MUST be called when all
 * instances are destroyed i.e. when all threads were joined and are ended.
 */
void lttng_consumer_cleanup(void)
{
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *channel;

	rcu_read_lock();

	/* Destroy every remaining channel while iterating under RCU. */
	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
			node.node) {
		consumer_del_channel(channel);
	}

	rcu_read_unlock();

	lttng_ht_destroy(consumer_data.channel_ht);

	/* Tear down every relayd socket pair and the relayd table. */
	cleanup_relayd_ht();

	lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);

	/*
	 * This HT contains streams that are freed by either the metadata thread or
	 * the data thread so we do *nothing* on the hash table and simply destroy
	 * it.
	 */
	lttng_ht_destroy(consumer_data.stream_list_ht);
}
1102
1103 /*
1104 * Called from signal handler.
1105 */
1106 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1107 {
1108 int ret;
1109 consumer_quit = 1;
1110 do {
1111 ret = write(ctx->consumer_should_quit[1], "4", 1);
1112 } while (ret < 0 && errno == EINTR);
1113 if (ret < 0 || ret != 1) {
1114 PERROR("write consumer quit");
1115 }
1116
1117 DBG("Consumer flag that it should quit");
1118 }
1119
1120 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1121 off_t orig_offset)
1122 {
1123 int outfd = stream->out_fd;
1124
1125 /*
1126 * This does a blocking write-and-wait on any page that belongs to the
1127 * subbuffer prior to the one we just wrote.
1128 * Don't care about error values, as these are just hints and ways to
1129 * limit the amount of page cache used.
1130 */
1131 if (orig_offset < stream->max_sb_size) {
1132 return;
1133 }
1134 lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
1135 stream->max_sb_size,
1136 SYNC_FILE_RANGE_WAIT_BEFORE
1137 | SYNC_FILE_RANGE_WRITE
1138 | SYNC_FILE_RANGE_WAIT_AFTER);
1139 /*
1140 * Give hints to the kernel about how we access the file:
1141 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1142 * we write it.
1143 *
1144 * We need to call fadvise again after the file grows because the
1145 * kernel does not seem to apply fadvise to non-existing parts of the
1146 * file.
1147 *
1148 * Call fadvise _after_ having waited for the page writeback to
1149 * complete because the dirty page writeback semantic is not well
1150 * defined. So it can be expected to lead to lower throughput in
1151 * streaming.
1152 */
1153 posix_fadvise(outfd, orig_offset - stream->max_sb_size,
1154 stream->max_sb_size, POSIX_FADV_DONTNEED);
1155 }
1156
/*
 * Initialise the necessary environnement :
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(uint64_t stream_key, uint32_t state))
{
	int ret;
	struct lttng_consumer_local_data *ctx;

	/* The consumer type is process-wide and can only ever be set once. */
	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		PERROR("allocating context");
		goto error;
	}

	/* Sockets start invalid; they are installed later via the setters. */
	ctx->consumer_error_socket = -1;
	ctx->consumer_metadata_socket = -1;
	pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ctx->consumer_data_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_data_pipe) {
		goto error_poll_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		PERROR("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_thread_pipe);
	if (ret < 0) {
		PERROR("Error creating thread pipe");
		goto error_thread_pipe;
	}

	ret = pipe(ctx->consumer_channel_pipe);
	if (ret < 0) {
		PERROR("Error creating channel pipe");
		goto error_channel_pipe;
	}

	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
	if (!ctx->consumer_metadata_pipe) {
		goto error_metadata_pipe;
	}

	ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
	if (ret < 0) {
		goto error_splice_pipe;
	}

	return ctx;

	/*
	 * Error unwinding: each label releases exactly the resources acquired
	 * before the corresponding failure point, in strict reverse order of
	 * creation.
	 */
error_splice_pipe:
	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
	utils_close_pipe(ctx->consumer_thread_pipe);
error_thread_pipe:
	utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
	lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}
1251
1252 /*
1253 * Close all fds associated with the instance and free the context.
1254 */
1255 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1256 {
1257 int ret;
1258
1259 DBG("Consumer destroying it. Closing everything.");
1260
1261 ret = close(ctx->consumer_error_socket);
1262 if (ret) {
1263 PERROR("close");
1264 }
1265 ret = close(ctx->consumer_metadata_socket);
1266 if (ret) {
1267 PERROR("close");
1268 }
1269 utils_close_pipe(ctx->consumer_thread_pipe);
1270 utils_close_pipe(ctx->consumer_channel_pipe);
1271 lttng_pipe_destroy(ctx->consumer_data_pipe);
1272 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1273 utils_close_pipe(ctx->consumer_should_quit);
1274 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1275
1276 unlink(ctx->consumer_command_sock_path);
1277 free(ctx);
1278 }
1279
1280 /*
1281 * Write the metadata stream id on the specified file descriptor.
1282 */
1283 static int write_relayd_metadata_id(int fd,
1284 struct lttng_consumer_stream *stream,
1285 struct consumer_relayd_sock_pair *relayd, unsigned long padding)
1286 {
1287 int ret;
1288 struct lttcomm_relayd_metadata_payload hdr;
1289
1290 hdr.stream_id = htobe64(stream->relayd_stream_id);
1291 hdr.padding_size = htobe32(padding);
1292 do {
1293 ret = write(fd, (void *) &hdr, sizeof(hdr));
1294 } while (ret < 0 && errno == EINTR);
1295 if (ret < 0 || ret != sizeof(hdr)) {
1296 /*
1297 * This error means that the fd's end is closed so ignore the perror
1298 * not to clubber the error output since this can happen in a normal
1299 * code path.
1300 */
1301 if (errno != EPIPE) {
1302 PERROR("write metadata stream id");
1303 }
1304 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
1305 /*
1306 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1307 * handle writting the missing part so report that as an error and
1308 * don't lie to the caller.
1309 */
1310 ret = -1;
1311 goto end;
1312 }
1313 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1314 stream->relayd_stream_id, padding);
1315
1316 end:
1317 return ret;
1318 }
1319
/*
 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
 * core function for writing trace buffers to either the local filesystem or
 * the network.
 *
 * It must be called with the stream lock held.
 *
 * Careful review MUST be put if any changes occur!
 *
 * Returns the number of bytes written
 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len,
		unsigned long padding,
		struct lttng_packet_index *index)
{
	unsigned long mmap_offset;
	void *mmap_base;
	ssize_t ret = 0, written = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	unsigned int relayd_hang_up = 0;

	/* RCU lock for the relayd pointer */
	rcu_read_lock();

	/* Flag that the current stream if set for network streaming. */
	if (stream->net_seq_idx != (uint64_t) -1ULL) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			/*
			 * NOTE(review): ret = -EPIPE is set here but the function
			 * returns `written` (still 0) at `end:`, so the caller sees a
			 * 0-byte success. Looks unintended — confirm against callers.
			 */
			ret = -EPIPE;
			goto end;
		}
	}

	/* get the offset inside the fd to mmap */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		mmap_base = stream->mmap_base;
		ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
		if (ret != 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			written = -errno;
			goto end;
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		mmap_base = lttng_ustctl_get_mmap_base(stream);
		if (!mmap_base) {
			ERR("read mmap get mmap base for stream %s", stream->name);
			written = -EPERM;
			goto end;
		}
		ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
		if (ret != 0) {
			PERROR("tracer ctl get_mmap_read_offset");
			written = ret;
			goto end;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = len;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			netlen += sizeof(struct lttcomm_relayd_metadata_payload);
		}

		ret = write_relayd_stream_header(stream, netlen, padding, relayd);
		if (ret >= 0) {
			/* Use the returned socket. */
			outfd = ret;

			/* Write metadata stream id before payload */
			if (stream->metadata_flag) {
				ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
				if (ret < 0) {
					written = ret;
					/* Socket operation failed. We consider the relayd dead */
					if (ret == -EPIPE || ret == -EINVAL) {
						relayd_hang_up = 1;
						goto write_error;
					}
					goto end;
				}
			}
		} else {
			/* Socket operation failed. We consider the relayd dead */
			if (ret == -EPIPE || ret == -EINVAL) {
				relayd_hang_up = 1;
				goto write_error;
			}
			/* Else, use the default set before which is the filesystem. */
		}
	} else {
		/* No streaming, we have to set the len with the full padding */
		len += padding;

		/*
		 * Check if we need to change the tracefile before writing the packet.
		 */
		if (stream->chan->tracefile_size > 0 &&
				(stream->tracefile_size_current + len) >
				stream->chan->tracefile_size) {
			ret = utils_rotate_stream_file(stream->chan->pathname,
					stream->name, stream->chan->tracefile_size,
					stream->chan->tracefile_count, stream->uid, stream->gid,
					stream->out_fd, &(stream->tracefile_count_current),
					&stream->out_fd);
			if (ret < 0) {
				ERR("Rotating output file");
				goto end;
			}
			outfd = stream->out_fd;

			/* A rotation also rotates the matching index file, if any. */
			if (stream->index_fd >= 0) {
				ret = index_create_file(stream->chan->pathname,
						stream->name, stream->uid, stream->gid,
						stream->chan->tracefile_size,
						stream->tracefile_count_current);
				if (ret < 0) {
					goto end;
				}
				stream->index_fd = ret;
			}

			/* Reset current size because we just perform a rotation. */
			stream->tracefile_size_current = 0;
			stream->out_fd_offset = 0;
			orig_offset = 0;
		}
		stream->tracefile_size_current += len;
		if (index) {
			index->offset = htobe64(stream->out_fd_offset);
		}
	}

	/* Drain the subbuffer to outfd (relayd socket or local tracefile). */
	while (len > 0) {
		do {
			ret = write(outfd, mmap_base + mmap_offset, len);
		} while (ret < 0 && errno == EINTR);
		DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
		if (ret < 0) {
			/*
			 * This is possible if the fd is closed on the other side (outfd)
			 * or any write problem. It can be verbose a bit for a normal
			 * execution if for instance the relayd is stopped abruptly. This
			 * can happen so set this to a DBG statement.
			 */
			DBG("Error in file write mmap");
			if (written == 0) {
				written = -errno;
			}
			/* Socket operation failed. We consider the relayd dead */
			if (errno == EPIPE || errno == EINVAL) {
				relayd_hang_up = 1;
				goto write_error;
			}
			goto end;
		} else if (ret > len) {
			PERROR("Error in file write (ret %zd > len %lu)", ret, len);
			written += ret;
			goto end;
		} else {
			len -= ret;
			mmap_offset += ret;
		}

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
					SYNC_FILE_RANGE_WRITE);
			stream->out_fd_offset += ret;
		}
		stream->output_written += ret;
		written += ret;
	}
	lttng_consumer_sync_trace_file(stream, orig_offset);

write_error:
	/*
	 * This is a special case that the relayd has closed its socket. Let's
	 * cleanup the relayd object and all associated streams.
	 */
	if (relayd && relayd_hang_up) {
		cleanup_relayd(relayd, ctx);
	}

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return written;
}
1533
1534 /*
1535 * Splice the data from the ring buffer to the tracefile.
1536 *
1537 * It must be called with the stream lock held.
1538 *
1539 * Returns the number of bytes spliced.
1540 */
1541 ssize_t lttng_consumer_on_read_subbuffer_splice(
1542 struct lttng_consumer_local_data *ctx,
1543 struct lttng_consumer_stream *stream, unsigned long len,
1544 unsigned long padding,
1545 struct lttng_packet_index *index)
1546 {
1547 ssize_t ret = 0, written = 0, ret_splice = 0;
1548 loff_t offset = 0;
1549 off_t orig_offset = stream->out_fd_offset;
1550 int fd = stream->wait_fd;
1551 /* Default is on the disk */
1552 int outfd = stream->out_fd;
1553 struct consumer_relayd_sock_pair *relayd = NULL;
1554 int *splice_pipe;
1555 unsigned int relayd_hang_up = 0;
1556
1557 switch (consumer_data.type) {
1558 case LTTNG_CONSUMER_KERNEL:
1559 break;
1560 case LTTNG_CONSUMER32_UST:
1561 case LTTNG_CONSUMER64_UST:
1562 /* Not supported for user space tracing */
1563 return -ENOSYS;
1564 default:
1565 ERR("Unknown consumer_data type");
1566 assert(0);
1567 }
1568
1569 /* RCU lock for the relayd pointer */
1570 rcu_read_lock();
1571
1572 /* Flag that the current stream if set for network streaming. */
1573 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1574 relayd = consumer_find_relayd(stream->net_seq_idx);
1575 if (relayd == NULL) {
1576 ret = -EPIPE;
1577 goto end;
1578 }
1579 }
1580
1581 /*
1582 * Choose right pipe for splice. Metadata and trace data are handled by
1583 * different threads hence the use of two pipes in order not to race or
1584 * corrupt the written data.
1585 */
1586 if (stream->metadata_flag) {
1587 splice_pipe = ctx->consumer_splice_metadata_pipe;
1588 } else {
1589 splice_pipe = ctx->consumer_thread_pipe;
1590 }
1591
1592 /* Write metadata stream id before payload */
1593 if (relayd) {
1594 int total_len = len;
1595
1596 if (stream->metadata_flag) {
1597 /*
1598 * Lock the control socket for the complete duration of the function
1599 * since from this point on we will use the socket.
1600 */
1601 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1602
1603 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
1604 padding);
1605 if (ret < 0) {
1606 written = ret;
1607 /* Socket operation failed. We consider the relayd dead */
1608 if (ret == -EBADF) {
1609 WARN("Remote relayd disconnected. Stopping");
1610 relayd_hang_up = 1;
1611 goto write_error;
1612 }
1613 goto end;
1614 }
1615
1616 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1617 }
1618
1619 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1620 if (ret >= 0) {
1621 /* Use the returned socket. */
1622 outfd = ret;
1623 } else {
1624 /* Socket operation failed. We consider the relayd dead */
1625 if (ret == -EBADF) {
1626 WARN("Remote relayd disconnected. Stopping");
1627 relayd_hang_up = 1;
1628 goto write_error;
1629 }
1630 goto end;
1631 }
1632 } else {
1633 /* No streaming, we have to set the len with the full padding */
1634 len += padding;
1635
1636 /*
1637 * Check if we need to change the tracefile before writing the packet.
1638 */
1639 if (stream->chan->tracefile_size > 0 &&
1640 (stream->tracefile_size_current + len) >
1641 stream->chan->tracefile_size) {
1642 ret = utils_rotate_stream_file(stream->chan->pathname,
1643 stream->name, stream->chan->tracefile_size,
1644 stream->chan->tracefile_count, stream->uid, stream->gid,
1645 stream->out_fd, &(stream->tracefile_count_current),
1646 &stream->out_fd);
1647 if (ret < 0) {
1648 ERR("Rotating output file");
1649 goto end;
1650 }
1651 outfd = stream->out_fd;
1652
1653 if (stream->index_fd >= 0) {
1654 ret = index_create_file(stream->chan->pathname,
1655 stream->name, stream->uid, stream->gid,
1656 stream->chan->tracefile_size,
1657 stream->tracefile_count_current);
1658 if (ret < 0) {
1659 goto end;
1660 }
1661 stream->index_fd = ret;
1662 }
1663
1664 /* Reset current size because we just perform a rotation. */
1665 stream->tracefile_size_current = 0;
1666 stream->out_fd_offset = 0;
1667 orig_offset = 0;
1668 }
1669 stream->tracefile_size_current += len;
1670 index->offset = htobe64(stream->out_fd_offset);
1671 }
1672
1673 while (len > 0) {
1674 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1675 (unsigned long)offset, len, fd, splice_pipe[1]);
1676 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
1677 SPLICE_F_MOVE | SPLICE_F_MORE);
1678 DBG("splice chan to pipe, ret %zd", ret_splice);
1679 if (ret_splice < 0) {
1680 PERROR("Error in relay splice");
1681 if (written == 0) {
1682 written = ret_splice;
1683 }
1684 ret = errno;
1685 goto splice_error;
1686 }
1687
1688 /* Handle stream on the relayd if the output is on the network */
1689 if (relayd) {
1690 if (stream->metadata_flag) {
1691 size_t metadata_payload_size =
1692 sizeof(struct lttcomm_relayd_metadata_payload);
1693
1694 /* Update counter to fit the spliced data */
1695 ret_splice += metadata_payload_size;
1696 len += metadata_payload_size;
1697 /*
1698 * We do this so the return value can match the len passed as
1699 * argument to this function.
1700 */
1701 written -= metadata_payload_size;
1702 }
1703 }
1704
1705 /* Splice data out */
1706 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
1707 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1708 DBG("Consumer splice pipe to file, ret %zd", ret_splice);
1709 if (ret_splice < 0) {
1710 PERROR("Error in file splice");
1711 if (written == 0) {
1712 written = ret_splice;
1713 }
1714 /* Socket operation failed. We consider the relayd dead */
1715 if (errno == EBADF || errno == EPIPE) {
1716 WARN("Remote relayd disconnected. Stopping");
1717 relayd_hang_up = 1;
1718 goto write_error;
1719 }
1720 ret = errno;
1721 goto splice_error;
1722 } else if (ret_splice > len) {
1723 errno = EINVAL;
1724 PERROR("Wrote more data than requested %zd (len: %lu)",
1725 ret_splice, len);
1726 written += ret_splice;
1727 ret = errno;
1728 goto splice_error;
1729 }
1730 len -= ret_splice;
1731
1732 /* This call is useless on a socket so better save a syscall. */
1733 if (!relayd) {
1734 /* This won't block, but will start writeout asynchronously */
1735 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1736 SYNC_FILE_RANGE_WRITE);
1737 stream->out_fd_offset += ret_splice;
1738 }
1739 stream->output_written += ret_splice;
1740 written += ret_splice;
1741 }
1742 lttng_consumer_sync_trace_file(stream, orig_offset);
1743
1744 ret = ret_splice;
1745
1746 goto end;
1747
1748 write_error:
1749 /*
1750 * This is a special case that the relayd has closed its socket. Let's
1751 * cleanup the relayd object and all associated streams.
1752 */
1753 if (relayd && relayd_hang_up) {
1754 cleanup_relayd(relayd, ctx);
1755 /* Skip splice error so the consumer does not fail */
1756 goto end;
1757 }
1758
1759 splice_error:
1760 /* send the appropriate error description to sessiond */
1761 switch (ret) {
1762 case EINVAL:
1763 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
1764 break;
1765 case ENOMEM:
1766 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
1767 break;
1768 case ESPIPE:
1769 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
1770 break;
1771 }
1772
1773 end:
1774 if (relayd && stream->metadata_flag) {
1775 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1776 }
1777
1778 rcu_read_unlock();
1779 return written;
1780 }
1781
1782 /*
1783 * Take a snapshot for a specific fd
1784 *
1785 * Returns 0 on success, < 0 on error
1786 */
1787 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
1788 {
1789 switch (consumer_data.type) {
1790 case LTTNG_CONSUMER_KERNEL:
1791 return lttng_kconsumer_take_snapshot(stream);
1792 case LTTNG_CONSUMER32_UST:
1793 case LTTNG_CONSUMER64_UST:
1794 return lttng_ustconsumer_take_snapshot(stream);
1795 default:
1796 ERR("Unknown consumer_data type");
1797 assert(0);
1798 return -ENOSYS;
1799 }
1800 }
1801
1802 /*
1803 * Get the produced position
1804 *
1805 * Returns 0 on success, < 0 on error
1806 */
1807 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
1808 unsigned long *pos)
1809 {
1810 switch (consumer_data.type) {
1811 case LTTNG_CONSUMER_KERNEL:
1812 return lttng_kconsumer_get_produced_snapshot(stream, pos);
1813 case LTTNG_CONSUMER32_UST:
1814 case LTTNG_CONSUMER64_UST:
1815 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
1816 default:
1817 ERR("Unknown consumer_data type");
1818 assert(0);
1819 return -ENOSYS;
1820 }
1821 }
1822
1823 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1824 int sock, struct pollfd *consumer_sockpoll)
1825 {
1826 switch (consumer_data.type) {
1827 case LTTNG_CONSUMER_KERNEL:
1828 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1829 case LTTNG_CONSUMER32_UST:
1830 case LTTNG_CONSUMER64_UST:
1831 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1832 default:
1833 ERR("Unknown consumer_data type");
1834 assert(0);
1835 return -ENOSYS;
1836 }
1837 }
1838
1839 /*
1840 * Iterate over all streams of the hashtable and free them properly.
1841 *
1842 * WARNING: *MUST* be used with data stream only.
1843 */
1844 static void destroy_data_stream_ht(struct lttng_ht *ht)
1845 {
1846 struct lttng_ht_iter iter;
1847 struct lttng_consumer_stream *stream;
1848
1849 if (ht == NULL) {
1850 return;
1851 }
1852
1853 rcu_read_lock();
1854 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1855 /*
1856 * Ignore return value since we are currently cleaning up so any error
1857 * can't be handled.
1858 */
1859 (void) consumer_del_stream(stream, ht);
1860 }
1861 rcu_read_unlock();
1862
1863 lttng_ht_destroy(ht);
1864 }
1865
1866 /*
1867 * Iterate over all streams of the hashtable and free them properly.
1868 *
1869 * XXX: Should not be only for metadata stream or else use an other name.
1870 */
1871 static void destroy_stream_ht(struct lttng_ht *ht)
1872 {
1873 struct lttng_ht_iter iter;
1874 struct lttng_consumer_stream *stream;
1875
1876 if (ht == NULL) {
1877 return;
1878 }
1879
1880 rcu_read_lock();
1881 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1882 /*
1883 * Ignore return value since we are currently cleaning up so any error
1884 * can't be handled.
1885 */
1886 (void) consumer_del_metadata_stream(stream, ht);
1887 }
1888 rcu_read_unlock();
1889
1890 lttng_ht_destroy(ht);
1891 }
1892
1893 void lttng_consumer_close_metadata(void)
1894 {
1895 switch (consumer_data.type) {
1896 case LTTNG_CONSUMER_KERNEL:
1897 /*
1898 * The Kernel consumer has a different metadata scheme so we don't
1899 * close anything because the stream will be closed by the session
1900 * daemon.
1901 */
1902 break;
1903 case LTTNG_CONSUMER32_UST:
1904 case LTTNG_CONSUMER64_UST:
1905 /*
1906 * Close all metadata streams. The metadata hash table is passed and
1907 * this call iterates over it by closing all wakeup fd. This is safe
1908 * because at this point we are sure that the metadata producer is
1909 * either dead or blocked.
1910 */
1911 lttng_ustconsumer_close_metadata(metadata_ht);
1912 break;
1913 default:
1914 ERR("Unknown consumer_data type");
1915 assert(0);
1916 }
1917 }
1918
/*
 * Clean up a metadata stream and free its memory.
 *
 * Removes the stream from every hash table, closes/unmaps its resources,
 * notifies the relayd if the stream was networked, and drops the channel
 * refcount (possibly deleting the channel). The stream itself is freed via
 * call_rcu at the end.
 */
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *free_chan = NULL;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	/*
	 * This call should NEVER receive regular stream. It must always be
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	assert(stream->metadata_flag);

	DBG3("Consumer delete metadata stream %d", stream->wait_fd);

	if (ht == NULL) {
		/* Means the stream was allocated but not successfully added */
		goto free_stream_rcu;
	}

	/* Lock ordering: consumer_data, then channel, then stream. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->lock);

	/* Release domain-specific (kernel vs UST) per-stream resources. */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (stream->mmap_base != NULL) {
			ret = munmap(stream->mmap_base, stream->mmap_len);
			if (ret != 0) {
				PERROR("munmap metadata stream");
			}
		}
		if (stream->wait_fd >= 0) {
			ret = close(stream->wait_fd);
			if (ret < 0) {
				PERROR("close kernel metadata wait_fd");
			}
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		if (stream->monitor) {
			/* close the write-side in close_metadata */
			ret = close(stream->ust_metadata_poll_pipe[0]);
			if (ret < 0) {
				PERROR("Close UST metadata read-side poll pipe");
			}
		}
		lttng_ustconsumer_del_stream(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Remove the stream from every lookup structure it was added to. */
	rcu_read_lock();
	iter.iter.node = &stream->node.node;
	ret = lttng_ht_del(ht, &iter);
	assert(!ret);

	iter.iter.node = &stream->node_channel_id.node;
	ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
	assert(!ret);

	iter.iter.node = &stream->node_session_id.node;
	ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	if (stream->out_fd >= 0) {
		ret = close(stream->out_fd);
		if (ret) {
			PERROR("close");
		}
	}

	/* Check and cleanup relayd */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_dec(&relayd->refcount);
		assert(uatomic_read(&relayd->refcount) >= 0);

		/* Closing streams requires to lock the control socket. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_send_close_stream(&relayd->control_sock,
				stream->relayd_stream_id, stream->next_net_seq_num - 1);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			DBG("Unable to close stream on the relayd. Continuing");
			/*
			 * Continue here. There is nothing we can do for the relayd.
			 * Chances are that the relayd has closed the socket so we just
			 * continue cleaning up.
			 */
		}

		/* Both conditions are met, we destroy the relayd. */
		if (uatomic_read(&relayd->refcount) == 0 &&
				uatomic_read(&relayd->destroy_flag)) {
			consumer_destroy_relayd(relayd);
		}
	}
	rcu_read_unlock();

	/* Atomically decrement channel refcount since other threads can use it. */
	if (!uatomic_sub_return(&stream->chan->refcount, 1)
			&& !uatomic_read(&stream->chan->nb_init_stream_left)) {
		/* Go for channel deletion! */
		free_chan = stream->chan;
	}

end:
	/*
	 * Nullify the stream reference so it is not used after deletion. The
	 * channel lock MUST be acquired before being able to check for
	 * a NULL pointer value.
	 */
	stream->chan->metadata_stream = NULL;

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	if (free_chan) {
		consumer_del_channel(free_chan);
	}

free_stream_rcu:
	/* Defer the actual free until all RCU readers are done with the node. */
	call_rcu(&stream->node.head, free_stream_rcu);
}
2056
/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 *
 * Registers the stream in the metadata HT, the per-channel-id HT and the
 * global stream list HT, updating the channel's init-stream accounting.
 * Always returns 0 in the current implementation.
 */
int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct lttng_ht *ht = metadata_ht;
	int ret = 0;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;

	assert(stream);
	assert(ht);

	DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);

	/* Lock ordering: consumer_data, channel, channel timer, stream. */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&stream->chan->lock);
	pthread_mutex_lock(&stream->chan->timer_lock);
	pthread_mutex_lock(&stream->lock);

	/*
	 * From here, refcounts are updated so be _careful_ when returning an error
	 * after this point.
	 */

	rcu_read_lock();

	/*
	 * Lookup the stream just to make sure it does not exist in our internal
	 * state. This should NEVER happen.
	 */
	lttng_ht_lookup(ht, &stream->key, &iter);
	node = lttng_ht_iter_get_node_u64(&iter);
	assert(!node);

	/*
	 * When nb_init_stream_left reaches 0, we don't need to trigger any action
	 * in terms of destroying the associated channel, because the action that
	 * causes the count to become 0 also causes a stream to be added. The
	 * channel deletion will thus be triggered by the following removal of this
	 * stream.
	 */
	if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
		/* Increment refcount before decrementing nb_init_stream_left */
		cmm_smp_wmb();
		uatomic_dec(&stream->chan->nb_init_stream_left);
	}

	lttng_ht_add_unique_u64(ht, &stream->node);

	lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
			&stream->node_channel_id);

	/*
	 * Add stream to the stream_list_ht of the consumer data. No need to steal
	 * the key since the HT does not use it and we allow to add redundant keys
	 * into this table.
	 */
	lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

	rcu_read_unlock();

	pthread_mutex_unlock(&stream->lock);
	pthread_mutex_unlock(&stream->chan->lock);
	pthread_mutex_unlock(&stream->chan->timer_lock);
	pthread_mutex_unlock(&consumer_data.lock);
	return ret;
}
2126
2127 /*
2128 * Delete data stream that are flagged for deletion (endpoint_status).
2129 */
2130 static void validate_endpoint_status_data_stream(void)
2131 {
2132 struct lttng_ht_iter iter;
2133 struct lttng_consumer_stream *stream;
2134
2135 DBG("Consumer delete flagged data stream");
2136
2137 rcu_read_lock();
2138 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
2139 /* Validate delete flag of the stream */
2140 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2141 continue;
2142 }
2143 /* Delete it right now */
2144 consumer_del_stream(stream, data_ht);
2145 }
2146 rcu_read_unlock();
2147 }
2148
2149 /*
2150 * Delete metadata stream that are flagged for deletion (endpoint_status).
2151 */
2152 static void validate_endpoint_status_metadata_stream(
2153 struct lttng_poll_event *pollset)
2154 {
2155 struct lttng_ht_iter iter;
2156 struct lttng_consumer_stream *stream;
2157
2158 DBG("Consumer delete flagged metadata stream");
2159
2160 assert(pollset);
2161
2162 rcu_read_lock();
2163 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
2164 /* Validate delete flag of the stream */
2165 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2166 continue;
2167 }
2168 /*
2169 * Remove from pollset so the metadata thread can continue without
2170 * blocking on a deleted stream.
2171 */
2172 lttng_poll_del(pollset, stream->wait_fd);
2173
2174 /* Delete it right now */
2175 consumer_del_metadata_stream(stream, metadata_ht);
2176 }
2177 rcu_read_unlock();
2178 }
2179
/*
 * Thread polls on metadata file descriptors and writes them on disk or on the
 * network.
 *
 * The thread owns the global metadata_ht hash table (created here, destroyed
 * on exit). New metadata streams arrive through ctx->consumer_metadata_pipe;
 * a NULL stream pointer on that pipe is a wake-up signalling a state change
 * (e.g. streams flagged for deletion, or consumer_quit).
 *
 * Registered to the health monitoring subsystem as
 * HEALTH_CONSUMERD_TYPE_METADATA; any abnormal exit (err != 0) is reported
 * through health_error().
 *
 * Always returns NULL (pthread start routine).
 */
void *consumer_thread_metadata_poll(void *data)
{
	/* err stays -1 (health error) unless we reach a clean-exit path. */
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_stream *stream = NULL;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);

	metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!metadata_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread metadata poll started");

	/* Size is set to 1 for the consumer_metadata pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events,
			lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Metadata main loop started");

	while (1) {
		/* Only the metadata pipe is set */
		if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
			err = 0; /* All is OK */
			goto end;
		}

restart:
		DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		ret = lttng_poll_wait(&events, -1);
		DBG("Metadata event catched in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR catched");
				goto restart;
			}
			goto error;
		}

		nb_fd = ret;

		/* From here, the event is a metadata wait fd */
		for (i = 0; i < nb_fd; i++) {
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
				if (revents & (LPOLLERR | LPOLLHUP )) {
					DBG("Metadata thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events,
							lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
					lttng_pipe_read_close(ctx->consumer_metadata_pipe);
					continue;
				} else if (revents & LPOLLIN) {
					ssize_t pipe_len;

					/* Read a stream pointer from the command pipe. */
					pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
							&stream, sizeof(stream));
					if (pipe_len < 0) {
						ERR("read metadata stream, ret: %zd", pipe_len);
						/*
						 * Continue here to handle the rest of the streams.
						 */
						continue;
					}

					/* A NULL stream means that the state has changed. */
					if (stream == NULL) {
						/* Check for deleted streams. */
						validate_endpoint_status_metadata_stream(&events);
						goto restart;
					}

					DBG("Adding metadata stream %d to poll set",
							stream->wait_fd);

					/*
					 * Add metadata stream to the global poll events list.
					 * NOTE(review): the return value of lttng_poll_add() is
					 * not checked here, unlike the pipe add above — TODO
					 * confirm whether a failure should be handled.
					 */
					lttng_poll_add(&events, stream->wait_fd,
							LPOLLIN | LPOLLPRI);
				}

				/* Handle other stream */
				continue;
			}

			/* Not the pipe: look the fd up in the metadata stream HT. */
			rcu_read_lock();
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			/* A polled fd must have been registered in metadata_ht. */
			assert(node);

			stream = caa_container_of(node, struct lttng_consumer_stream,
					node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Metadata fd %d is hup|err.", pollfd);
				if (!stream->hangup_flush_done
						&& (consumer_data.type == LTTNG_CONSUMER32_UST
							|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
					DBG("Attempting to flush and consume the UST buffers");
					lttng_ustconsumer_on_stream_hangup(stream);

					/* We just flushed the stream now read it. */
					do {
						len = ctx->on_buffer_ready(stream, ctx);
						/*
						 * We don't check the return value here since if we get
						 * a negative len, it means an error occured thus we
						 * simply remove it from the poll set and free the
						 * stream.
						 */
					} while (len > 0);
				}

				lttng_poll_del(&events, stream->wait_fd);
				/*
				 * This call update the channel states, closes file descriptors
				 * and securely free the stream.
				 */
				consumer_del_metadata_stream(stream, metadata_ht);
			} else if (revents & (LPOLLIN | LPOLLPRI)) {
				/* Get the data out of the metadata file descriptor */
				DBG("Metadata available on fd %d", pollfd);
				assert(stream->wait_fd == pollfd);

				/* Drain the stream until no sub-buffer is available. */
				do {
					len = ctx->on_buffer_ready(stream, ctx);
					/*
					 * We don't check the return value here since if we get
					 * a negative len, it means an error occured thus we
					 * simply remove it from the poll set and free the
					 * stream.
					 */
				} while (len > 0);

				/* It's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean up stream from consumer and free it. */
					lttng_poll_del(&events, stream->wait_fd);
					consumer_del_metadata_stream(stream, metadata_ht);
				}
			}

			/* Release RCU lock for the stream looked up */
			rcu_read_unlock();
		}
	}

	/* All is OK */
	err = 0;
error:
end:
	DBG("Metadata poll thread exiting");

	lttng_poll_clean(&events);
end_poll:
	destroy_stream_ht(metadata_ht);
end_ht:
	/* Report abnormal exit to the health subsystem before unregistering. */
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return NULL;
}
2377
/*
 * This thread polls the fds in the set to consume the data and write
 * it to tracefile if necessary.
 *
 * The thread owns the global data_ht hash table (created here, destroyed on
 * exit). It keeps a local snapshot of the stream set (pollfd/local_stream
 * arrays) that is rebuilt under consumer_data.lock whenever
 * consumer_data.need_update is set. Slot nb_fd of the pollfd array is always
 * the read side of ctx->consumer_data_pipe, used to wake the thread up on
 * stream addition or state change (a NULL stream pointer on the pipe).
 *
 * Registered to the health monitoring subsystem as
 * HEALTH_CONSUMERD_TYPE_DATA. Always returns NULL (pthread start routine).
 */
void *consumer_thread_data_poll(void *data)
{
	/* err stays -1 (health error) unless we reach a clean-exit path. */
	int num_rdy, num_hup, high_prio, ret, i, err = -1;
	struct pollfd *pollfd = NULL;
	/* local view of the streams */
	struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
	/* local view of consumer_data.fds_count */
	int nb_fd = 0;
	struct lttng_consumer_local_data *ctx = data;
	ssize_t len;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);

	data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (data_ht == NULL) {
		/* ENOMEM at this point. Better to bail out. */
		goto end;
	}

	/* Placeholder allocation; replaced on the first need_update pass. */
	local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
	if (local_stream == NULL) {
		PERROR("local_stream malloc");
		goto end;
	}

	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the fds set has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&consumer_data.lock);
		if (consumer_data.need_update) {
			free(pollfd);
			pollfd = NULL;

			free(local_stream);
			local_stream = NULL;

			/* allocate for all fds + 1 for the consumer_data_pipe */
			pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				PERROR("pollfd malloc");
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the consumer_data_pipe */
			local_stream = zmalloc((consumer_data.stream_count + 1) *
					sizeof(struct lttng_consumer_stream *));
			if (local_stream == NULL) {
				PERROR("local_stream malloc");
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}
			ret = update_poll_array(ctx, &pollfd, local_stream,
					data_ht);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}
			nb_fd = ret;
			consumer_data.need_update = 0;
		}
		pthread_mutex_unlock(&consumer_data.lock);

		/* No FDs and consumer_quit, consumer_cleanup the thread */
		if (nb_fd == 0 && consumer_quit == 1) {
			err = 0; /* All is OK */
			goto end;
		}
		/* poll on the array of fds */
	restart:
		DBG("polling on %d fd", nb_fd + 1);
		/* Infinite timeout: we only wake up on fd activity or EINTR. */
		num_rdy = poll(pollfd, nb_fd + 1, -1);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			/*
			 * Restart interrupted system call.
			 */
			if (errno == EINTR) {
				goto restart;
			}
			PERROR("Poll error");
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			/*
			 * NOTE(review): with a -1 timeout poll() should never return
			 * 0; this path exits with err = -1 (health error) — confirm
			 * intended.
			 */
			DBG("Polling thread timed out");
			goto end;
		}

		/*
		 * If the consumer_data_pipe triggered poll go directly to the
		 * beginning of the loop to update the array. We want to prioritize
		 * array update over low-priority reads.
		 */
		if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
			ssize_t pipe_readlen;

			DBG("consumer_data_pipe wake up");
			pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
					&new_stream, sizeof(new_stream));
			if (pipe_readlen < 0) {
				ERR("Consumer data pipe ret %zd", pipe_readlen);
				/* Continue so we can at least handle the current stream(s). */
				continue;
			}

			/*
			 * If the stream is NULL, just ignore it. It's also possible that
			 * the sessiond poll thread changed the consumer_quit state and is
			 * waking us up to test it.
			 */
			if (new_stream == NULL) {
				validate_endpoint_status_data_stream();
				continue;
			}

			/* Continue to update the local streams and handle prio ones */
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			if (local_stream[i] == NULL) {
				continue;
			}
			if (pollfd[i].revents & POLLPRI) {
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				len = ctx->on_buffer_ready(local_stream[i], ctx);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
				} else if (len > 0) {
					local_stream[i]->data_read = 1;
				}
			}
		}

		/*
		 * If we read high prio channel in this loop, try again
		 * for more high prio data.
		 */
		if (high_prio) {
			continue;
		}

		/* Take care of low priority channels. */
		for (i = 0; i < nb_fd; i++) {
			if (local_stream[i] == NULL) {
				continue;
			}
			/* hangup_flush_done forces one last read after a flush. */
			if ((pollfd[i].revents & POLLIN) ||
					local_stream[i]->hangup_flush_done) {
				DBG("Normal read on fd %d", pollfd[i].fd);
				len = ctx->on_buffer_ready(local_stream[i], ctx);
				/* it's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN && len != -ENODATA) {
					/* Clean the stream and free it. */
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
				} else if (len > 0) {
					local_stream[i]->data_read = 1;
				}
			}
		}

		/* Handle hangup and errors */
		for (i = 0; i < nb_fd; i++) {
			if (local_stream[i] == NULL) {
				continue;
			}
			if (!local_stream[i]->hangup_flush_done
					&& (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
					&& (consumer_data.type == LTTNG_CONSUMER32_UST
						|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
				DBG("fd %d is hup|err|nval. Attempting flush and read.",
						pollfd[i].fd);
				lttng_ustconsumer_on_stream_hangup(local_stream[i]);
				/* Attempt read again, for the data we just flushed. */
				local_stream[i]->data_read = 1;
			}
			/*
			 * If the poll flag is HUP/ERR/NVAL and we have
			 * read no data in this pass, we can remove the
			 * stream from its hash table.
			 */
			if ((pollfd[i].revents & POLLHUP)) {
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				if (!local_stream[i]->data_read) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
					num_hup++;
				}
			} else if (pollfd[i].revents & POLLERR) {
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				if (!local_stream[i]->data_read) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
					num_hup++;
				}
			} else if (pollfd[i].revents & POLLNVAL) {
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				if (!local_stream[i]->data_read) {
					consumer_del_stream(local_stream[i], data_ht);
					local_stream[i] = NULL;
					num_hup++;
				}
			}
			/* Reset the per-pass data_read flag for surviving streams. */
			if (local_stream[i] != NULL) {
				local_stream[i]->data_read = 0;
			}
		}
	}
	/* All is OK */
	err = 0;
end:
	DBG("polling thread exiting");
	free(pollfd);
	free(local_stream);

	/*
	 * Close the write side of the pipe so epoll_wait() in
	 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
	 * read side of the pipe. If we close them both, epoll_wait strangely does
	 * not return and could create a endless wait period if the pipe is the
	 * only tracked fd in the poll set. The thread will take care of closing
	 * the read side.
	 */
	(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);

	destroy_data_stream_ht(data_ht);

	/* Report abnormal exit to the health subsystem before unregistering. */
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return NULL;
}
2633
/*
 * Close wake-up end of each stream belonging to the channel. This will
 * allow the poll() on the stream read-side to detect when the
 * write-side (application) finally closes them.
 *
 * Streams are found through consumer_data.stream_per_chan_id_ht, keyed by
 * the channel key (duplicates allowed: one entry per stream of the channel).
 * Each stream is handled under its own lock; streams whose hash table node
 * has already been deleted are skipped. Only the UST consumer types have a
 * wakeup fd to close; the kernel type is a no-op here.
 */
static
void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;

	ht = consumer_data.stream_per_chan_id_ht;

	rcu_read_lock();
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed),
			ht->match_fct, &channel->key,
			&iter.iter, stream, node_channel_id.node) {
		/*
		 * Protect against teardown with mutex.
		 */
		pthread_mutex_lock(&stream->lock);
		/* Stream already torn down concurrently; nothing to close. */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}
		switch (consumer_data.type) {
		case LTTNG_CONSUMER_KERNEL:
			break;
		case LTTNG_CONSUMER32_UST:
		case LTTNG_CONSUMER64_UST:
			/*
			 * Note: a mutex is taken internally within
			 * liblttng-ust-ctl to protect timer wakeup_fd
			 * use from concurrent close.
			 */
			lttng_ustconsumer_close_stream_wakeup(stream);
			break;
		default:
			ERR("Unknown consumer_data type");
			assert(0);
		}
	next:
		pthread_mutex_unlock(&stream->lock);
	}
	rcu_read_unlock();
}
2681
2682 static void destroy_channel_ht(struct lttng_ht *ht)
2683 {
2684 struct lttng_ht_iter iter;
2685 struct lttng_consumer_channel *channel;
2686 int ret;
2687
2688 if (ht == NULL) {
2689 return;
2690 }
2691
2692 rcu_read_lock();
2693 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2694 ret = lttng_ht_del(ht, &iter);
2695 assert(ret != 0);
2696 }
2697 rcu_read_unlock();
2698
2699 lttng_ht_destroy(ht);
2700 }
2701
/*
 * This thread polls the channel fds to detect when they are being
 * closed. It closes all related streams if the channel is detected as
 * closed. It is currently only used as a shim layer for UST because the
 * consumerd needs to keep the per-stream wakeup end of pipes open for
 * periodical flush.
 *
 * Channels are announced through ctx->consumer_channel_pipe[0] along with a
 * consumer_channel_action (ADD / DEL / QUIT). Known channels live in a local
 * hash table keyed by their wait fd. Registered to the health monitoring
 * subsystem as HEALTH_CONSUMERD_TYPE_CHANNEL. Always returns NULL (pthread
 * start routine).
 */
void *consumer_thread_channel_poll(void *data)
{
	/* err stays -1 (health error) unless we reach a clean-exit path. */
	int ret, i, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_consumer_channel *chan = NULL;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_u64 *node;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = data;
	struct lttng_ht *channel_ht;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);

	channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
	if (!channel_ht) {
		/* ENOMEM at this point. Better to bail out. */
		goto end_ht;
	}

	DBG("Thread channel poll started");

	/* Size is set to 1 for the consumer_channel pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end_poll;
	}

	ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Channel main loop started");

	while (1) {
		/* Only the channel pipe is set */
		if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
			err = 0; /* All is OK */
			goto end;
		}

restart:
		DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
		ret = lttng_poll_wait(&events, -1);
		DBG("Channel event catched in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				ERR("Poll EINTR catched");
				goto restart;
			}
			goto end;
		}

		nb_fd = ret;

		/* From here, the event is a channel wait fd */
		for (i = 0; i < nb_fd; i++) {
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			/* Just don't waste time if no returned events for the fd */
			if (!revents) {
				continue;
			}
			if (pollfd == ctx->consumer_channel_pipe[0]) {
				if (revents & (LPOLLERR | LPOLLHUP)) {
					DBG("Channel thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since their might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
					continue;
				} else if (revents & LPOLLIN) {
					enum consumer_channel_action action;
					uint64_t key;

					ret = read_channel_pipe(ctx, &chan, &key, &action);
					if (ret <= 0) {
						ERR("Error reading channel pipe");
						continue;
					}

					switch (action) {
					case CONSUMER_CHANNEL_ADD:
						DBG("Adding channel %d to poll set",
							chan->wait_fd);

						/* Track the channel by its wait fd. */
						lttng_ht_node_init_u64(&chan->wait_fd_node,
							chan->wait_fd);
						rcu_read_lock();
						lttng_ht_add_unique_u64(channel_ht,
								&chan->wait_fd_node);
						rcu_read_unlock();
						/* Add channel to the global poll events list */
						lttng_poll_add(&events, chan->wait_fd,
								LPOLLIN | LPOLLPRI);
						break;
					case CONSUMER_CHANNEL_DEL:
					{
						struct lttng_consumer_stream *stream, *stmp;

						rcu_read_lock();
						chan = consumer_find_channel(key);
						if (!chan) {
							rcu_read_unlock();
							ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
							break;
						}
						lttng_poll_del(&events, chan->wait_fd);
						iter.iter.node = &chan->wait_fd_node.node;
						ret = lttng_ht_del(channel_ht, &iter);
						assert(ret == 0);
						consumer_close_channel_streams(chan);

						switch (consumer_data.type) {
						case LTTNG_CONSUMER_KERNEL:
							break;
						case LTTNG_CONSUMER32_UST:
						case LTTNG_CONSUMER64_UST:
							/* Delete streams that might have been left in the stream list. */
							cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
									send_node) {
								cds_list_del(&stream->send_node);
								lttng_ustconsumer_del_stream(stream);
								uatomic_sub(&stream->chan->refcount, 1);
								/*
								 * NOTE(review): this asserts the *address* of
								 * refcount, which is always non-NULL — was this
								 * meant to check the refcount value?
								 */
								assert(&chan->refcount);
								free(stream);
							}
							break;
						default:
							ERR("Unknown consumer_data type");
							assert(0);
						}

						/*
						 * Release our own refcount. Force channel deletion even if
						 * streams were not initialized.
						 */
						if (!uatomic_sub_return(&chan->refcount, 1)) {
							consumer_del_channel(chan);
						}
						rcu_read_unlock();
						/* Restart to re-read a coherent poll set. */
						goto restart;
					}
					case CONSUMER_CHANNEL_QUIT:
						/*
						 * Remove the pipe from the poll set and continue the loop
						 * since their might be data to consume.
						 */
						lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
						continue;
					default:
						ERR("Unknown action");
						break;
					}
				}

				/* Handle other stream */
				continue;
			}

			/* Not the pipe: look the fd up in the channel HT. */
			rcu_read_lock();
			{
				uint64_t tmp_id = (uint64_t) pollfd;

				lttng_ht_lookup(channel_ht, &tmp_id, &iter);
			}
			node = lttng_ht_iter_get_node_u64(&iter);
			/* A polled fd must have been registered in channel_ht. */
			assert(node);

			chan = caa_container_of(node, struct lttng_consumer_channel,
					wait_fd_node);

			/* Check for error event */
			if (revents & (LPOLLERR | LPOLLHUP)) {
				DBG("Channel fd %d is hup|err.", pollfd);

				lttng_poll_del(&events, chan->wait_fd);
				ret = lttng_ht_del(channel_ht, &iter);
				assert(ret == 0);
				consumer_close_channel_streams(chan);

				/* Release our own refcount */
				if (!uatomic_sub_return(&chan->refcount, 1)
						&& !uatomic_read(&chan->nb_init_stream_left)) {
					consumer_del_channel(chan);
				}
			}

			/* Release RCU lock for the channel looked up */
			rcu_read_unlock();
		}
	}

	/* All is OK */
	err = 0;
end:
	lttng_poll_clean(&events);
end_poll:
	destroy_channel_ht(channel_ht);
end_ht:
	DBG("Channel poll thread exiting");
	/* Report abnormal exit to the health subsystem before unregistering. */
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);
	rcu_unregister_thread();
	return NULL;
}
2924
2925 static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
2926 struct pollfd *sockpoll, int client_socket)
2927 {
2928 int ret;
2929
2930 assert(ctx);
2931 assert(sockpoll);
2932
2933 if (lttng_consumer_poll_socket(sockpoll) < 0) {
2934 ret = -1;
2935 goto error;
2936 }
2937 DBG("Metadata connection on client_socket");
2938
2939 /* Blocking call, waiting for transmission */
2940 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
2941 if (ctx->consumer_metadata_socket < 0) {
2942 WARN("On accept metadata");
2943 ret = -1;
2944 goto error;
2945 }
2946 ret = 0;
2947
2948 error:
2949 return ret;
2950 }
2951
/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 *
 * It creates the command unix socket, announces readiness to the session
 * daemon, accepts the command and metadata connections, then loops on
 * lttng_consumer_recv_cmd() until a STOP command, a disconnection or
 * consumer_quit. On exit it sets consumer_quit and wakes the data and
 * channel threads so the whole consumerd shuts down gracefully.
 *
 * Registered to the health monitoring subsystem as
 * HEALTH_CONSUMERD_TYPE_SESSIOND. Always returns NULL (pthread start
 * routine).
 */
void *consumer_thread_sessiond_poll(void *data)
{
	/* err stays -1 (health error) unless we reach a clean-exit path. */
	int sock = -1, client_socket, ret, err = -1;
	/*
	 * structure to poll for incoming data on communication socket avoids
	 * making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = data;

	rcu_register_thread();

	health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);

	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	/* Remove any stale socket file before binding. */
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	/* prepare the FDs to poll : to client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock < 0) {
		WARN("On accept");
		goto end;
	}

	/*
	 * Setup metadata socket which is the second socket connection on the
	 * command unix socket.
	 */
	ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
	if (ret < 0) {
		goto end;
	}

	/* This socket is not useful anymore. */
	ret = close(client_socket);
	if (ret < 0) {
		PERROR("close client_socket");
	}
	client_socket = -1;

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	while (1) {
		if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret == -ENOENT) {
			DBG("Received STOP command");
			goto end;
		}
		if (ret <= 0) {
			/*
			 * This could simply be a session daemon quitting. Don't output
			 * ERR() here.
			 */
			DBG("Communication interrupted on command socket");
			err = 0;
			goto end;
		}
		if (consumer_quit) {
			DBG("consumer_thread_receive_fds received quit from signal");
			err = 0; /* All is OK */
			goto end;
		}
		DBG("received command on sock");
	}
	/* All is OK */
	err = 0;

end:
	DBG("Consumer thread sessiond poll exiting");

	/*
	 * Close metadata streams since the producer is the session daemon which
	 * just died.
	 *
	 * NOTE: for now, this only applies to the UST tracer.
	 */
	lttng_consumer_close_metadata();

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	consumer_quit = 1;

	/*
	 * Notify the data poll thread to poll back again and test the
	 * consumer_quit state that we just set so to quit gracefully.
	 */
	notify_thread_lttng_pipe(ctx->consumer_data_pipe);

	/* Ask the channel thread to quit as well. */
	notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);

	/* Cleaning up possibly open sockets. */
	if (sock >= 0) {
		ret = close(sock);
		if (ret < 0) {
			PERROR("close sock sessiond poll");
		}
	}
	if (client_socket >= 0) {
		ret = close(client_socket);
		if (ret < 0) {
			PERROR("close client_socket sessiond poll");
		}
	}

	/* Report abnormal exit to the health subsystem before unregistering. */
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_consumerd);

	rcu_unregister_thread();
	return NULL;
}
3106
/*
 * Read a ready sub-buffer from the given stream, dispatching to the kernel
 * or UST implementation depending on the consumer type.
 *
 * The whole operation runs under the stream lock. For metadata streams, the
 * metadata_rdv_lock is additionally held around the read and waiters on the
 * metadata_rdv condition are woken once the read is done.
 *
 * Returns the value of the per-domain read function, or -ENOSYS on an
 * unknown consumer type (preceded by an assert in debug builds).
 */
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
		struct lttng_consumer_local_data *ctx)
{
	ssize_t ret;

	pthread_mutex_lock(&stream->lock);
	if (stream->metadata_flag) {
		pthread_mutex_lock(&stream->metadata_rdv_lock);
	}

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		ret = lttng_kconsumer_read_subbuffer(stream, ctx);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		ret = -ENOSYS;
		break;
	}

	if (stream->metadata_flag) {
		/* Wake up threads waiting for this metadata rendez-vous. */
		pthread_cond_broadcast(&stream->metadata_rdv);
		pthread_mutex_unlock(&stream->metadata_rdv_lock);
	}
	pthread_mutex_unlock(&stream->lock);
	return ret;
}
3139
3140 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3141 {
3142 switch (consumer_data.type) {
3143 case LTTNG_CONSUMER_KERNEL:
3144 return lttng_kconsumer_on_recv_stream(stream);
3145 case LTTNG_CONSUMER32_UST:
3146 case LTTNG_CONSUMER64_UST:
3147 return lttng_ustconsumer_on_recv_stream(stream);
3148 default:
3149 ERR("Unknown consumer_data type");
3150 assert(0);
3151 return -ENOSYS;
3152 }
3153 }
3154
3155 /*
3156 * Allocate and set consumer data hash tables.
3157 */
3158 void lttng_consumer_init(void)
3159 {
3160 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3161 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3162 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3163 consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3164 }
3165
3166 /*
3167 * Process the ADD_RELAYD command receive by a consumer.
3168 *
3169 * This will create a relayd socket pair and add it to the relayd hash table.
3170 * The caller MUST acquire a RCU read side lock before calling it.
3171 */
3172 int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
3173 struct lttng_consumer_local_data *ctx, int sock,
3174 struct pollfd *consumer_sockpoll,
3175 struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
3176 uint64_t relayd_session_id)
3177 {
3178 int fd = -1, ret = -1, relayd_created = 0;
3179 enum lttng_error_code ret_code = LTTNG_OK;
3180 struct consumer_relayd_sock_pair *relayd = NULL;
3181
3182 assert(ctx);
3183 assert(relayd_sock);
3184
3185 DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
3186
3187 /* Get relayd reference if exists. */
3188 relayd = consumer_find_relayd(net_seq_idx);
3189 if (relayd == NULL) {
3190 assert(sock_type == LTTNG_STREAM_CONTROL);
3191 /* Not found. Allocate one. */
3192 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
3193 if (relayd == NULL) {
3194 ret = -ENOMEM;
3195 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3196 goto error;
3197 } else {
3198 relayd->sessiond_session_id = sessiond_id;
3199 relayd_created = 1;
3200 }
3201
3202 /*
3203 * This code path MUST continue to the consumer send status message to
3204 * we can notify the session daemon and continue our work without
3205 * killing everything.
3206 */
3207 } else {
3208 /*
3209 * relayd key should never be found for control socket.
3210 */
3211 assert(sock_type != LTTNG_STREAM_CONTROL);
3212 }
3213
3214 /* First send a status message before receiving the fds. */
3215 ret = consumer_send_status_msg(sock, LTTNG_OK);
3216 if (ret < 0) {
3217 /* Somehow, the session daemon is not responding anymore. */
3218 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3219 goto error_nosignal;
3220 }
3221
3222 /* Poll on consumer socket. */
3223 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
3224 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3225 ret = -EINTR;
3226 goto error_nosignal;
3227 }
3228
3229 /* Get relayd socket from session daemon */
3230 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
3231 if (ret != sizeof(fd)) {
3232 ret = -1;
3233 fd = -1; /* Just in case it gets set with an invalid value. */
3234
3235 /*
3236 * Failing to receive FDs might indicate a major problem such as
3237 * reaching a fd limit during the receive where the kernel returns a
3238 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
3239 * don't take any chances and stop everything.
3240 *
3241 * XXX: Feature request #558 will fix that and avoid this possible
3242 * issue when reaching the fd limit.
3243 */
3244 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
3245 ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
3246 goto error;
3247 }
3248
3249 /* Copy socket information and received FD */
3250 switch (sock_type) {
3251 case LTTNG_STREAM_CONTROL:
3252 /* Copy received lttcomm socket */
3253 lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
3254 ret = lttcomm_create_sock(&relayd->control_sock.sock);
3255 /* Handle create_sock error. */
3256 if (ret < 0) {
3257 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3258 goto error;
3259 }
3260 /*
3261 * Close the socket created internally by
3262 * lttcomm_create_sock, so we can replace it by the one
3263 * received from sessiond.
3264 */
3265 if (close(relayd->control_sock.sock.fd)) {
3266 PERROR("close");
3267 }
3268
3269 /* Assign new file descriptor */
3270 relayd->control_sock.sock.fd = fd;
3271 fd = -1; /* For error path */
3272 /* Assign version values. */
3273 relayd->control_sock.major = relayd_sock->major;
3274 relayd->control_sock.minor = relayd_sock->minor;
3275
3276 relayd->relayd_session_id = relayd_session_id;
3277
3278 break;
3279 case LTTNG_STREAM_DATA:
3280 /* Copy received lttcomm socket */
3281 lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
3282 ret = lttcomm_create_sock(&relayd->data_sock.sock);
3283 /* Handle create_sock error. */
3284 if (ret < 0) {
3285 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
3286 goto error;
3287 }
3288 /*
3289 * Close the socket created internally by
3290 * lttcomm_create_sock, so we can replace it by the one
3291 * received from sessiond.
3292 */
3293 if (close(relayd->data_sock.sock.fd)) {
3294 PERROR("close");
3295 }
3296
3297 /* Assign new file descriptor */
3298 relayd->data_sock.sock.fd = fd;
3299 fd = -1; /* for eventual error paths */
3300 /* Assign version values. */
3301 relayd->data_sock.major = relayd_sock->major;
3302 relayd->data_sock.minor = relayd_sock->minor;
3303 break;
3304 default:
3305 ERR("Unknown relayd socket type (%d)", sock_type);
3306 ret = -1;
3307 ret_code = LTTCOMM_CONSUMERD_FATAL;
3308 goto error;
3309 }
3310
3311 DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
3312 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
3313 relayd->net_seq_idx, fd);
3314
3315 /* We successfully added the socket. Send status back. */
3316 ret = consumer_send_status_msg(sock, ret_code);
3317 if (ret < 0) {
3318 /* Somehow, the session daemon is not responding anymore. */
3319 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3320 goto error_nosignal;
3321 }
3322
3323 /*
3324 * Add relayd socket pair to consumer data hashtable. If object already
3325 * exists or on error, the function gracefully returns.
3326 */
3327 add_relayd(relayd);
3328
3329 /* All good! */
3330 return 0;
3331
3332 error:
3333 if (consumer_send_status_msg(sock, ret_code) < 0) {
3334 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
3335 }
3336
3337 error_nosignal:
3338 /* Close received socket if valid. */
3339 if (fd >= 0) {
3340 if (close(fd)) {
3341 PERROR("close received socket");
3342 }
3343 }
3344
3345 if (relayd_created) {
3346 free(relayd);
3347 }
3348
3349 return ret;
3350 }
3351
3352 /*
3353 * Try to lock the stream mutex.
3354 *
3355 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
3356 */
3357 static int stream_try_lock(struct lttng_consumer_stream *stream)
3358 {
3359 int ret;
3360
3361 assert(stream);
3362
3363 /*
3364 * Try to lock the stream mutex. On failure, we know that the stream is
3365 * being used else where hence there is data still being extracted.
3366 */
3367 ret = pthread_mutex_trylock(&stream->lock);
3368 if (ret) {
3369 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
3370 ret = 0;
3371 goto end;
3372 }
3373
3374 ret = 1;
3375
3376 end:
3377 return ret;
3378 }
3379
3380 /*
3381 * Search for a relayd associated to the session id and return the reference.
3382 *
3383 * A rcu read side lock MUST be acquire before calling this function and locked
3384 * until the relayd object is no longer necessary.
3385 */
3386 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3387 {
3388 struct lttng_ht_iter iter;
3389 struct consumer_relayd_sock_pair *relayd = NULL;
3390
3391 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3392 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
3393 node.node) {
3394 /*
3395 * Check by sessiond id which is unique here where the relayd session
3396 * id might not be when having multiple relayd.
3397 */
3398 if (relayd->sessiond_session_id == id) {
3399 /* Found the relayd. There can be only one per id. */
3400 goto found;
3401 }
3402 }
3403
3404 return NULL;
3405
3406 found:
3407 return relayd;
3408 }
3409
/*
 * Check if, for a given session id, there is still data needed to be extracted
 * from the buffers (or still in flight towards a relayd).
 *
 * The consumer_data lock and an RCU read side lock are taken for the whole
 * duration of the check.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = NULL;
	/* Per-domain (kernel/UST) callback checking one stream's buffers. */
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	rcu_read_lock();
	pthread_mutex_lock(&consumer_data.lock);

	/* Select the data pending callback matching the consumer type. */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		assert(0);
	}

	/* Ease our life a bit */
	ht = consumer_data.stream_list_ht;

	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		/* Send begin command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock,
				relayd->relayd_session_id);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/*
			 * Communication error with the relayd: consider that no
			 * data is pending since we can't ask it anyway.
			 */
			goto data_not_pending;
		}
	}

	/* Walk every stream of this session (indexed by session id). */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&id, lttng_ht_seed),
			ht->match_fct, &id,
			&iter.iter, stream, node_session_id.node) {
		/* If this call fails, the stream is being used hence data pending. */
		ret = stream_try_lock(stream);
		if (!ret) {
			goto data_pending;
		}

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/*
			 * An empty output file is not valid. We need at least one packet
			 * generated per stream, even if it contains no event, so it
			 * contains at least one packet header.
			 */
			if (stream->output_written == 0) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		/* Relayd check */
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->metadata_flag) {
				/* Metadata uses the quiescence check instead. */
				ret = relayd_quiescent_control(&relayd->control_sock,
						stream->relayd_stream_id);
			} else {
				/* next_net_seq_num is the NEXT seq num; check up to -1. */
				ret = relayd_data_pending(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->next_net_seq_num - 1);
			}
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}
		pthread_mutex_unlock(&stream->lock);
	}

	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send end command for data pending; also reports inflight data. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_end_data_pending(&relayd->control_sock,
				relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 1;
}
3548
3549 /*
3550 * Send a ret code status message to the sessiond daemon.
3551 *
3552 * Return the sendmsg() return value.
3553 */
3554 int consumer_send_status_msg(int sock, int ret_code)
3555 {
3556 struct lttcomm_consumer_status_msg msg;
3557
3558 msg.ret_code = ret_code;
3559
3560 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3561 }
3562
3563 /*
3564 * Send a channel status message to the sessiond daemon.
3565 *
3566 * Return the sendmsg() return value.
3567 */
3568 int consumer_send_status_channel(int sock,
3569 struct lttng_consumer_channel *channel)
3570 {
3571 struct lttcomm_consumer_status_channel msg;
3572
3573 assert(sock >= 0);
3574
3575 if (!channel) {
3576 msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
3577 } else {
3578 msg.ret_code = LTTNG_OK;
3579 msg.key = channel->key;
3580 msg.stream_count = channel->streams.count;
3581 }
3582
3583 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3584 }
3585
/*
 * Using a maximum stream size with the produced and consumed position of a
 * stream, computes the new consumed position to be as close as possible to the
 * maximum possible stream size.
 *
 * If maximum stream size is lower than the possible buffer size (produced -
 * consumed), the consumed_pos given is returned untouched else the new value
 * is returned.
 */
unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
		unsigned long produced_pos, uint64_t max_stream_size)
{
	const unsigned long buffered = produced_pos - consumed_pos;

	if (max_stream_size == 0 || buffered <= max_stream_size) {
		/* Everything fits (or no limit set); keep position untouched. */
		return consumed_pos;
	}

	/* Offset from the produced position so only the latest buffers remain. */
	return produced_pos - max_stream_size;
}
This page took 0.146463 seconds and 5 git commands to generate.