consumerd: add health instrumentation into threads
[lttng-tools.git] / src / common / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31 #include <signal.h>
32
33 #include <common/common.h>
34 #include <common/utils.h>
35 #include <common/compat/poll.h>
36 #include <common/index/index.h>
37 #include <common/kernel-ctl/kernel-ctl.h>
38 #include <common/sessiond-comm/relayd.h>
39 #include <common/sessiond-comm/sessiond-comm.h>
40 #include <common/kernel-consumer/kernel-consumer.h>
41 #include <common/relayd/relayd.h>
42 #include <common/ust-consumer/ust-consumer.h>
43 #include <common/consumer-timer.h>
44
45 #include "consumer.h"
46 #include "consumer-stream.h"
47 #include "../bin/lttng-consumerd/health-consumerd.h"
48
/*
 * Global consumer state shared by all consumer threads. stream_count and
 * need_update are modified under consumer_data.lock by the stream add/del
 * paths below; type is set once at startup (UNKNOWN until then).
 */
struct lttng_consumer_global_data consumer_data = {
    .stream_count = 0,
    .need_update = 1,
    .type = LTTNG_CONSUMER_UNKNOWN,
};
54
/* Actions carried over the channel pipe to the channel management thread. */
enum consumer_channel_action {
    CONSUMER_CHANNEL_ADD,
    CONSUMER_CHANNEL_DEL,
    CONSUMER_CHANNEL_QUIT,
};

/* Message exchanged on ctx->consumer_channel_pipe between threads. */
struct consumer_channel_msg {
    enum consumer_channel_action action;
    struct lttng_consumer_channel *chan;    /* valid for CONSUMER_CHANNEL_ADD */
    uint64_t key;                           /* valid for CONSUMER_CHANNEL_DEL */
};

/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
volatile int consumer_quit;

/*
 * Global hash tables containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll
 * thread for the metadata and the data poll thread for the data.
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
82
83 /*
84 * Notify a thread lttng pipe to poll back again. This usually means that some
85 * global state has changed so we just send back the thread in a poll wait
86 * call.
87 */
88 static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
89 {
90 struct lttng_consumer_stream *null_stream = NULL;
91
92 assert(pipe);
93
94 (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
95 }
96
/*
 * Post a channel action (add/del/quit) on the channel thread's pipe so it
 * wakes up and handles it.
 *
 * The write is retried on EINTR but any other failure (or short write) is
 * silently ignored: this is a best-effort notification.
 */
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
        struct lttng_consumer_channel *chan,
        uint64_t key,
        enum consumer_channel_action action)
{
    struct consumer_channel_msg msg;
    int ret;

    /* Zero the whole struct so no uninitialized padding goes down the pipe. */
    memset(&msg, 0, sizeof(msg));

    msg.action = action;
    msg.chan = chan;
    msg.key = key;
    do {
        ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
    } while (ret < 0 && errno == EINTR);
}
114
/*
 * Ask the channel thread to delete the channel identified by key. The
 * channel pointer is not needed for a delete, hence NULL.
 */
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
        uint64_t key)
{
    notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
}
120
/*
 * Read one channel message from the channel pipe and unpack it into the
 * output parameters, which are only written when something was read.
 *
 * Returns the raw read() result: sizeof(msg) on success, 0 on EOF, -1 on
 * error (EINTR is retried). NOTE(review): a short read (> 0 but < sizeof(msg))
 * would leave msg partially filled yet still be reported as success;
 * presumably pipe writes of this small size are atomic — confirm.
 */
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
        struct lttng_consumer_channel **chan,
        uint64_t *key,
        enum consumer_channel_action *action)
{
    struct consumer_channel_msg msg;
    int ret;

    do {
        ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
    } while (ret < 0 && errno == EINTR);
    if (ret > 0) {
        *action = msg.action;
        *chan = msg.chan;
        *key = msg.key;
    }
    return ret;
}
139
140 /*
141 * Find a stream. The consumer_data.lock must be locked during this
142 * call.
143 */
144 static struct lttng_consumer_stream *find_stream(uint64_t key,
145 struct lttng_ht *ht)
146 {
147 struct lttng_ht_iter iter;
148 struct lttng_ht_node_u64 *node;
149 struct lttng_consumer_stream *stream = NULL;
150
151 assert(ht);
152
153 /* -1ULL keys are lookup failures */
154 if (key == (uint64_t) -1ULL) {
155 return NULL;
156 }
157
158 rcu_read_lock();
159
160 lttng_ht_lookup(ht, &key, &iter);
161 node = lttng_ht_iter_get_node_u64(&iter);
162 if (node != NULL) {
163 stream = caa_container_of(node, struct lttng_consumer_stream, node);
164 }
165
166 rcu_read_unlock();
167
168 return stream;
169 }
170
/*
 * Invalidate the key of a stream already present in ht so a new stream can
 * later be inserted under the same key without colliding. The node stays in
 * the table (still reachable by iteration) but can no longer be found by
 * key lookup.
 */
static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
{
    struct lttng_consumer_stream *stream;

    rcu_read_lock();
    stream = find_stream(key, ht);
    if (stream) {
        stream->key = (uint64_t) -1ULL;
        /*
         * We don't want the lookup to match, but we still need to iterate on
         * this stream when iterating over the hash table. Just change the
         * node key.
         */
        stream->node.key = (uint64_t) -1ULL;
    }
    rcu_read_unlock();
}
188
189 /*
190 * Return a channel object for the given key.
191 *
192 * RCU read side lock MUST be acquired before calling this function and
193 * protects the channel ptr.
194 */
195 struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
196 {
197 struct lttng_ht_iter iter;
198 struct lttng_ht_node_u64 *node;
199 struct lttng_consumer_channel *channel = NULL;
200
201 /* -1ULL keys are lookup failures */
202 if (key == (uint64_t) -1ULL) {
203 return NULL;
204 }
205
206 lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
207 node = lttng_ht_iter_get_node_u64(&iter);
208 if (node != NULL) {
209 channel = caa_container_of(node, struct lttng_consumer_channel, node);
210 }
211
212 return channel;
213 }
214
215 static void free_stream_rcu(struct rcu_head *head)
216 {
217 struct lttng_ht_node_u64 *node =
218 caa_container_of(head, struct lttng_ht_node_u64, head);
219 struct lttng_consumer_stream *stream =
220 caa_container_of(node, struct lttng_consumer_stream, node);
221
222 free(stream);
223 }
224
/* RCU callback: free a channel once no RCU reader can still reference it. */
static void free_channel_rcu(struct rcu_head *head)
{
    struct lttng_ht_node_u64 *node =
        caa_container_of(head, struct lttng_ht_node_u64, head);
    struct lttng_consumer_channel *channel =
        caa_container_of(node, struct lttng_consumer_channel, node);

    free(channel);
}
234
/*
 * RCU protected relayd socket pair free: close both sockets and release
 * the object memory.
 */
static void free_relayd_rcu(struct rcu_head *head)
{
    struct lttng_ht_node_u64 *node =
        caa_container_of(head, struct lttng_ht_node_u64, head);
    struct consumer_relayd_sock_pair *relayd =
        caa_container_of(node, struct consumer_relayd_sock_pair, node);

    /*
     * Close all sockets. This is done in the call RCU since we don't want the
     * socket fds to be reassigned thus potentially creating bad state of the
     * relayd object.
     *
     * We do not have to lock the control socket mutex here since at this
     * stage there is no one referencing to this relayd object.
     */
    (void) relayd_close(&relayd->control_sock);
    (void) relayd_close(&relayd->data_sock);

    free(relayd);
}
258
/*
 * Destroy and free a relayd socket pair object: remove it from the global
 * relayd hash table and schedule the socket close + free via call_rcu.
 * Safe to call with NULL, and safe to call concurrently with another
 * destroy (the loser of the ht_del race simply returns).
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
    int ret;
    struct lttng_ht_iter iter;

    if (relayd == NULL) {
        return;
    }

    DBG("Consumer destroy and close relayd socket pair");

    iter.iter.node = &relayd->node.node;
    ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
    if (ret != 0) {
        /* We assume the relayd is being or is destroyed */
        return;
    }

    /* RCU free() call */
    call_rcu(&relayd->node.head, free_relayd_rcu);
}
283
/*
 * Remove a channel from the global list protected by a mutex. This function
 * is also responsible for freeing its data structures: left-over streams in
 * the channel's send list, the live timer, type-specific (UST) resources,
 * and finally the channel memory itself via an RCU-deferred free.
 *
 * Lock ordering: consumer_data.lock is taken before channel->lock.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
    int ret;
    struct lttng_ht_iter iter;
    struct lttng_consumer_stream *stream, *stmp;

    DBG("Consumer delete channel key %" PRIu64, channel->key);

    pthread_mutex_lock(&consumer_data.lock);
    pthread_mutex_lock(&channel->lock);

    /* Delete streams that might have been left in the stream list. */
    cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
            send_node) {
        cds_list_del(&stream->send_node);
        /*
         * Once a stream is added to this list, the buffers were created so we
         * have a guarantee that this call will succeed.
         */
        consumer_stream_destroy(stream, NULL);
    }

    /* Disarm the per-channel live timer if it was started. */
    if (channel->live_timer_enabled == 1) {
        consumer_timer_live_stop(channel);
    }

    switch (consumer_data.type) {
    case LTTNG_CONSUMER_KERNEL:
        /* No type-specific teardown for the kernel consumer. */
        break;
    case LTTNG_CONSUMER32_UST:
    case LTTNG_CONSUMER64_UST:
        lttng_ustconsumer_del_channel(channel);
        break;
    default:
        ERR("Unknown consumer_data type");
        assert(0);
        goto end;
    }

    rcu_read_lock();
    iter.iter.node = &channel->node.node;
    ret = lttng_ht_del(consumer_data.channel_ht, &iter);
    assert(!ret);
    rcu_read_unlock();

    /* Defer the actual free until all RCU readers are done with the node. */
    call_rcu(&channel->node.head, free_channel_rcu);
end:
    pthread_mutex_unlock(&channel->lock);
    pthread_mutex_unlock(&consumer_data.lock);
}
338
/*
 * Iterate over the relayd hash table and destroy each element. Finally,
 * destroy the whole hash table. Called at consumer teardown time.
 */
static void cleanup_relayd_ht(void)
{
    struct lttng_ht_iter iter;
    struct consumer_relayd_sock_pair *relayd;

    rcu_read_lock();

    cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
            node.node) {
        consumer_destroy_relayd(relayd);
    }

    rcu_read_unlock();

    lttng_ht_destroy(consumer_data.relayd_ht);
}
359
/*
 * Update the end point status of all streams having the given network
 * sequence index (relayd index), metadata streams first, then data streams.
 *
 * It's atomically set without having the stream mutex locked which is fine
 * because we handle the write/read race with a pipe wakeup for each thread.
 */
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
        enum consumer_endpoint_status status)
{
    struct lttng_ht_iter iter;
    struct lttng_consumer_stream *stream;

    DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);

    rcu_read_lock();

    /* Let's begin with metadata */
    cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
        if (stream->net_seq_idx == net_seq_idx) {
            uatomic_set(&stream->endpoint_status, status);
            DBG("Delete flag set to metadata stream %d", stream->wait_fd);
        }
    }

    /* Follow up by the data streams */
    cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
        if (stream->net_seq_idx == net_seq_idx) {
            uatomic_set(&stream->endpoint_status, status);
            DBG("Delete flag set to data stream %d", stream->wait_fd);
        }
    }
    rcu_read_unlock();
}
394
/*
 * Cleanup a relayd object by flagging every associated stream for deletion,
 * destroying the object meaning removing it from the relayd hash table,
 * closing the sockets and freeing the memory in a RCU call.
 *
 * If a local data context is available, notify the threads that the streams'
 * state have changed.
 */
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
        struct lttng_consumer_local_data *ctx)
{
    uint64_t netidx;

    assert(relayd);

    DBG("Cleaning up relayd sockets");

    /* Save the net sequence index before destroying the object */
    netidx = relayd->net_seq_idx;

    /*
     * Delete the relayd from the relayd hash table, close the sockets and
     * free the object in a RCU call.
     */
    consumer_destroy_relayd(relayd);

    /* Set inactive endpoint to all streams */
    update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);

    /*
     * With a local data context, notify the threads that the streams' state
     * have changed. The write() action on the pipe acts as an "implicit"
     * memory barrier ordering the updates of the end point status from the
     * read of this status which happens AFTER receiving this notify.
     */
    if (ctx) {
        notify_thread_lttng_pipe(ctx->consumer_data_pipe);
        notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
    }
}
435
/*
 * Flag a relayd socket pair for destruction. Destroy it immediately if the
 * refcount already reaches zero; otherwise the last reference holder is
 * expected to perform the destroy.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
    assert(relayd);

    /* Set destroy flag for this object */
    uatomic_set(&relayd->destroy_flag, 1);

    /* Destroy the relayd if refcount is 0 */
    if (uatomic_read(&relayd->refcount) == 0) {
        consumer_destroy_relayd(relayd);
    }
}
454
/*
 * Completely destroy a stream from every visible data structure and from
 * the given hash table, if one is provided.
 *
 * Once this call returns, the stream object is no longer usable nor
 * visible.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
        struct lttng_ht *ht)
{
    consumer_stream_destroy(stream, ht);
}
466
/*
 * Destroy a data stream, removing it from the global data stream hash
 * table. XXX naming of del vs destroy is all mixed up.
 */
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
{
    consumer_stream_destroy(stream, data_ht);
}
474
/* Destroy a metadata stream, removing it from the metadata stream table. */
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
{
    consumer_stream_destroy(stream, metadata_ht);
}
479
/*
 * Allocate and initialize a consumer stream object. The stream is NOT
 * inserted in any hash table here; only its HT nodes are initialized.
 *
 * Returns the new stream on success, NULL on error. On error, *alloc_ret
 * (when non-NULL) is set to a negative value (-ENOMEM on allocation
 * failure, the snprintf error code on name formatting failure).
 */
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        uint64_t stream_key,
        enum lttng_consumer_stream_state state,
        const char *channel_name,
        uid_t uid,
        gid_t gid,
        uint64_t relayd_id,
        uint64_t session_id,
        int cpu,
        int *alloc_ret,
        enum consumer_channel_type type,
        unsigned int monitor)
{
    int ret;
    struct lttng_consumer_stream *stream;

    stream = zmalloc(sizeof(*stream));
    if (stream == NULL) {
        PERROR("malloc struct lttng_consumer_stream");
        ret = -ENOMEM;
        goto end;
    }

    rcu_read_lock();

    stream->key = stream_key;
    stream->out_fd = -1;
    stream->out_fd_offset = 0;
    stream->output_written = 0;
    stream->state = state;
    stream->uid = uid;
    stream->gid = gid;
    stream->net_seq_idx = relayd_id;
    stream->session_id = session_id;
    stream->monitor = monitor;
    stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
    stream->index_fd = -1;
    pthread_mutex_init(&stream->lock, NULL);

    /* If channel is the metadata, flag this stream as metadata. */
    if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
        stream->metadata_flag = 1;
        /* Metadata is flat out. */
        strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
        /* Live rendez-vous point. */
        pthread_cond_init(&stream->metadata_rdv, NULL);
        pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
    } else {
        /* Format stream name to <channel_name>_<cpu_number> */
        ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
                channel_name, cpu);
        if (ret < 0) {
            PERROR("snprintf stream name");
            goto error;
        }
    }

    /* Key is always the wait_fd for streams. */
    lttng_ht_node_init_u64(&stream->node, stream->key);

    /* Init node per channel id key */
    lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);

    /* Init session id node with the stream session id */
    lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);

    DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
            " relayd_id %" PRIu64 ", session_id %" PRIu64,
            stream->name, stream->key, channel_key,
            stream->net_seq_idx, stream->session_id);

    rcu_read_unlock();
    return stream;

error:
    rcu_read_unlock();
    free(stream);
end:
    if (alloc_ret) {
        *alloc_ret = ret;
    }
    return NULL;
}
563
/*
 * Add a stream to the global data stream hash table, protected by the
 * consumer_data mutex. Also registers the stream in the per-channel-id and
 * per-session-id tables and bumps the global stream count.
 *
 * Always returns 0.
 */
int consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
    struct lttng_ht *ht = data_ht;
    int ret = 0;

    assert(stream);
    assert(ht);

    DBG3("Adding consumer stream %" PRIu64, stream->key);

    /* Lock ordering: consumer_data -> channel -> channel timer -> stream. */
    pthread_mutex_lock(&consumer_data.lock);
    pthread_mutex_lock(&stream->chan->lock);
    pthread_mutex_lock(&stream->chan->timer_lock);
    pthread_mutex_lock(&stream->lock);
    rcu_read_lock();

    /* Steal stream identifier to avoid having streams with the same key */
    steal_stream_key(stream->key, ht);

    lttng_ht_add_unique_u64(ht, &stream->node);

    lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
            &stream->node_channel_id);

    /*
     * Add stream to the stream_list_ht of the consumer data. No need to
     * steal the key since the HT does not use it and we allow to add
     * redundant keys into this table.
     */
    lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);

    /*
     * When nb_init_stream_left reaches 0, we don't need to trigger any
     * action in terms of destroying the associated channel, because the
     * action that causes the count to become 0 also causes a stream to be
     * added. The channel deletion will thus be triggered by the following
     * removal of this stream.
     */
    if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
        /* Increment refcount before decrementing nb_init_stream_left */
        cmm_smp_wmb();
        uatomic_dec(&stream->chan->nb_init_stream_left);
    }

    /* Update consumer data once the node is inserted. */
    consumer_data.stream_count++;
    consumer_data.need_update = 1;

    rcu_read_unlock();
    pthread_mutex_unlock(&stream->lock);
    pthread_mutex_unlock(&stream->chan->timer_lock);
    pthread_mutex_unlock(&stream->chan->lock);
    pthread_mutex_unlock(&consumer_data.lock);

    return ret;
}
623
/* Convenience wrapper: destroy a stream held in the data stream table. */
void consumer_del_data_stream(struct lttng_consumer_stream *stream)
{
    consumer_del_stream(stream, data_ht);
}
628
/*
 * Add relayd socket to global consumer data hashtable. RCU read side lock
 * MUST be acquired before calling this.
 *
 * NOTE(review): when a relayd with the same net_seq_idx already exists,
 * the insertion is skipped and 0 is still returned (ret is never set to a
 * non-zero value) — callers cannot distinguish "added" from "duplicate
 * ignored". Confirm this is intended before relying on the return value.
 */
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
    int ret = 0;
    struct lttng_ht_node_u64 *node;
    struct lttng_ht_iter iter;

    assert(relayd);

    lttng_ht_lookup(consumer_data.relayd_ht,
            &relayd->net_seq_idx, &iter);
    node = lttng_ht_iter_get_node_u64(&iter);
    if (node != NULL) {
        goto end;
    }
    lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);

end:
    return ret;
}
652
/*
 * Allocate and return a consumer relayd socket pair keyed on net_seq_idx.
 * Both socket fds are initialized to -1 (not connected). Returns NULL on
 * allocation failure or when net_seq_idx is the -1ULL sentinel.
 */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
        uint64_t net_seq_idx)
{
    struct consumer_relayd_sock_pair *obj = NULL;

    /* net sequence index of -1 is a failure */
    if (net_seq_idx == (uint64_t) -1ULL) {
        goto error;
    }

    obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
    if (obj == NULL) {
        PERROR("zmalloc relayd sock");
        goto error;
    }

    obj->net_seq_idx = net_seq_idx;
    obj->refcount = 0;
    obj->destroy_flag = 0;
    obj->control_sock.sock.fd = -1;
    obj->data_sock.sock.fd = -1;
    lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
    pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);

error:
    return obj;
}
683
684 /*
685 * Find a relayd socket pair in the global consumer data.
686 *
687 * Return the object if found else NULL.
688 * RCU read-side lock must be held across this call and while using the
689 * returned object.
690 */
691 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
692 {
693 struct lttng_ht_iter iter;
694 struct lttng_ht_node_u64 *node;
695 struct consumer_relayd_sock_pair *relayd = NULL;
696
697 /* Negative keys are lookup failures */
698 if (key == (uint64_t) -1ULL) {
699 goto error;
700 }
701
702 lttng_ht_lookup(consumer_data.relayd_ht, &key,
703 &iter);
704 node = lttng_ht_iter_get_node_u64(&iter);
705 if (node != NULL) {
706 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
707 }
708
709 error:
710 return relayd;
711 }
712
/*
 * Find the relayd associated with the stream's net_seq_idx and announce the
 * stream to it over the control socket. On success, the relayd refcount is
 * incremented and stream->sent_to_relayd is set.
 *
 * Returns 0 on success, < 0 on error (unknown relayd or relayd_add_stream
 * failure).
 */
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
        char *path)
{
    int ret = 0;
    struct consumer_relayd_sock_pair *relayd;

    assert(stream);
    assert(stream->net_seq_idx != -1ULL);
    assert(path);

    /* The stream is not metadata. Get relayd reference if exists. */
    rcu_read_lock();
    relayd = consumer_find_relayd(stream->net_seq_idx);
    if (relayd != NULL) {
        /* Add stream on the relayd */
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_add_stream(&relayd->control_sock, stream->name,
                path, &stream->relayd_stream_id,
                stream->chan->tracefile_size, stream->chan->tracefile_count);
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        if (ret < 0) {
            goto end;
        }

        uatomic_inc(&relayd->refcount);
        stream->sent_to_relayd = 1;
    } else {
        ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
                stream->key, stream->net_seq_idx);
        ret = -1;
        goto end;
    }

    DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
            stream->name, stream->key, stream->net_seq_idx);

end:
    rcu_read_unlock();
    return ret;
}
758
759 /*
760 * Find a relayd and close the stream
761 */
762 void close_relayd_stream(struct lttng_consumer_stream *stream)
763 {
764 struct consumer_relayd_sock_pair *relayd;
765
766 /* The stream is not metadata. Get relayd reference if exists. */
767 rcu_read_lock();
768 relayd = consumer_find_relayd(stream->net_seq_idx);
769 if (relayd) {
770 consumer_stream_relayd_close(stream, relayd);
771 }
772 rcu_read_unlock();
773 }
774
/*
 * Handle stream for relayd transmission if the stream applies for network
 * streaming where the net sequence index is set.
 *
 * For metadata, announces the payload size on the control socket and
 * returns the control socket fd. For data, sends the data header on the
 * data socket (incrementing next_net_seq_num) and returns the data socket
 * fd.
 *
 * Return destination file descriptor or negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
        size_t data_size, unsigned long padding,
        struct consumer_relayd_sock_pair *relayd)
{
    int outfd = -1, ret;
    struct lttcomm_relayd_data_hdr data_hdr;

    /* Safety net */
    assert(stream);
    assert(relayd);

    /* Reset data header */
    memset(&data_hdr, 0, sizeof(data_hdr));

    if (stream->metadata_flag) {
        /* Caller MUST acquire the relayd control socket lock */
        ret = relayd_send_metadata(&relayd->control_sock, data_size);
        if (ret < 0) {
            goto error;
        }

        /* Metadata are always sent on the control socket. */
        outfd = relayd->control_sock.sock.fd;
    } else {
        /* Set header with stream information */
        data_hdr.stream_id = htobe64(stream->relayd_stream_id);
        data_hdr.data_size = htobe32(data_size);
        data_hdr.padding_size = htobe32(padding);
        /*
         * Note that net_seq_num below is assigned with the *current* value of
         * next_net_seq_num and only after that the next_net_seq_num will be
         * increment. This is why when issuing a command on the relayd using
         * this next value, 1 should always be substracted in order to compare
         * the last seen sequence number on the relayd side to the last sent.
         */
        data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
        /* Other fields are zeroed previously */

        ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
                sizeof(data_hdr));
        if (ret < 0) {
            goto error;
        }

        ++stream->next_net_seq_num;

        /* Set to go on data socket */
        outfd = relayd->data_sock.sock.fd;
    }

error:
    return outfd;
}
834
835 /*
836 * Allocate and return a new lttng_consumer_channel object using the given key
837 * to initialize the hash table node.
838 *
839 * On error, return NULL.
840 */
841 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
842 uint64_t session_id,
843 const char *pathname,
844 const char *name,
845 uid_t uid,
846 gid_t gid,
847 uint64_t relayd_id,
848 enum lttng_event_output output,
849 uint64_t tracefile_size,
850 uint64_t tracefile_count,
851 uint64_t session_id_per_pid,
852 unsigned int monitor,
853 unsigned int live_timer_interval)
854 {
855 struct lttng_consumer_channel *channel;
856
857 channel = zmalloc(sizeof(*channel));
858 if (channel == NULL) {
859 PERROR("malloc struct lttng_consumer_channel");
860 goto end;
861 }
862
863 channel->key = key;
864 channel->refcount = 0;
865 channel->session_id = session_id;
866 channel->session_id_per_pid = session_id_per_pid;
867 channel->uid = uid;
868 channel->gid = gid;
869 channel->relayd_id = relayd_id;
870 channel->output = output;
871 channel->tracefile_size = tracefile_size;
872 channel->tracefile_count = tracefile_count;
873 channel->monitor = monitor;
874 channel->live_timer_interval = live_timer_interval;
875 pthread_mutex_init(&channel->lock, NULL);
876 pthread_mutex_init(&channel->timer_lock, NULL);
877
878 /*
879 * In monitor mode, the streams associated with the channel will be put in
880 * a special list ONLY owned by this channel. So, the refcount is set to 1
881 * here meaning that the channel itself has streams that are referenced.
882 *
883 * On a channel deletion, once the channel is no longer visible, the
884 * refcount is decremented and checked for a zero value to delete it. With
885 * streams in no monitor mode, it will now be safe to destroy the channel.
886 */
887 if (!channel->monitor) {
888 channel->refcount = 1;
889 }
890
891 strncpy(channel->pathname, pathname, sizeof(channel->pathname));
892 channel->pathname[sizeof(channel->pathname) - 1] = '\0';
893
894 strncpy(channel->name, name, sizeof(channel->name));
895 channel->name[sizeof(channel->name) - 1] = '\0';
896
897 lttng_ht_node_init_u64(&channel->node, channel->key);
898
899 channel->wait_fd = -1;
900
901 CDS_INIT_LIST_HEAD(&channel->streams.head);
902
903 DBG("Allocated channel (key %" PRIu64 ")", channel->key)
904
905 end:
906 return channel;
907 }
908
/*
 * Add a channel to the global list protected by a mutex. If the channel is
 * a data channel with a valid wait_fd, the channel thread is notified of
 * the addition (after the locks have been dropped).
 *
 * On success 0 is returned else a negative value (-EEXIST when the key is
 * already registered).
 */
int consumer_add_channel(struct lttng_consumer_channel *channel,
        struct lttng_consumer_local_data *ctx)
{
    int ret = 0;
    struct lttng_ht_node_u64 *node;
    struct lttng_ht_iter iter;

    pthread_mutex_lock(&consumer_data.lock);
    pthread_mutex_lock(&channel->lock);
    pthread_mutex_lock(&channel->timer_lock);
    rcu_read_lock();

    lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
    node = lttng_ht_iter_get_node_u64(&iter);
    if (node != NULL) {
        /* Channel already exist. Ignore the insertion */
        ERR("Consumer add channel key %" PRIu64 " already exists!",
                channel->key);
        ret = -EEXIST;
        goto end;
    }

    lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);

end:
    rcu_read_unlock();
    pthread_mutex_unlock(&channel->timer_lock);
    pthread_mutex_unlock(&channel->lock);
    pthread_mutex_unlock(&consumer_data.lock);

    /* Notify outside the locks to avoid deadlocking the channel thread. */
    if (!ret && channel->wait_fd != -1 &&
            channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
        notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
    }
    return ret;
}
950
/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and concurrency issues when writing is
 * needed. Called with consumer_data.lock held.
 *
 * Only active streams with an active endpoint are added. The
 * consumer_data_pipe read fd is appended one slot past the returned count.
 *
 * Returns the number of stream fds in the structures (excluding the pipe).
 */
static int update_poll_array(struct lttng_consumer_local_data *ctx,
        struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
        struct lttng_ht *ht)
{
    int i = 0;
    struct lttng_ht_iter iter;
    struct lttng_consumer_stream *stream;

    assert(ctx);
    assert(ht);
    assert(pollfd);
    assert(local_stream);

    DBG("Updating poll fd array");
    rcu_read_lock();
    cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
        /*
         * Only active streams with an active end point can be added to the
         * poll set and local stream storage of the thread.
         *
         * There is a potential race here for endpoint_status to be updated
         * just after the check. However, this is OK since the stream(s) will
         * be deleted once the thread is notified that the end point state
         * has changed where this function will be called back again.
         */
        if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
                stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
            continue;
        }
        /*
         * This clobbers way too much the debug output. Uncomment that if you
         * need it for debugging purposes.
         *
         * DBG("Active FD %d", stream->wait_fd);
         */
        (*pollfd)[i].fd = stream->wait_fd;
        (*pollfd)[i].events = POLLIN | POLLPRI;
        local_stream[i] = stream;
        i++;
    }
    rcu_read_unlock();

    /*
     * Insert the consumer_data_pipe at the end of the array and don't
     * increment i so nb_fd is the number of real FD.
     */
    (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
    (*pollfd)[i].events = POLLIN | POLLPRI;
    return i;
}
1008
/*
 * Block until either the should-quit pipe (index 0) or the command socket
 * (index 1) of the 2-entry consumer_sockpoll array becomes readable.
 *
 * Returns 0 when data is available on the command socket; -1 on poll error
 * or when the quit pipe fired, in which case the caller should exit.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
    for (;;) {
        int num_rdy = poll(consumer_sockpoll, 2, -1);

        if (num_rdy == -1) {
            /* Restart interrupted system call. */
            if (errno == EINTR) {
                continue;
            }
            PERROR("Poll error");
            return -1;
        }
        if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
            DBG("consumer_should_quit wake up");
            return -1;
        }
        return 0;
    }
}
1038
/*
 * Set the error socket used to report errors back to the session daemon.
 */
void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
        int sock)
{
    ctx->consumer_error_socket = sock;
}
1047
/*
 * Set the command socket path. The string is referenced, not copied; the
 * caller keeps ownership of the memory.
 */
void lttng_consumer_set_command_sock_path(
        struct lttng_consumer_local_data *ctx, char *sock)
{
    ctx->consumer_command_sock_path = sock;
}
1056
/*
 * Send return code to the session daemon.
 * If the socket is not defined, we return 0, it is not a fatal error.
 *
 * NOTE(review): cmd is declared int but sizeof(enum
 * lttcomm_sessiond_command) is sent; presumably both are the same size on
 * supported platforms — confirm.
 */
int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
    if (ctx->consumer_error_socket > 0) {
        return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
                sizeof(enum lttcomm_sessiond_command));
    }

    return 0;
}
1070
/*
 * Close all the tracefiles and stream fds and MUST be called when all
 * instances are destroyed i.e. when all threads were joined and are ended.
 * Destroys the channel, relayd, per-channel-id and stream-list hash tables.
 */
void lttng_consumer_cleanup(void)
{
    struct lttng_ht_iter iter;
    struct lttng_consumer_channel *channel;

    rcu_read_lock();

    cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
            node.node) {
        consumer_del_channel(channel);
    }

    rcu_read_unlock();

    lttng_ht_destroy(consumer_data.channel_ht);

    cleanup_relayd_ht();

    lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);

    /*
     * This HT contains streams that are freed by either the metadata thread
     * or the data thread so we do *nothing* on the hash table and simply
     * destroy it.
     */
    lttng_ht_destroy(consumer_data.stream_list_ht);
}
1102
1103 /*
1104 * Called from signal handler.
1105 */
1106 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1107 {
1108 int ret;
1109 consumer_quit = 1;
1110 do {
1111 ret = write(ctx->consumer_should_quit[1], "4", 1);
1112 } while (ret < 0 && errno == EINTR);
1113 if (ret < 0 || ret != 1) {
1114 PERROR("write consumer quit");
1115 }
1116
1117 DBG("Consumer flag that it should quit");
1118 }
1119
/*
 * Give the kernel page-cache hints after writing a subbuffer to disk.
 *
 * Flushes (write-and-wait) the pages belonging to the subbuffer that
 * precedes the one just written at orig_offset, then advises the kernel
 * that those pages will not be needed again. Return values of the hint
 * calls are deliberately ignored; see the comments below.
 */
void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
		off_t orig_offset)
{
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->max_sb_size) {
		/* Not a full subbuffer behind us yet; nothing to sync. */
		return;
	}
	lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
	 * we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	posix_fadvise(outfd, orig_offset - stream->max_sb_size,
			stream->max_sb_size, POSIX_FADV_DONTNEED);
}
1156
1157 /*
1158 * Initialise the necessary environnement :
1159 * - create a new context
1160 * - create the poll_pipe
1161 * - create the should_quit pipe (for signal handler)
1162 * - create the thread pipe (for splice)
1163 *
1164 * Takes a function pointer as argument, this function is called when data is
1165 * available on a buffer. This function is responsible to do the
1166 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1167 * buffer configuration and then kernctl_put_next_subbuf at the end.
1168 *
1169 * Returns a pointer to the new context or NULL on error.
1170 */
1171 struct lttng_consumer_local_data *lttng_consumer_create(
1172 enum lttng_consumer_type type,
1173 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1174 struct lttng_consumer_local_data *ctx),
1175 int (*recv_channel)(struct lttng_consumer_channel *channel),
1176 int (*recv_stream)(struct lttng_consumer_stream *stream),
1177 int (*update_stream)(uint64_t stream_key, uint32_t state))
1178 {
1179 int ret;
1180 struct lttng_consumer_local_data *ctx;
1181
1182 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1183 consumer_data.type == type);
1184 consumer_data.type = type;
1185
1186 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
1187 if (ctx == NULL) {
1188 PERROR("allocating context");
1189 goto error;
1190 }
1191
1192 ctx->consumer_error_socket = -1;
1193 ctx->consumer_metadata_socket = -1;
1194 pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
1195 /* assign the callbacks */
1196 ctx->on_buffer_ready = buffer_ready;
1197 ctx->on_recv_channel = recv_channel;
1198 ctx->on_recv_stream = recv_stream;
1199 ctx->on_update_stream = update_stream;
1200
1201 ctx->consumer_data_pipe = lttng_pipe_open(0);
1202 if (!ctx->consumer_data_pipe) {
1203 goto error_poll_pipe;
1204 }
1205
1206 ret = pipe(ctx->consumer_should_quit);
1207 if (ret < 0) {
1208 PERROR("Error creating recv pipe");
1209 goto error_quit_pipe;
1210 }
1211
1212 ret = pipe(ctx->consumer_thread_pipe);
1213 if (ret < 0) {
1214 PERROR("Error creating thread pipe");
1215 goto error_thread_pipe;
1216 }
1217
1218 ret = pipe(ctx->consumer_channel_pipe);
1219 if (ret < 0) {
1220 PERROR("Error creating channel pipe");
1221 goto error_channel_pipe;
1222 }
1223
1224 ctx->consumer_metadata_pipe = lttng_pipe_open(0);
1225 if (!ctx->consumer_metadata_pipe) {
1226 goto error_metadata_pipe;
1227 }
1228
1229 ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
1230 if (ret < 0) {
1231 goto error_splice_pipe;
1232 }
1233
1234 return ctx;
1235
1236 error_splice_pipe:
1237 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1238 error_metadata_pipe:
1239 utils_close_pipe(ctx->consumer_channel_pipe);
1240 error_channel_pipe:
1241 utils_close_pipe(ctx->consumer_thread_pipe);
1242 error_thread_pipe:
1243 utils_close_pipe(ctx->consumer_should_quit);
1244 error_quit_pipe:
1245 lttng_pipe_destroy(ctx->consumer_data_pipe);
1246 error_poll_pipe:
1247 free(ctx);
1248 error:
1249 return NULL;
1250 }
1251
1252 /*
1253 * Close all fds associated with the instance and free the context.
1254 */
1255 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1256 {
1257 int ret;
1258
1259 DBG("Consumer destroying it. Closing everything.");
1260
1261 ret = close(ctx->consumer_error_socket);
1262 if (ret) {
1263 PERROR("close");
1264 }
1265 ret = close(ctx->consumer_metadata_socket);
1266 if (ret) {
1267 PERROR("close");
1268 }
1269 utils_close_pipe(ctx->consumer_thread_pipe);
1270 utils_close_pipe(ctx->consumer_channel_pipe);
1271 lttng_pipe_destroy(ctx->consumer_data_pipe);
1272 lttng_pipe_destroy(ctx->consumer_metadata_pipe);
1273 utils_close_pipe(ctx->consumer_should_quit);
1274 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1275
1276 unlink(ctx->consumer_command_sock_path);
1277 free(ctx);
1278 }
1279
1280 /*
1281 * Write the metadata stream id on the specified file descriptor.
1282 */
1283 static int write_relayd_metadata_id(int fd,
1284 struct lttng_consumer_stream *stream,
1285 struct consumer_relayd_sock_pair *relayd, unsigned long padding)
1286 {
1287 int ret;
1288 struct lttcomm_relayd_metadata_payload hdr;
1289
1290 hdr.stream_id = htobe64(stream->relayd_stream_id);
1291 hdr.padding_size = htobe32(padding);
1292 do {
1293 ret = write(fd, (void *) &hdr, sizeof(hdr));
1294 } while (ret < 0 && errno == EINTR);
1295 if (ret < 0 || ret != sizeof(hdr)) {
1296 /*
1297 * This error means that the fd's end is closed so ignore the perror
1298 * not to clubber the error output since this can happen in a normal
1299 * code path.
1300 */
1301 if (errno != EPIPE) {
1302 PERROR("write metadata stream id");
1303 }
1304 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
1305 /*
1306 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1307 * handle writting the missing part so report that as an error and
1308 * don't lie to the caller.
1309 */
1310 ret = -1;
1311 goto end;
1312 }
1313 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1314 stream->relayd_stream_id, padding);
1315
1316 end:
1317 return ret;
1318 }
1319
1320 /*
1321 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1322 * core function for writing trace buffers to either the local filesystem or
1323 * the network.
1324 *
1325 * It must be called with the stream lock held.
1326 *
1327 * Careful review MUST be put if any changes occur!
1328 *
1329 * Returns the number of bytes written
1330 */
1331 ssize_t lttng_consumer_on_read_subbuffer_mmap(
1332 struct lttng_consumer_local_data *ctx,
1333 struct lttng_consumer_stream *stream, unsigned long len,
1334 unsigned long padding,
1335 struct lttng_packet_index *index)
1336 {
1337 unsigned long mmap_offset;
1338 void *mmap_base;
1339 ssize_t ret = 0, written = 0;
1340 off_t orig_offset = stream->out_fd_offset;
1341 /* Default is on the disk */
1342 int outfd = stream->out_fd;
1343 struct consumer_relayd_sock_pair *relayd = NULL;
1344 unsigned int relayd_hang_up = 0;
1345
1346 /* RCU lock for the relayd pointer */
1347 rcu_read_lock();
1348
1349 /* Flag that the current stream if set for network streaming. */
1350 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1351 relayd = consumer_find_relayd(stream->net_seq_idx);
1352 if (relayd == NULL) {
1353 ret = -EPIPE;
1354 goto end;
1355 }
1356 }
1357
1358 /* get the offset inside the fd to mmap */
1359 switch (consumer_data.type) {
1360 case LTTNG_CONSUMER_KERNEL:
1361 mmap_base = stream->mmap_base;
1362 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
1363 if (ret != 0) {
1364 PERROR("tracer ctl get_mmap_read_offset");
1365 written = -errno;
1366 goto end;
1367 }
1368 break;
1369 case LTTNG_CONSUMER32_UST:
1370 case LTTNG_CONSUMER64_UST:
1371 mmap_base = lttng_ustctl_get_mmap_base(stream);
1372 if (!mmap_base) {
1373 ERR("read mmap get mmap base for stream %s", stream->name);
1374 written = -EPERM;
1375 goto end;
1376 }
1377 ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
1378 if (ret != 0) {
1379 PERROR("tracer ctl get_mmap_read_offset");
1380 written = ret;
1381 goto end;
1382 }
1383 break;
1384 default:
1385 ERR("Unknown consumer_data type");
1386 assert(0);
1387 }
1388
1389 /* Handle stream on the relayd if the output is on the network */
1390 if (relayd) {
1391 unsigned long netlen = len;
1392
1393 /*
1394 * Lock the control socket for the complete duration of the function
1395 * since from this point on we will use the socket.
1396 */
1397 if (stream->metadata_flag) {
1398 /* Metadata requires the control socket. */
1399 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1400 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
1401 }
1402
1403 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
1404 if (ret >= 0) {
1405 /* Use the returned socket. */
1406 outfd = ret;
1407
1408 /* Write metadata stream id before payload */
1409 if (stream->metadata_flag) {
1410 ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
1411 if (ret < 0) {
1412 written = ret;
1413 /* Socket operation failed. We consider the relayd dead */
1414 if (ret == -EPIPE || ret == -EINVAL) {
1415 relayd_hang_up = 1;
1416 goto write_error;
1417 }
1418 goto end;
1419 }
1420 }
1421 } else {
1422 /* Socket operation failed. We consider the relayd dead */
1423 if (ret == -EPIPE || ret == -EINVAL) {
1424 relayd_hang_up = 1;
1425 goto write_error;
1426 }
1427 /* Else, use the default set before which is the filesystem. */
1428 }
1429 } else {
1430 /* No streaming, we have to set the len with the full padding */
1431 len += padding;
1432
1433 /*
1434 * Check if we need to change the tracefile before writing the packet.
1435 */
1436 if (stream->chan->tracefile_size > 0 &&
1437 (stream->tracefile_size_current + len) >
1438 stream->chan->tracefile_size) {
1439 ret = utils_rotate_stream_file(stream->chan->pathname,
1440 stream->name, stream->chan->tracefile_size,
1441 stream->chan->tracefile_count, stream->uid, stream->gid,
1442 stream->out_fd, &(stream->tracefile_count_current),
1443 &stream->out_fd);
1444 if (ret < 0) {
1445 ERR("Rotating output file");
1446 goto end;
1447 }
1448 outfd = stream->out_fd;
1449
1450 if (stream->index_fd >= 0) {
1451 ret = index_create_file(stream->chan->pathname,
1452 stream->name, stream->uid, stream->gid,
1453 stream->chan->tracefile_size,
1454 stream->tracefile_count_current);
1455 if (ret < 0) {
1456 goto end;
1457 }
1458 stream->index_fd = ret;
1459 }
1460
1461 /* Reset current size because we just perform a rotation. */
1462 stream->tracefile_size_current = 0;
1463 stream->out_fd_offset = 0;
1464 orig_offset = 0;
1465 }
1466 stream->tracefile_size_current += len;
1467 if (index) {
1468 index->offset = htobe64(stream->out_fd_offset);
1469 }
1470 }
1471
1472 while (len > 0) {
1473 do {
1474 ret = write(outfd, mmap_base + mmap_offset, len);
1475 } while (ret < 0 && errno == EINTR);
1476 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
1477 if (ret < 0) {
1478 /*
1479 * This is possible if the fd is closed on the other side (outfd)
1480 * or any write problem. It can be verbose a bit for a normal
1481 * execution if for instance the relayd is stopped abruptly. This
1482 * can happen so set this to a DBG statement.
1483 */
1484 DBG("Error in file write mmap");
1485 if (written == 0) {
1486 written = -errno;
1487 }
1488 /* Socket operation failed. We consider the relayd dead */
1489 if (errno == EPIPE || errno == EINVAL) {
1490 relayd_hang_up = 1;
1491 goto write_error;
1492 }
1493 goto end;
1494 } else if (ret > len) {
1495 PERROR("Error in file write (ret %zd > len %lu)", ret, len);
1496 written += ret;
1497 goto end;
1498 } else {
1499 len -= ret;
1500 mmap_offset += ret;
1501 }
1502
1503 /* This call is useless on a socket so better save a syscall. */
1504 if (!relayd) {
1505 /* This won't block, but will start writeout asynchronously */
1506 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
1507 SYNC_FILE_RANGE_WRITE);
1508 stream->out_fd_offset += ret;
1509 }
1510 stream->output_written += ret;
1511 written += ret;
1512 }
1513 lttng_consumer_sync_trace_file(stream, orig_offset);
1514
1515 write_error:
1516 /*
1517 * This is a special case that the relayd has closed its socket. Let's
1518 * cleanup the relayd object and all associated streams.
1519 */
1520 if (relayd && relayd_hang_up) {
1521 cleanup_relayd(relayd, ctx);
1522 }
1523
1524 end:
1525 /* Unlock only if ctrl socket used */
1526 if (relayd && stream->metadata_flag) {
1527 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1528 }
1529
1530 rcu_read_unlock();
1531 return written;
1532 }
1533
1534 /*
1535 * Splice the data from the ring buffer to the tracefile.
1536 *
1537 * It must be called with the stream lock held.
1538 *
1539 * Returns the number of bytes spliced.
1540 */
1541 ssize_t lttng_consumer_on_read_subbuffer_splice(
1542 struct lttng_consumer_local_data *ctx,
1543 struct lttng_consumer_stream *stream, unsigned long len,
1544 unsigned long padding,
1545 struct lttng_packet_index *index)
1546 {
1547 ssize_t ret = 0, written = 0, ret_splice = 0;
1548 loff_t offset = 0;
1549 off_t orig_offset = stream->out_fd_offset;
1550 int fd = stream->wait_fd;
1551 /* Default is on the disk */
1552 int outfd = stream->out_fd;
1553 struct consumer_relayd_sock_pair *relayd = NULL;
1554 int *splice_pipe;
1555 unsigned int relayd_hang_up = 0;
1556
1557 switch (consumer_data.type) {
1558 case LTTNG_CONSUMER_KERNEL:
1559 break;
1560 case LTTNG_CONSUMER32_UST:
1561 case LTTNG_CONSUMER64_UST:
1562 /* Not supported for user space tracing */
1563 return -ENOSYS;
1564 default:
1565 ERR("Unknown consumer_data type");
1566 assert(0);
1567 }
1568
1569 /* RCU lock for the relayd pointer */
1570 rcu_read_lock();
1571
1572 /* Flag that the current stream if set for network streaming. */
1573 if (stream->net_seq_idx != (uint64_t) -1ULL) {
1574 relayd = consumer_find_relayd(stream->net_seq_idx);
1575 if (relayd == NULL) {
1576 ret = -EPIPE;
1577 goto end;
1578 }
1579 }
1580
1581 /*
1582 * Choose right pipe for splice. Metadata and trace data are handled by
1583 * different threads hence the use of two pipes in order not to race or
1584 * corrupt the written data.
1585 */
1586 if (stream->metadata_flag) {
1587 splice_pipe = ctx->consumer_splice_metadata_pipe;
1588 } else {
1589 splice_pipe = ctx->consumer_thread_pipe;
1590 }
1591
1592 /* Write metadata stream id before payload */
1593 if (relayd) {
1594 int total_len = len;
1595
1596 if (stream->metadata_flag) {
1597 /*
1598 * Lock the control socket for the complete duration of the function
1599 * since from this point on we will use the socket.
1600 */
1601 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1602
1603 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
1604 padding);
1605 if (ret < 0) {
1606 written = ret;
1607 /* Socket operation failed. We consider the relayd dead */
1608 if (ret == -EBADF) {
1609 WARN("Remote relayd disconnected. Stopping");
1610 relayd_hang_up = 1;
1611 goto write_error;
1612 }
1613 goto end;
1614 }
1615
1616 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1617 }
1618
1619 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1620 if (ret >= 0) {
1621 /* Use the returned socket. */
1622 outfd = ret;
1623 } else {
1624 /* Socket operation failed. We consider the relayd dead */
1625 if (ret == -EBADF) {
1626 WARN("Remote relayd disconnected. Stopping");
1627 relayd_hang_up = 1;
1628 goto write_error;
1629 }
1630 goto end;
1631 }
1632 } else {
1633 /* No streaming, we have to set the len with the full padding */
1634 len += padding;
1635
1636 /*
1637 * Check if we need to change the tracefile before writing the packet.
1638 */
1639 if (stream->chan->tracefile_size > 0 &&
1640 (stream->tracefile_size_current + len) >
1641 stream->chan->tracefile_size) {
1642 ret = utils_rotate_stream_file(stream->chan->pathname,
1643 stream->name, stream->chan->tracefile_size,
1644 stream->chan->tracefile_count, stream->uid, stream->gid,
1645 stream->out_fd, &(stream->tracefile_count_current),
1646 &stream->out_fd);
1647 if (ret < 0) {
1648 ERR("Rotating output file");
1649 goto end;
1650 }
1651 outfd = stream->out_fd;
1652
1653 if (stream->index_fd >= 0) {
1654 ret = index_create_file(stream->chan->pathname,
1655 stream->name, stream->uid, stream->gid,
1656 stream->chan->tracefile_size,
1657 stream->tracefile_count_current);
1658 if (ret < 0) {
1659 goto end;
1660 }
1661 stream->index_fd = ret;
1662 }
1663
1664 /* Reset current size because we just perform a rotation. */
1665 stream->tracefile_size_current = 0;
1666 stream->out_fd_offset = 0;
1667 orig_offset = 0;
1668 }
1669 stream->tracefile_size_current += len;
1670 index->offset = htobe64(stream->out_fd_offset);
1671 }
1672
1673 while (len > 0) {
1674 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1675 (unsigned long)offset, len, fd, splice_pipe[1]);
1676 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
1677 SPLICE_F_MOVE | SPLICE_F_MORE);
1678 DBG("splice chan to pipe, ret %zd", ret_splice);
1679 if (ret_splice < 0) {
1680 PERROR("Error in relay splice");
1681 if (written == 0) {
1682 written = ret_splice;
1683 }
1684 ret = errno;
1685 goto splice_error;
1686 }
1687
1688 /* Handle stream on the relayd if the output is on the network */
1689 if (relayd) {
1690 if (stream->metadata_flag) {
1691 size_t metadata_payload_size =
1692 sizeof(struct lttcomm_relayd_metadata_payload);
1693
1694 /* Update counter to fit the spliced data */
1695 ret_splice += metadata_payload_size;
1696 len += metadata_payload_size;
1697 /*
1698 * We do this so the return value can match the len passed as
1699 * argument to this function.
1700 */
1701 written -= metadata_payload_size;
1702 }
1703 }
1704
1705 /* Splice data out */
1706 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
1707 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1708 DBG("Consumer splice pipe to file, ret %zd", ret_splice);
1709 if (ret_splice < 0) {
1710 PERROR("Error in file splice");
1711 if (written == 0) {
1712 written = ret_splice;
1713 }
1714 /* Socket operation failed. We consider the relayd dead */
1715 if (errno == EBADF || errno == EPIPE) {
1716 WARN("Remote relayd disconnected. Stopping");
1717 relayd_hang_up = 1;
1718 goto write_error;
1719 }
1720 ret = errno;
1721 goto splice_error;
1722 } else if (ret_splice > len) {
1723 errno = EINVAL;
1724 PERROR("Wrote more data than requested %zd (len: %lu)",
1725 ret_splice, len);
1726 written += ret_splice;
1727 ret = errno;
1728 goto splice_error;
1729 }
1730 len -= ret_splice;
1731
1732 /* This call is useless on a socket so better save a syscall. */
1733 if (!relayd) {
1734 /* This won't block, but will start writeout asynchronously */
1735 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1736 SYNC_FILE_RANGE_WRITE);
1737 stream->out_fd_offset += ret_splice;
1738 }
1739 stream->output_written += ret_splice;
1740 written += ret_splice;
1741 }
1742 lttng_consumer_sync_trace_file(stream, orig_offset);
1743
1744 ret = ret_splice;
1745
1746 goto end;
1747
1748 write_error:
1749 /*
1750 * This is a special case that the relayd has closed its socket. Let's
1751 * cleanup the relayd object and all associated streams.
1752 */
1753 if (relayd && relayd_hang_up) {
1754 cleanup_relayd(relayd, ctx);
1755 /* Skip splice error so the consumer does not fail */
1756 goto end;
1757 }
1758
1759 splice_error:
1760 /* send the appropriate error description to sessiond */
1761 switch (ret) {
1762 case EINVAL:
1763 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
1764 break;
1765 case ENOMEM:
1766 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
1767 break;
1768 case ESPIPE:
1769 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
1770 break;
1771 }
1772
1773 end:
1774 if (relayd && stream->metadata_flag) {
1775 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1776 }
1777
1778 rcu_read_unlock();
1779 return written;
1780 }
1781
1782 /*
1783 * Take a snapshot for a specific fd
1784 *
1785 * Returns 0 on success, < 0 on error
1786 */
1787 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
1788 {
1789 switch (consumer_data.type) {
1790 case LTTNG_CONSUMER_KERNEL:
1791 return lttng_kconsumer_take_snapshot(stream);
1792 case LTTNG_CONSUMER32_UST:
1793 case LTTNG_CONSUMER64_UST:
1794 return lttng_ustconsumer_take_snapshot(stream);
1795 default:
1796 ERR("Unknown consumer_data type");
1797 assert(0);
1798 return -ENOSYS;
1799 }
1800 }
1801
1802 /*
1803 * Get the produced position
1804 *
1805 * Returns 0 on success, < 0 on error
1806 */
1807 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
1808 unsigned long *pos)
1809 {
1810 switch (consumer_data.type) {
1811 case LTTNG_CONSUMER_KERNEL:
1812 return lttng_kconsumer_get_produced_snapshot(stream, pos);
1813 case LTTNG_CONSUMER32_UST:
1814 case LTTNG_CONSUMER64_UST:
1815 return lttng_ustconsumer_get_produced_snapshot(stream, pos);
1816 default:
1817 ERR("Unknown consumer_data type");
1818 assert(0);
1819 return -ENOSYS;
1820 }
1821 }
1822
1823 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1824 int sock, struct pollfd *consumer_sockpoll)
1825 {
1826 switch (consumer_data.type) {
1827 case LTTNG_CONSUMER_KERNEL:
1828 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1829 case LTTNG_CONSUMER32_UST:
1830 case LTTNG_CONSUMER64_UST:
1831 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1832 default:
1833 ERR("Unknown consumer_data type");
1834 assert(0);
1835 return -ENOSYS;
1836 }
1837 }
1838
1839 /*
1840 * Iterate over all streams of the hashtable and free them properly.
1841 *
1842 * WARNING: *MUST* be used with data stream only.
1843 */
1844 static void destroy_data_stream_ht(struct lttng_ht *ht)
1845 {
1846 struct lttng_ht_iter iter;
1847 struct lttng_consumer_stream *stream;
1848
1849 if (ht == NULL) {
1850 return;
1851 }
1852
1853 rcu_read_lock();
1854 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1855 /*
1856 * Ignore return value since we are currently cleaning up so any error
1857 * can't be handled.
1858 */
1859 (void) consumer_del_stream(stream, ht);
1860 }
1861 rcu_read_unlock();
1862
1863 lttng_ht_destroy(ht);
1864 }
1865
1866 /*
1867 * Iterate over all streams of the hashtable and free them properly.
1868 *
1869 * XXX: Should not be only for metadata stream or else use an other name.
1870 */
1871 static void destroy_stream_ht(struct lttng_ht *ht)
1872 {
1873 struct lttng_ht_iter iter;
1874 struct lttng_consumer_stream *stream;
1875
1876 if (ht == NULL) {
1877 return;
1878 }
1879
1880 rcu_read_lock();
1881 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1882 /*
1883 * Ignore return value since we are currently cleaning up so any error
1884 * can't be handled.
1885 */
1886 (void) consumer_del_metadata_stream(stream, ht);
1887 }
1888 rcu_read_unlock();
1889
1890 lttng_ht_destroy(ht);
1891 }
1892
/*
 * Tracer-specific closing of the metadata path. For UST, this closes the
 * wakeup fds of every metadata stream; for the kernel tracer it is a no-op
 * (see below).
 */
void lttng_consumer_close_metadata(void)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * The Kernel consumer has a different metadata scheme so we don't
		 * close anything because the stream will be closed by the session
		 * daemon.
		 */
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/*
		 * Close all metadata streams. The metadata hash table is passed and
		 * this call iterates over it by closing all wakeup fd. This is safe
		 * because at this point we are sure that the metadata producer is
		 * either dead or blocked.
		 */
		lttng_ustconsumer_close_metadata(metadata_ht);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}
}
1918
1919 /*
1920 * Clean up a metadata stream and free its memory.
1921 */
1922 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
1923 struct lttng_ht *ht)
1924 {
1925 int ret;
1926 struct lttng_ht_iter iter;
1927 struct lttng_consumer_channel *free_chan = NULL;
1928 struct consumer_relayd_sock_pair *relayd;
1929
1930 assert(stream);
1931 /*
1932 * This call should NEVER receive regular stream. It must always be
1933 * metadata stream and this is crucial for data structure synchronization.
1934 */
1935 assert(stream->metadata_flag);
1936
1937 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
1938
1939 if (ht == NULL) {
1940 /* Means the stream was allocated but not successfully added */
1941 goto free_stream_rcu;
1942 }
1943
1944 pthread_mutex_lock(&consumer_data.lock);
1945 pthread_mutex_lock(&stream->chan->lock);
1946 pthread_mutex_lock(&stream->lock);
1947
1948 switch (consumer_data.type) {
1949 case LTTNG_CONSUMER_KERNEL:
1950 if (stream->mmap_base != NULL) {
1951 ret = munmap(stream->mmap_base, stream->mmap_len);
1952 if (ret != 0) {
1953 PERROR("munmap metadata stream");
1954 }
1955 }
1956 if (stream->wait_fd >= 0) {
1957 ret = close(stream->wait_fd);
1958 if (ret < 0) {
1959 PERROR("close kernel metadata wait_fd");
1960 }
1961 }
1962 break;
1963 case LTTNG_CONSUMER32_UST:
1964 case LTTNG_CONSUMER64_UST:
1965 if (stream->monitor) {
1966 /* close the write-side in close_metadata */
1967 ret = close(stream->ust_metadata_poll_pipe[0]);
1968 if (ret < 0) {
1969 PERROR("Close UST metadata read-side poll pipe");
1970 }
1971 }
1972 lttng_ustconsumer_del_stream(stream);
1973 break;
1974 default:
1975 ERR("Unknown consumer_data type");
1976 assert(0);
1977 goto end;
1978 }
1979
1980 rcu_read_lock();
1981 iter.iter.node = &stream->node.node;
1982 ret = lttng_ht_del(ht, &iter);
1983 assert(!ret);
1984
1985 iter.iter.node = &stream->node_channel_id.node;
1986 ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
1987 assert(!ret);
1988
1989 iter.iter.node = &stream->node_session_id.node;
1990 ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
1991 assert(!ret);
1992 rcu_read_unlock();
1993
1994 if (stream->out_fd >= 0) {
1995 ret = close(stream->out_fd);
1996 if (ret) {
1997 PERROR("close");
1998 }
1999 }
2000
2001 /* Check and cleanup relayd */
2002 rcu_read_lock();
2003 relayd = consumer_find_relayd(stream->net_seq_idx);
2004 if (relayd != NULL) {
2005 uatomic_dec(&relayd->refcount);
2006 assert(uatomic_read(&relayd->refcount) >= 0);
2007
2008 /* Closing streams requires to lock the control socket. */
2009 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
2010 ret = relayd_send_close_stream(&relayd->control_sock,
2011 stream->relayd_stream_id, stream->next_net_seq_num - 1);
2012 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
2013 if (ret < 0) {
2014 DBG("Unable to close stream on the relayd. Continuing");
2015 /*
2016 * Continue here. There is nothing we can do for the relayd.
2017 * Chances are that the relayd has closed the socket so we just
2018 * continue cleaning up.
2019 */
2020 }
2021
2022 /* Both conditions are met, we destroy the relayd. */
2023 if (uatomic_read(&relayd->refcount) == 0 &&
2024 uatomic_read(&relayd->destroy_flag)) {
2025 consumer_destroy_relayd(relayd);
2026 }
2027 }
2028 rcu_read_unlock();
2029
2030 /* Atomically decrement channel refcount since other threads can use it. */
2031 if (!uatomic_sub_return(&stream->chan->refcount, 1)
2032 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
2033 /* Go for channel deletion! */
2034 free_chan = stream->chan;
2035 }
2036
2037 end:
2038 /*
2039 * Nullify the stream reference so it is not used after deletion. The
2040 * channel lock MUST be acquired before being able to check for
2041 * a NULL pointer value.
2042 */
2043 stream->chan->metadata_stream = NULL;
2044
2045 pthread_mutex_unlock(&stream->lock);
2046 pthread_mutex_unlock(&stream->chan->lock);
2047 pthread_mutex_unlock(&consumer_data.lock);
2048
2049 if (free_chan) {
2050 consumer_del_channel(free_chan);
2051 }
2052
2053 free_stream_rcu:
2054 call_rcu(&stream->node.head, free_stream_rcu);
2055 }
2056
2057 /*
2058 * Action done with the metadata stream when adding it to the consumer internal
2059 * data structures to handle it.
2060 */
2061 int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
2062 {
2063 struct lttng_ht *ht = metadata_ht;
2064 int ret = 0;
2065 struct lttng_ht_iter iter;
2066 struct lttng_ht_node_u64 *node;
2067
2068 assert(stream);
2069 assert(ht);
2070
2071 DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
2072
2073 pthread_mutex_lock(&consumer_data.lock);
2074 pthread_mutex_lock(&stream->chan->lock);
2075 pthread_mutex_lock(&stream->chan->timer_lock);
2076 pthread_mutex_lock(&stream->lock);
2077
2078 /*
2079 * From here, refcounts are updated so be _careful_ when returning an error
2080 * after this point.
2081 */
2082
2083 rcu_read_lock();
2084
2085 /*
2086 * Lookup the stream just to make sure it does not exist in our internal
2087 * state. This should NEVER happen.
2088 */
2089 lttng_ht_lookup(ht, &stream->key, &iter);
2090 node = lttng_ht_iter_get_node_u64(&iter);
2091 assert(!node);
2092
2093 /*
2094 * When nb_init_stream_left reaches 0, we don't need to trigger any action
2095 * in terms of destroying the associated channel, because the action that
2096 * causes the count to become 0 also causes a stream to be added. The
2097 * channel deletion will thus be triggered by the following removal of this
2098 * stream.
2099 */
2100 if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
2101 /* Increment refcount before decrementing nb_init_stream_left */
2102 cmm_smp_wmb();
2103 uatomic_dec(&stream->chan->nb_init_stream_left);
2104 }
2105
2106 lttng_ht_add_unique_u64(ht, &stream->node);
2107
2108 lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
2109 &stream->node_channel_id);
2110
2111 /*
2112 * Add stream to the stream_list_ht of the consumer data. No need to steal
2113 * the key since the HT does not use it and we allow to add redundant keys
2114 * into this table.
2115 */
2116 lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
2117
2118 rcu_read_unlock();
2119
2120 pthread_mutex_unlock(&stream->lock);
2121 pthread_mutex_unlock(&stream->chan->lock);
2122 pthread_mutex_unlock(&stream->chan->timer_lock);
2123 pthread_mutex_unlock(&consumer_data.lock);
2124 return ret;
2125 }
2126
2127 /*
2128 * Delete data stream that are flagged for deletion (endpoint_status).
2129 */
2130 static void validate_endpoint_status_data_stream(void)
2131 {
2132 struct lttng_ht_iter iter;
2133 struct lttng_consumer_stream *stream;
2134
2135 DBG("Consumer delete flagged data stream");
2136
2137 rcu_read_lock();
2138 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
2139 /* Validate delete flag of the stream */
2140 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2141 continue;
2142 }
2143 /* Delete it right now */
2144 consumer_del_stream(stream, data_ht);
2145 }
2146 rcu_read_unlock();
2147 }
2148
2149 /*
2150 * Delete metadata stream that are flagged for deletion (endpoint_status).
2151 */
2152 static void validate_endpoint_status_metadata_stream(
2153 struct lttng_poll_event *pollset)
2154 {
2155 struct lttng_ht_iter iter;
2156 struct lttng_consumer_stream *stream;
2157
2158 DBG("Consumer delete flagged metadata stream");
2159
2160 assert(pollset);
2161
2162 rcu_read_lock();
2163 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
2164 /* Validate delete flag of the stream */
2165 if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
2166 continue;
2167 }
2168 /*
2169 * Remove from pollset so the metadata thread can continue without
2170 * blocking on a deleted stream.
2171 */
2172 lttng_poll_del(pollset, stream->wait_fd);
2173
2174 /* Delete it right now */
2175 consumer_del_metadata_stream(stream, metadata_ht);
2176 }
2177 rcu_read_unlock();
2178 }
2179
2180 /*
2181 * Thread polls on metadata file descriptor and write them on disk or on the
2182 * network.
2183 */
2184 void *consumer_thread_metadata_poll(void *data)
2185 {
2186 int ret, i, pollfd, err = -1;
2187 uint32_t revents, nb_fd;
2188 struct lttng_consumer_stream *stream = NULL;
2189 struct lttng_ht_iter iter;
2190 struct lttng_ht_node_u64 *node;
2191 struct lttng_poll_event events;
2192 struct lttng_consumer_local_data *ctx = data;
2193 ssize_t len;
2194
2195 rcu_register_thread();
2196
2197 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
2198
2199 health_code_update();
2200
2201 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2202 if (!metadata_ht) {
2203 /* ENOMEM at this point. Better to bail out. */
2204 goto end_ht;
2205 }
2206
2207 DBG("Thread metadata poll started");
2208
2209 /* Size is set to 1 for the consumer_metadata pipe */
2210 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2211 if (ret < 0) {
2212 ERR("Poll set creation failed");
2213 goto end_poll;
2214 }
2215
2216 ret = lttng_poll_add(&events,
2217 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
2218 if (ret < 0) {
2219 goto end;
2220 }
2221
2222 /* Main loop */
2223 DBG("Metadata main loop started");
2224
2225 while (1) {
2226 health_code_update();
2227
2228 /* Only the metadata pipe is set */
2229 if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
2230 err = 0; /* All is OK */
2231 goto end;
2232 }
2233
2234 restart:
2235 DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
2236 health_poll_entry();
2237 ret = lttng_poll_wait(&events, -1);
2238 health_poll_exit();
2239 DBG("Metadata event catched in thread");
2240 if (ret < 0) {
2241 if (errno == EINTR) {
2242 ERR("Poll EINTR catched");
2243 goto restart;
2244 }
2245 goto error;
2246 }
2247
2248 nb_fd = ret;
2249
2250 /* From here, the event is a metadata wait fd */
2251 for (i = 0; i < nb_fd; i++) {
2252 health_code_update();
2253
2254 revents = LTTNG_POLL_GETEV(&events, i);
2255 pollfd = LTTNG_POLL_GETFD(&events, i);
2256
2257 if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
2258 if (revents & (LPOLLERR | LPOLLHUP )) {
2259 DBG("Metadata thread pipe hung up");
2260 /*
2261 * Remove the pipe from the poll set and continue the loop
2262 * since their might be data to consume.
2263 */
2264 lttng_poll_del(&events,
2265 lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
2266 lttng_pipe_read_close(ctx->consumer_metadata_pipe);
2267 continue;
2268 } else if (revents & LPOLLIN) {
2269 ssize_t pipe_len;
2270
2271 pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
2272 &stream, sizeof(stream));
2273 if (pipe_len < 0) {
2274 ERR("read metadata stream, ret: %zd", pipe_len);
2275 /*
2276 * Continue here to handle the rest of the streams.
2277 */
2278 continue;
2279 }
2280
2281 /* A NULL stream means that the state has changed. */
2282 if (stream == NULL) {
2283 /* Check for deleted streams. */
2284 validate_endpoint_status_metadata_stream(&events);
2285 goto restart;
2286 }
2287
2288 DBG("Adding metadata stream %d to poll set",
2289 stream->wait_fd);
2290
2291 /* Add metadata stream to the global poll events list */
2292 lttng_poll_add(&events, stream->wait_fd,
2293 LPOLLIN | LPOLLPRI);
2294 }
2295
2296 /* Handle other stream */
2297 continue;
2298 }
2299
2300 rcu_read_lock();
2301 {
2302 uint64_t tmp_id = (uint64_t) pollfd;
2303
2304 lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
2305 }
2306 node = lttng_ht_iter_get_node_u64(&iter);
2307 assert(node);
2308
2309 stream = caa_container_of(node, struct lttng_consumer_stream,
2310 node);
2311
2312 /* Check for error event */
2313 if (revents & (LPOLLERR | LPOLLHUP)) {
2314 DBG("Metadata fd %d is hup|err.", pollfd);
2315 if (!stream->hangup_flush_done
2316 && (consumer_data.type == LTTNG_CONSUMER32_UST
2317 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2318 DBG("Attempting to flush and consume the UST buffers");
2319 lttng_ustconsumer_on_stream_hangup(stream);
2320
2321 /* We just flushed the stream now read it. */
2322 do {
2323 health_code_update();
2324
2325 len = ctx->on_buffer_ready(stream, ctx);
2326 /*
2327 * We don't check the return value here since if we get
2328 * a negative len, it means an error occured thus we
2329 * simply remove it from the poll set and free the
2330 * stream.
2331 */
2332 } while (len > 0);
2333 }
2334
2335 lttng_poll_del(&events, stream->wait_fd);
2336 /*
2337 * This call update the channel states, closes file descriptors
2338 * and securely free the stream.
2339 */
2340 consumer_del_metadata_stream(stream, metadata_ht);
2341 } else if (revents & (LPOLLIN | LPOLLPRI)) {
2342 /* Get the data out of the metadata file descriptor */
2343 DBG("Metadata available on fd %d", pollfd);
2344 assert(stream->wait_fd == pollfd);
2345
2346 do {
2347 health_code_update();
2348
2349 len = ctx->on_buffer_ready(stream, ctx);
2350 /*
2351 * We don't check the return value here since if we get
2352 * a negative len, it means an error occured thus we
2353 * simply remove it from the poll set and free the
2354 * stream.
2355 */
2356 } while (len > 0);
2357
2358 /* It's ok to have an unavailable sub-buffer */
2359 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2360 /* Clean up stream from consumer and free it. */
2361 lttng_poll_del(&events, stream->wait_fd);
2362 consumer_del_metadata_stream(stream, metadata_ht);
2363 }
2364 }
2365
2366 /* Release RCU lock for the stream looked up */
2367 rcu_read_unlock();
2368 }
2369 }
2370
2371 /* All is OK */
2372 err = 0;
2373 error:
2374 end:
2375 DBG("Metadata poll thread exiting");
2376
2377 lttng_poll_clean(&events);
2378 end_poll:
2379 destroy_stream_ht(metadata_ht);
2380 end_ht:
2381 if (err) {
2382 health_error();
2383 ERR("Health error occurred in %s", __func__);
2384 }
2385 health_unregister(health_consumerd);
2386 rcu_unregister_thread();
2387 return NULL;
2388 }
2389
2390 /*
2391 * This thread polls the fds in the set to consume the data and write
2392 * it to tracefile if necessary.
2393 */
2394 void *consumer_thread_data_poll(void *data)
2395 {
2396 int num_rdy, num_hup, high_prio, ret, i, err = -1;
2397 struct pollfd *pollfd = NULL;
2398 /* local view of the streams */
2399 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
2400 /* local view of consumer_data.fds_count */
2401 int nb_fd = 0;
2402 struct lttng_consumer_local_data *ctx = data;
2403 ssize_t len;
2404
2405 rcu_register_thread();
2406
2407 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
2408
2409 health_code_update();
2410
2411 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2412 if (data_ht == NULL) {
2413 /* ENOMEM at this point. Better to bail out. */
2414 goto end;
2415 }
2416
2417 local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
2418 if (local_stream == NULL) {
2419 PERROR("local_stream malloc");
2420 goto end;
2421 }
2422
2423 while (1) {
2424 health_code_update();
2425
2426 high_prio = 0;
2427 num_hup = 0;
2428
2429 /*
2430 * the fds set has been updated, we need to update our
2431 * local array as well
2432 */
2433 pthread_mutex_lock(&consumer_data.lock);
2434 if (consumer_data.need_update) {
2435 free(pollfd);
2436 pollfd = NULL;
2437
2438 free(local_stream);
2439 local_stream = NULL;
2440
2441 /* allocate for all fds + 1 for the consumer_data_pipe */
2442 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
2443 if (pollfd == NULL) {
2444 PERROR("pollfd malloc");
2445 pthread_mutex_unlock(&consumer_data.lock);
2446 goto end;
2447 }
2448
2449 /* allocate for all fds + 1 for the consumer_data_pipe */
2450 local_stream = zmalloc((consumer_data.stream_count + 1) *
2451 sizeof(struct lttng_consumer_stream *));
2452 if (local_stream == NULL) {
2453 PERROR("local_stream malloc");
2454 pthread_mutex_unlock(&consumer_data.lock);
2455 goto end;
2456 }
2457 ret = update_poll_array(ctx, &pollfd, local_stream,
2458 data_ht);
2459 if (ret < 0) {
2460 ERR("Error in allocating pollfd or local_outfds");
2461 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2462 pthread_mutex_unlock(&consumer_data.lock);
2463 goto end;
2464 }
2465 nb_fd = ret;
2466 consumer_data.need_update = 0;
2467 }
2468 pthread_mutex_unlock(&consumer_data.lock);
2469
2470 /* No FDs and consumer_quit, consumer_cleanup the thread */
2471 if (nb_fd == 0 && consumer_quit == 1) {
2472 err = 0; /* All is OK */
2473 goto end;
2474 }
2475 /* poll on the array of fds */
2476 restart:
2477 DBG("polling on %d fd", nb_fd + 1);
2478 health_poll_entry();
2479 num_rdy = poll(pollfd, nb_fd + 1, -1);
2480 health_poll_exit();
2481 DBG("poll num_rdy : %d", num_rdy);
2482 if (num_rdy == -1) {
2483 /*
2484 * Restart interrupted system call.
2485 */
2486 if (errno == EINTR) {
2487 goto restart;
2488 }
2489 PERROR("Poll error");
2490 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2491 goto end;
2492 } else if (num_rdy == 0) {
2493 DBG("Polling thread timed out");
2494 goto end;
2495 }
2496
2497 /*
2498 * If the consumer_data_pipe triggered poll go directly to the
2499 * beginning of the loop to update the array. We want to prioritize
2500 * array update over low-priority reads.
2501 */
2502 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
2503 ssize_t pipe_readlen;
2504
2505 DBG("consumer_data_pipe wake up");
2506 pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
2507 &new_stream, sizeof(new_stream));
2508 if (pipe_readlen < 0) {
2509 ERR("Consumer data pipe ret %zd", pipe_readlen);
2510 /* Continue so we can at least handle the current stream(s). */
2511 continue;
2512 }
2513
2514 /*
2515 * If the stream is NULL, just ignore it. It's also possible that
2516 * the sessiond poll thread changed the consumer_quit state and is
2517 * waking us up to test it.
2518 */
2519 if (new_stream == NULL) {
2520 validate_endpoint_status_data_stream();
2521 continue;
2522 }
2523
2524 /* Continue to update the local streams and handle prio ones */
2525 continue;
2526 }
2527
2528 /* Take care of high priority channels first. */
2529 for (i = 0; i < nb_fd; i++) {
2530 health_code_update();
2531
2532 if (local_stream[i] == NULL) {
2533 continue;
2534 }
2535 if (pollfd[i].revents & POLLPRI) {
2536 DBG("Urgent read on fd %d", pollfd[i].fd);
2537 high_prio = 1;
2538 len = ctx->on_buffer_ready(local_stream[i], ctx);
2539 /* it's ok to have an unavailable sub-buffer */
2540 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2541 /* Clean the stream and free it. */
2542 consumer_del_stream(local_stream[i], data_ht);
2543 local_stream[i] = NULL;
2544 } else if (len > 0) {
2545 local_stream[i]->data_read = 1;
2546 }
2547 }
2548 }
2549
2550 /*
2551 * If we read high prio channel in this loop, try again
2552 * for more high prio data.
2553 */
2554 if (high_prio) {
2555 continue;
2556 }
2557
2558 /* Take care of low priority channels. */
2559 for (i = 0; i < nb_fd; i++) {
2560 health_code_update();
2561
2562 if (local_stream[i] == NULL) {
2563 continue;
2564 }
2565 if ((pollfd[i].revents & POLLIN) ||
2566 local_stream[i]->hangup_flush_done) {
2567 DBG("Normal read on fd %d", pollfd[i].fd);
2568 len = ctx->on_buffer_ready(local_stream[i], ctx);
2569 /* it's ok to have an unavailable sub-buffer */
2570 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2571 /* Clean the stream and free it. */
2572 consumer_del_stream(local_stream[i], data_ht);
2573 local_stream[i] = NULL;
2574 } else if (len > 0) {
2575 local_stream[i]->data_read = 1;
2576 }
2577 }
2578 }
2579
2580 /* Handle hangup and errors */
2581 for (i = 0; i < nb_fd; i++) {
2582 health_code_update();
2583
2584 if (local_stream[i] == NULL) {
2585 continue;
2586 }
2587 if (!local_stream[i]->hangup_flush_done
2588 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2589 && (consumer_data.type == LTTNG_CONSUMER32_UST
2590 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2591 DBG("fd %d is hup|err|nval. Attempting flush and read.",
2592 pollfd[i].fd);
2593 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2594 /* Attempt read again, for the data we just flushed. */
2595 local_stream[i]->data_read = 1;
2596 }
2597 /*
2598 * If the poll flag is HUP/ERR/NVAL and we have
2599 * read no data in this pass, we can remove the
2600 * stream from its hash table.
2601 */
2602 if ((pollfd[i].revents & POLLHUP)) {
2603 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2604 if (!local_stream[i]->data_read) {
2605 consumer_del_stream(local_stream[i], data_ht);
2606 local_stream[i] = NULL;
2607 num_hup++;
2608 }
2609 } else if (pollfd[i].revents & POLLERR) {
2610 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2611 if (!local_stream[i]->data_read) {
2612 consumer_del_stream(local_stream[i], data_ht);
2613 local_stream[i] = NULL;
2614 num_hup++;
2615 }
2616 } else if (pollfd[i].revents & POLLNVAL) {
2617 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2618 if (!local_stream[i]->data_read) {
2619 consumer_del_stream(local_stream[i], data_ht);
2620 local_stream[i] = NULL;
2621 num_hup++;
2622 }
2623 }
2624 if (local_stream[i] != NULL) {
2625 local_stream[i]->data_read = 0;
2626 }
2627 }
2628 }
2629 /* All is OK */
2630 err = 0;
2631 end:
2632 DBG("polling thread exiting");
2633 free(pollfd);
2634 free(local_stream);
2635
2636 /*
2637 * Close the write side of the pipe so epoll_wait() in
2638 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2639 * read side of the pipe. If we close them both, epoll_wait strangely does
2640 * not return and could create a endless wait period if the pipe is the
2641 * only tracked fd in the poll set. The thread will take care of closing
2642 * the read side.
2643 */
2644 (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
2645
2646 destroy_data_stream_ht(data_ht);
2647
2648 if (err) {
2649 health_error();
2650 ERR("Health error occurred in %s", __func__);
2651 }
2652 health_unregister(health_consumerd);
2653
2654 rcu_unregister_thread();
2655 return NULL;
2656 }
2657
2658 /*
2659 * Close wake-up end of each stream belonging to the channel. This will
2660 * allow the poll() on the stream read-side to detect when the
2661 * write-side (application) finally closes them.
2662 */
2663 static
2664 void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
2665 {
2666 struct lttng_ht *ht;
2667 struct lttng_consumer_stream *stream;
2668 struct lttng_ht_iter iter;
2669
2670 ht = consumer_data.stream_per_chan_id_ht;
2671
2672 rcu_read_lock();
2673 cds_lfht_for_each_entry_duplicate(ht->ht,
2674 ht->hash_fct(&channel->key, lttng_ht_seed),
2675 ht->match_fct, &channel->key,
2676 &iter.iter, stream, node_channel_id.node) {
2677 /*
2678 * Protect against teardown with mutex.
2679 */
2680 pthread_mutex_lock(&stream->lock);
2681 if (cds_lfht_is_node_deleted(&stream->node.node)) {
2682 goto next;
2683 }
2684 switch (consumer_data.type) {
2685 case LTTNG_CONSUMER_KERNEL:
2686 break;
2687 case LTTNG_CONSUMER32_UST:
2688 case LTTNG_CONSUMER64_UST:
2689 /*
2690 * Note: a mutex is taken internally within
2691 * liblttng-ust-ctl to protect timer wakeup_fd
2692 * use from concurrent close.
2693 */
2694 lttng_ustconsumer_close_stream_wakeup(stream);
2695 break;
2696 default:
2697 ERR("Unknown consumer_data type");
2698 assert(0);
2699 }
2700 next:
2701 pthread_mutex_unlock(&stream->lock);
2702 }
2703 rcu_read_unlock();
2704 }
2705
2706 static void destroy_channel_ht(struct lttng_ht *ht)
2707 {
2708 struct lttng_ht_iter iter;
2709 struct lttng_consumer_channel *channel;
2710 int ret;
2711
2712 if (ht == NULL) {
2713 return;
2714 }
2715
2716 rcu_read_lock();
2717 cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
2718 ret = lttng_ht_del(ht, &iter);
2719 assert(ret != 0);
2720 }
2721 rcu_read_unlock();
2722
2723 lttng_ht_destroy(ht);
2724 }
2725
2726 /*
2727 * This thread polls the channel fds to detect when they are being
2728 * closed. It closes all related streams if the channel is detected as
2729 * closed. It is currently only used as a shim layer for UST because the
2730 * consumerd needs to keep the per-stream wakeup end of pipes open for
2731 * periodical flush.
2732 */
2733 void *consumer_thread_channel_poll(void *data)
2734 {
2735 int ret, i, pollfd, err = -1;
2736 uint32_t revents, nb_fd;
2737 struct lttng_consumer_channel *chan = NULL;
2738 struct lttng_ht_iter iter;
2739 struct lttng_ht_node_u64 *node;
2740 struct lttng_poll_event events;
2741 struct lttng_consumer_local_data *ctx = data;
2742 struct lttng_ht *channel_ht;
2743
2744 rcu_register_thread();
2745
2746 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
2747
2748 health_code_update();
2749
2750 channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
2751 if (!channel_ht) {
2752 /* ENOMEM at this point. Better to bail out. */
2753 goto end_ht;
2754 }
2755
2756 DBG("Thread channel poll started");
2757
2758 /* Size is set to 1 for the consumer_channel pipe */
2759 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2760 if (ret < 0) {
2761 ERR("Poll set creation failed");
2762 goto end_poll;
2763 }
2764
2765 ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
2766 if (ret < 0) {
2767 goto end;
2768 }
2769
2770 /* Main loop */
2771 DBG("Channel main loop started");
2772
2773 while (1) {
2774 health_code_update();
2775
2776 /* Only the channel pipe is set */
2777 if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
2778 err = 0; /* All is OK */
2779 goto end;
2780 }
2781
2782 restart:
2783 DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
2784 health_poll_entry();
2785 ret = lttng_poll_wait(&events, -1);
2786 health_poll_exit();
2787 DBG("Channel event catched in thread");
2788 if (ret < 0) {
2789 if (errno == EINTR) {
2790 ERR("Poll EINTR catched");
2791 goto restart;
2792 }
2793 goto end;
2794 }
2795
2796 nb_fd = ret;
2797
2798 /* From here, the event is a channel wait fd */
2799 for (i = 0; i < nb_fd; i++) {
2800 health_code_update();
2801
2802 revents = LTTNG_POLL_GETEV(&events, i);
2803 pollfd = LTTNG_POLL_GETFD(&events, i);
2804
2805 /* Just don't waste time if no returned events for the fd */
2806 if (!revents) {
2807 continue;
2808 }
2809 if (pollfd == ctx->consumer_channel_pipe[0]) {
2810 if (revents & (LPOLLERR | LPOLLHUP)) {
2811 DBG("Channel thread pipe hung up");
2812 /*
2813 * Remove the pipe from the poll set and continue the loop
2814 * since their might be data to consume.
2815 */
2816 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
2817 continue;
2818 } else if (revents & LPOLLIN) {
2819 enum consumer_channel_action action;
2820 uint64_t key;
2821
2822 ret = read_channel_pipe(ctx, &chan, &key, &action);
2823 if (ret <= 0) {
2824 ERR("Error reading channel pipe");
2825 continue;
2826 }
2827
2828 switch (action) {
2829 case CONSUMER_CHANNEL_ADD:
2830 DBG("Adding channel %d to poll set",
2831 chan->wait_fd);
2832
2833 lttng_ht_node_init_u64(&chan->wait_fd_node,
2834 chan->wait_fd);
2835 rcu_read_lock();
2836 lttng_ht_add_unique_u64(channel_ht,
2837 &chan->wait_fd_node);
2838 rcu_read_unlock();
2839 /* Add channel to the global poll events list */
2840 lttng_poll_add(&events, chan->wait_fd,
2841 LPOLLIN | LPOLLPRI);
2842 break;
2843 case CONSUMER_CHANNEL_DEL:
2844 {
2845 struct lttng_consumer_stream *stream, *stmp;
2846
2847 rcu_read_lock();
2848 chan = consumer_find_channel(key);
2849 if (!chan) {
2850 rcu_read_unlock();
2851 ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
2852 break;
2853 }
2854 lttng_poll_del(&events, chan->wait_fd);
2855 iter.iter.node = &chan->wait_fd_node.node;
2856 ret = lttng_ht_del(channel_ht, &iter);
2857 assert(ret == 0);
2858 consumer_close_channel_streams(chan);
2859
2860 switch (consumer_data.type) {
2861 case LTTNG_CONSUMER_KERNEL:
2862 break;
2863 case LTTNG_CONSUMER32_UST:
2864 case LTTNG_CONSUMER64_UST:
2865 /* Delete streams that might have been left in the stream list. */
2866 cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
2867 send_node) {
2868 health_code_update();
2869
2870 cds_list_del(&stream->send_node);
2871 lttng_ustconsumer_del_stream(stream);
2872 uatomic_sub(&stream->chan->refcount, 1);
2873 assert(&chan->refcount);
2874 free(stream);
2875 }
2876 break;
2877 default:
2878 ERR("Unknown consumer_data type");
2879 assert(0);
2880 }
2881
2882 /*
2883 * Release our own refcount. Force channel deletion even if
2884 * streams were not initialized.
2885 */
2886 if (!uatomic_sub_return(&chan->refcount, 1)) {
2887 consumer_del_channel(chan);
2888 }
2889 rcu_read_unlock();
2890 goto restart;
2891 }
2892 case CONSUMER_CHANNEL_QUIT:
2893 /*
2894 * Remove the pipe from the poll set and continue the loop
2895 * since their might be data to consume.
2896 */
2897 lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
2898 continue;
2899 default:
2900 ERR("Unknown action");
2901 break;
2902 }
2903 }
2904
2905 /* Handle other stream */
2906 continue;
2907 }
2908
2909 rcu_read_lock();
2910 {
2911 uint64_t tmp_id = (uint64_t) pollfd;
2912
2913 lttng_ht_lookup(channel_ht, &tmp_id, &iter);
2914 }
2915 node = lttng_ht_iter_get_node_u64(&iter);
2916 assert(node);
2917
2918 chan = caa_container_of(node, struct lttng_consumer_channel,
2919 wait_fd_node);
2920
2921 /* Check for error event */
2922 if (revents & (LPOLLERR | LPOLLHUP)) {
2923 DBG("Channel fd %d is hup|err.", pollfd);
2924
2925 lttng_poll_del(&events, chan->wait_fd);
2926 ret = lttng_ht_del(channel_ht, &iter);
2927 assert(ret == 0);
2928 consumer_close_channel_streams(chan);
2929
2930 /* Release our own refcount */
2931 if (!uatomic_sub_return(&chan->refcount, 1)
2932 && !uatomic_read(&chan->nb_init_stream_left)) {
2933 consumer_del_channel(chan);
2934 }
2935 }
2936
2937 /* Release RCU lock for the channel looked up */
2938 rcu_read_unlock();
2939 }
2940 }
2941
2942 /* All is OK */
2943 err = 0;
2944 end:
2945 lttng_poll_clean(&events);
2946 end_poll:
2947 destroy_channel_ht(channel_ht);
2948 end_ht:
2949 DBG("Channel poll thread exiting");
2950 if (err) {
2951 health_error();
2952 ERR("Health error occurred in %s", __func__);
2953 }
2954 health_unregister(health_consumerd);
2955 rcu_unregister_thread();
2956 return NULL;
2957 }
2958
2959 static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
2960 struct pollfd *sockpoll, int client_socket)
2961 {
2962 int ret;
2963
2964 assert(ctx);
2965 assert(sockpoll);
2966
2967 if (lttng_consumer_poll_socket(sockpoll) < 0) {
2968 ret = -1;
2969 goto error;
2970 }
2971 DBG("Metadata connection on client_socket");
2972
2973 /* Blocking call, waiting for transmission */
2974 ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
2975 if (ctx->consumer_metadata_socket < 0) {
2976 WARN("On accept metadata");
2977 ret = -1;
2978 goto error;
2979 }
2980 ret = 0;
2981
2982 error:
2983 return ret;
2984 }
2985
2986 /*
2987 * This thread listens on the consumerd socket and receives the file
2988 * descriptors from the session daemon.
2989 */
2990 void *consumer_thread_sessiond_poll(void *data)
2991 {
2992 int sock = -1, client_socket, ret, err = -1;
2993 /*
2994 * structure to poll for incoming data on communication socket avoids
2995 * making blocking sockets.
2996 */
2997 struct pollfd consumer_sockpoll[2];
2998 struct lttng_consumer_local_data *ctx = data;
2999
3000 rcu_register_thread();
3001
3002 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
3003
3004 health_code_update();
3005
3006 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
3007 unlink(ctx->consumer_command_sock_path);
3008 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
3009 if (client_socket < 0) {
3010 ERR("Cannot create command socket");
3011 goto end;
3012 }
3013
3014 ret = lttcomm_listen_unix_sock(client_socket);
3015 if (ret < 0) {
3016 goto end;
3017 }
3018
3019 DBG("Sending ready command to lttng-sessiond");
3020 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
3021 /* return < 0 on error, but == 0 is not fatal */
3022 if (ret < 0) {
3023 ERR("Error sending ready command to lttng-sessiond");
3024 goto end;
3025 }
3026
3027 /* prepare the FDs to poll : to client socket and the should_quit pipe */
3028 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
3029 consumer_sockpoll[0].events = POLLIN | POLLPRI;
3030 consumer_sockpoll[1].fd = client_socket;
3031 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3032
3033 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
3034 goto end;
3035 }
3036 DBG("Connection on client_socket");
3037
3038 /* Blocking call, waiting for transmission */
3039 sock = lttcomm_accept_unix_sock(client_socket);
3040 if (sock < 0) {
3041 WARN("On accept");
3042 goto end;
3043 }
3044
3045 /*
3046 * Setup metadata socket which is the second socket connection on the
3047 * command unix socket.
3048 */
3049 ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
3050 if (ret < 0) {
3051 goto end;
3052 }
3053
3054 /* This socket is not useful anymore. */
3055 ret = close(client_socket);
3056 if (ret < 0) {
3057 PERROR("close client_socket");
3058 }
3059 client_socket = -1;
3060
3061 /* update the polling structure to poll on the established socket */
3062 consumer_sockpoll[1].fd = sock;
3063 consumer_sockpoll[1].events = POLLIN | POLLPRI;
3064
3065 while (1) {
3066 health_code_update();
3067
3068 health_poll_entry();
3069 ret = lttng_consumer_poll_socket(consumer_sockpoll);
3070 health_poll_exit();
3071 if (ret < 0) {
3072 goto end;
3073 }
3074 DBG("Incoming command on sock");
3075 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
3076 if (ret == -ENOENT) {
3077 DBG("Received STOP command");
3078 goto end;
3079 }
3080 if (ret <= 0) {
3081 /*
3082 * This could simply be a session daemon quitting. Don't output
3083 * ERR() here.
3084 */
3085 DBG("Communication interrupted on command socket");
3086 err = 0;
3087 goto end;
3088 }
3089 if (consumer_quit) {
3090 DBG("consumer_thread_receive_fds received quit from signal");
3091 err = 0; /* All is OK */
3092 goto end;
3093 }
3094 DBG("received command on sock");
3095 }
3096 /* All is OK */
3097 err = 0;
3098
3099 end:
3100 DBG("Consumer thread sessiond poll exiting");
3101
3102 /*
3103 * Close metadata streams since the producer is the session daemon which
3104 * just died.
3105 *
3106 * NOTE: for now, this only applies to the UST tracer.
3107 */
3108 lttng_consumer_close_metadata();
3109
3110 /*
3111 * when all fds have hung up, the polling thread
3112 * can exit cleanly
3113 */
3114 consumer_quit = 1;
3115
3116 /*
3117 * Notify the data poll thread to poll back again and test the
3118 * consumer_quit state that we just set so to quit gracefully.
3119 */
3120 notify_thread_lttng_pipe(ctx->consumer_data_pipe);
3121
3122 notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
3123
3124 /* Cleaning up possibly open sockets. */
3125 if (sock >= 0) {
3126 ret = close(sock);
3127 if (ret < 0) {
3128 PERROR("close sock sessiond poll");
3129 }
3130 }
3131 if (client_socket >= 0) {
3132 ret = close(client_socket);
3133 if (ret < 0) {
3134 PERROR("close client_socket sessiond poll");
3135 }
3136 }
3137
3138 if (err) {
3139 health_error();
3140 ERR("Health error occurred in %s", __func__);
3141 }
3142 health_unregister(health_consumerd);
3143
3144 rcu_unregister_thread();
3145 return NULL;
3146 }
3147
/*
 * Read a ready sub-buffer from the stream, dispatching to the kernel or
 * UST backend based on the consumer type.
 *
 * Returns the backend's result; callers treat len > 0 as "data read" and
 * negative values other than -EAGAIN/-ENODATA as fatal for the stream.
 */
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
		struct lttng_consumer_local_data *ctx)
{
	ssize_t ret;

	pthread_mutex_lock(&stream->lock);
	if (stream->metadata_flag) {
		/*
		 * Also hold the metadata rendez-vous lock across the read so
		 * threads waiting on metadata_rdv observe a consistent state.
		 */
		pthread_mutex_lock(&stream->metadata_rdv_lock);
	}

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		ret = lttng_kconsumer_read_subbuffer(stream, ctx);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		ret = -ENOSYS;
		break;
	}

	if (stream->metadata_flag) {
		/* Wake every thread waiting on this metadata read to complete. */
		pthread_cond_broadcast(&stream->metadata_rdv);
		pthread_mutex_unlock(&stream->metadata_rdv_lock);
	}
	pthread_mutex_unlock(&stream->lock);
	return ret;
}
3180
3181 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
3182 {
3183 switch (consumer_data.type) {
3184 case LTTNG_CONSUMER_KERNEL:
3185 return lttng_kconsumer_on_recv_stream(stream);
3186 case LTTNG_CONSUMER32_UST:
3187 case LTTNG_CONSUMER64_UST:
3188 return lttng_ustconsumer_on_recv_stream(stream);
3189 default:
3190 ERR("Unknown consumer_data type");
3191 assert(0);
3192 return -ENOSYS;
3193 }
3194 }
3195
3196 /*
3197 * Allocate and set consumer data hash tables.
3198 */
3199 void lttng_consumer_init(void)
3200 {
3201 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3202 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3203 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3204 consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3205 }
3206
/*
 * Process the ADD_RELAYD command received by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire a RCU read side lock before calling it.
 *
 * The relayd file descriptor itself is received over 'sock' (the sessiond
 * command socket) after a first status message is sent; a second status
 * message reports the final result back to the session daemon.
 *
 * sessiond_id is only stored when a new relayd pair is allocated here;
 * relayd_session_id is only assigned for the control socket type.
 *
 * Return 0 on success else a negative value. On error, a status message is
 * sent back to the session daemon whenever possible.
 */
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
		struct lttng_consumer_local_data *ctx, int sock,
		struct pollfd *consumer_sockpoll,
		struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
		uint64_t relayd_session_id)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttng_error_code ret_code = LTTNG_OK;
	struct consumer_relayd_sock_pair *relayd = NULL;

	assert(ctx);
	assert(relayd_sock);

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == NULL) {
		/* A data socket must always pair with an existing control socket. */
		assert(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == NULL) {
			ret = -ENOMEM;
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status message so
		 * we can notify the session daemon and continue our work without
		 * killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		assert(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTNG_OK);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		ret = -EINTR;
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		ret = -1;
		fd = -1;	/* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->control_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->control_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor */
		relayd->control_sock.sock.fd = fd;
		fd = -1;	/* For error path */
		/* Assign version values. */
		relayd->control_sock.major = relayd_sock->major;
		relayd->control_sock.minor = relayd_sock->minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->data_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->data_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor */
		relayd->data_sock.sock.fd = fd;
		fd = -1;	/* for eventual error paths */
		/* Assign version values. */
		relayd->data_sock.major = relayd_sock->major;
		relayd->data_sock.minor = relayd_sock->minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret = -1;
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	/*
	 * NOTE(review): fd was reset to -1 in the switch above, so this log
	 * always prints "fd: -1" rather than the received descriptor.
	 */
	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
			sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
			relayd->net_seq_idx, fd);

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	add_relayd(relayd);

	/* All good! */
	return 0;

error:
	/* Report the failure to the session daemon before tearing down. */
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	/* Only free a relayd pair allocated by this call, never a found one. */
	if (relayd_created) {
		free(relayd);
	}

	return ret;
}
3392
3393 /*
3394 * Try to lock the stream mutex.
3395 *
3396 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
3397 */
3398 static int stream_try_lock(struct lttng_consumer_stream *stream)
3399 {
3400 int ret;
3401
3402 assert(stream);
3403
3404 /*
3405 * Try to lock the stream mutex. On failure, we know that the stream is
3406 * being used else where hence there is data still being extracted.
3407 */
3408 ret = pthread_mutex_trylock(&stream->lock);
3409 if (ret) {
3410 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
3411 ret = 0;
3412 goto end;
3413 }
3414
3415 ret = 1;
3416
3417 end:
3418 return ret;
3419 }
3420
3421 /*
3422 * Search for a relayd associated to the session id and return the reference.
3423 *
3424 * A rcu read side lock MUST be acquire before calling this function and locked
3425 * until the relayd object is no longer necessary.
3426 */
3427 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
3428 {
3429 struct lttng_ht_iter iter;
3430 struct consumer_relayd_sock_pair *relayd = NULL;
3431
3432 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3433 cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
3434 node.node) {
3435 /*
3436 * Check by sessiond id which is unique here where the relayd session
3437 * id might not be when having multiple relayd.
3438 */
3439 if (relayd->sessiond_session_id == id) {
3440 /* Found the relayd. There can be only one per id. */
3441 goto found;
3442 }
3443 }
3444
3445 return NULL;
3446
3447 found:
3448 return relayd;
3449 }
3450
/*
 * Check if for a given session id there is still data needed to be extract
 * from the buffers.
 *
 * This inspects every stream of the session (local buffers) and, when the
 * session is streamed over the network, also queries the relayd for data
 * still in flight.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = NULL;
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	/* Held for the whole traversal; released on both exit paths below. */
	rcu_read_lock();
	pthread_mutex_lock(&consumer_data.lock);

	/* Select the domain-specific per-stream check. */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		assert(0);
	}

	/* Ease our life a bit */
	ht = consumer_data.stream_list_ht;

	/* NULL when the session is not streamed to a relayd. */
	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock,
				relayd->relayd_session_id);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/* Communication error with the relayd; report no data pending. */
			goto data_not_pending;
		}
	}

	/* Walk every stream belonging to this session id. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&id, lttng_ht_seed),
			ht->match_fct, &id,
			&iter.iter, stream, node_session_id.node) {
		/* If this call fails, the stream is being used hence data pending. */
		ret = stream_try_lock(stream);
		if (!ret) {
			goto data_pending;
		}

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/*
			 * An empty output file is not valid. We need at least one packet
			 * generated per stream, even if it contains no event, so it
			 * contains at least one packet header.
			 */
			if (stream->output_written == 0) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		/* Relayd check */
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->metadata_flag) {
				ret = relayd_quiescent_control(&relayd->control_sock,
						stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->next_net_seq_num - 1);
			}
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}
		pthread_mutex_unlock(&stream->lock);
	}

	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send end command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_end_data_pending(&relayd->control_sock,
				relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 1;
}
3589
3590 /*
3591 * Send a ret code status message to the sessiond daemon.
3592 *
3593 * Return the sendmsg() return value.
3594 */
3595 int consumer_send_status_msg(int sock, int ret_code)
3596 {
3597 struct lttcomm_consumer_status_msg msg;
3598
3599 msg.ret_code = ret_code;
3600
3601 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3602 }
3603
3604 /*
3605 * Send a channel status message to the sessiond daemon.
3606 *
3607 * Return the sendmsg() return value.
3608 */
3609 int consumer_send_status_channel(int sock,
3610 struct lttng_consumer_channel *channel)
3611 {
3612 struct lttcomm_consumer_status_channel msg;
3613
3614 assert(sock >= 0);
3615
3616 if (!channel) {
3617 msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
3618 } else {
3619 msg.ret_code = LTTNG_OK;
3620 msg.key = channel->key;
3621 msg.stream_count = channel->streams.count;
3622 }
3623
3624 return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
3625 }
3626
/*
 * Using a maximum stream size with the produced and consumed position of a
 * stream, computes the new consumed position to be as close as possible to the
 * maximum possible stream size.
 *
 * If maximum stream size is lower than the possible buffer size (produced -
 * consumed), the consumed_pos given is returned untouched else the new value
 * is returned.
 */
unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
		unsigned long produced_pos, uint64_t max_stream_size)
{
	unsigned long backlog = produced_pos - consumed_pos;

	/* No limit, or the whole backlog already fits within the limit. */
	if (max_stream_size == 0 || backlog <= max_stream_size) {
		return consumed_pos;
	}

	/* Offset from the produced position to get the latest buffers. */
	return produced_pos - max_stream_size;
}
This page took 0.156586 seconds and 4 git commands to generate.