Fix: Sync issue when deleting a data stream
[lttng-tools.git] / src / common / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31
32 #include <common/common.h>
33 #include <common/utils.h>
34 #include <common/compat/poll.h>
35 #include <common/kernel-ctl/kernel-ctl.h>
36 #include <common/sessiond-comm/relayd.h>
37 #include <common/sessiond-comm/sessiond-comm.h>
38 #include <common/kernel-consumer/kernel-consumer.h>
39 #include <common/relayd/relayd.h>
40 #include <common/ust-consumer/ust-consumer.h>
41
42 #include "consumer.h"
43
/*
 * Process-wide consumer state shared by the functions of this file. It starts
 * with no registered stream, the need_update flag raised and an unknown
 * consumer type; lttng_consumer_create() records the actual type.
 */
struct lttng_consumer_global_data consumer_data = {
	.stream_count = 0,
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};
49
/*
 * Timeout parameter controlling the grace period of the polling thread.
 * NOTE(review): the initial -1 presumably means "block indefinitely" once it
 * is handed to poll() -- confirm at the call sites, which are not in this
 * part of the file.
 */
int consumer_poll_timeout = -1;
52
/*
 * Flag telling the polling threads to quit once all file descriptors have
 * hung up. It is updated by consumer_thread_receive_fds when it notices that
 * every fd has hung up, and set to 1 by lttng_consumer_should_exit() (called
 * from the signal handler). It is read by the polling threads.
 */
volatile int consumer_quit;
60
/*
 * The two hash tables below are visible to all threads, which live in
 * different source files.
 *
 * metadata_ht holds the metadata streams and data_ht holds the data streams.
 * A stream stored in one of these tables should only be updated by its
 * owning thread: the metadata poll thread for metadata_ht and the data poll
 * thread for data_ht.
 */
struct lttng_ht *metadata_ht;
struct lttng_ht *data_ht;
71
/*
 * Wake up the thread polling on the read side of a pipe.
 *
 * A NULL stream pointer is written to wpipe, which tells the reader that
 * some global state changed and that it should go through its poll loop
 * again. The write is retried for as long as it is interrupted by a signal
 * (EINTR); any other failure is ignored.
 */
static void notify_thread_pipe(int wpipe)
{
	struct lttng_consumer_stream *wakeup_marker = NULL;
	ssize_t written;

	for (;;) {
		written = write(wpipe, &wakeup_marker, sizeof(wakeup_marker));
		if (written >= 0 || errno != EINTR) {
			break;
		}
	}
}
86
87 /*
88 * Find a stream. The consumer_data.lock must be locked during this
89 * call.
90 */
91 static struct lttng_consumer_stream *consumer_find_stream(int key,
92 struct lttng_ht *ht)
93 {
94 struct lttng_ht_iter iter;
95 struct lttng_ht_node_ulong *node;
96 struct lttng_consumer_stream *stream = NULL;
97
98 assert(ht);
99
100 /* Negative keys are lookup failures */
101 if (key < 0) {
102 return NULL;
103 }
104
105 rcu_read_lock();
106
107 lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
108 node = lttng_ht_iter_get_node_ulong(&iter);
109 if (node != NULL) {
110 stream = caa_container_of(node, struct lttng_consumer_stream, node);
111 }
112
113 rcu_read_unlock();
114
115 return stream;
116 }
117
118 void consumer_steal_stream_key(int key, struct lttng_ht *ht)
119 {
120 struct lttng_consumer_stream *stream;
121
122 rcu_read_lock();
123 stream = consumer_find_stream(key, ht);
124 if (stream) {
125 stream->key = -1;
126 /*
127 * We don't want the lookup to match, but we still need
128 * to iterate on this stream when iterating over the hash table. Just
129 * change the node key.
130 */
131 stream->node.key = -1;
132 }
133 rcu_read_unlock();
134 }
135
136 static struct lttng_consumer_channel *consumer_find_channel(int key)
137 {
138 struct lttng_ht_iter iter;
139 struct lttng_ht_node_ulong *node;
140 struct lttng_consumer_channel *channel = NULL;
141
142 /* Negative keys are lookup failures */
143 if (key < 0) {
144 return NULL;
145 }
146
147 rcu_read_lock();
148
149 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
150 &iter);
151 node = lttng_ht_iter_get_node_ulong(&iter);
152 if (node != NULL) {
153 channel = caa_container_of(node, struct lttng_consumer_channel, node);
154 }
155
156 rcu_read_unlock();
157
158 return channel;
159 }
160
161 static void consumer_steal_channel_key(int key)
162 {
163 struct lttng_consumer_channel *channel;
164
165 rcu_read_lock();
166 channel = consumer_find_channel(key);
167 if (channel) {
168 channel->key = -1;
169 /*
170 * We don't want the lookup to match, but we still need
171 * to iterate on this channel when iterating over the hash table. Just
172 * change the node key.
173 */
174 channel->node.key = -1;
175 }
176 rcu_read_unlock();
177 }
178
179 static
180 void consumer_free_stream(struct rcu_head *head)
181 {
182 struct lttng_ht_node_ulong *node =
183 caa_container_of(head, struct lttng_ht_node_ulong, head);
184 struct lttng_consumer_stream *stream =
185 caa_container_of(node, struct lttng_consumer_stream, node);
186
187 free(stream);
188 }
189
190 /*
191 * RCU protected relayd socket pair free.
192 */
193 static void consumer_rcu_free_relayd(struct rcu_head *head)
194 {
195 struct lttng_ht_node_ulong *node =
196 caa_container_of(head, struct lttng_ht_node_ulong, head);
197 struct consumer_relayd_sock_pair *relayd =
198 caa_container_of(node, struct consumer_relayd_sock_pair, node);
199
200 /*
201 * Close all sockets. This is done in the call RCU since we don't want the
202 * socket fds to be reassigned thus potentially creating bad state of the
203 * relayd object.
204 *
205 * We do not have to lock the control socket mutex here since at this stage
206 * there is no one referencing to this relayd object.
207 */
208 (void) relayd_close(&relayd->control_sock);
209 (void) relayd_close(&relayd->data_sock);
210
211 free(relayd);
212 }
213
214 /*
215 * Destroy and free relayd socket pair object.
216 *
217 * This function MUST be called with the consumer_data lock acquired.
218 */
219 static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
220 {
221 int ret;
222 struct lttng_ht_iter iter;
223
224 if (relayd == NULL) {
225 return;
226 }
227
228 DBG("Consumer destroy and close relayd socket pair");
229
230 iter.iter.node = &relayd->node.node;
231 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
232 if (ret != 0) {
233 /* We assume the relayd is being or is destroyed */
234 return;
235 }
236
237 /* RCU free() call */
238 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
239 }
240
241 /*
242 * Update the end point status of all streams having the given network sequence
243 * index (relayd index).
244 *
245 * It's atomically set without having the stream mutex locked which is fine
246 * because we handle the write/read race with a pipe wakeup for each thread.
247 */
248 static void update_endpoint_status_by_netidx(int net_seq_idx,
249 enum consumer_endpoint_status status)
250 {
251 struct lttng_ht_iter iter;
252 struct lttng_consumer_stream *stream;
253
254 DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
255
256 rcu_read_lock();
257
258 /* Let's begin with metadata */
259 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
260 if (stream->net_seq_idx == net_seq_idx) {
261 uatomic_set(&stream->endpoint_status, status);
262 DBG("Delete flag set to metadata stream %d", stream->wait_fd);
263 }
264 }
265
266 /* Follow up by the data streams */
267 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
268 if (stream->net_seq_idx == net_seq_idx) {
269 uatomic_set(&stream->endpoint_status, status);
270 DBG("Delete flag set to data stream %d", stream->wait_fd);
271 }
272 }
273 rcu_read_unlock();
274 }
275
276 /*
277 * Cleanup a relayd object by flagging every associated streams for deletion,
278 * destroying the object meaning removing it from the relayd hash table,
279 * closing the sockets and freeing the memory in a RCU call.
280 *
281 * If a local data context is available, notify the threads that the streams'
282 * state have changed.
283 */
284 static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
285 struct lttng_consumer_local_data *ctx)
286 {
287 int netidx;
288
289 assert(relayd);
290
291 DBG("Cleaning up relayd sockets");
292
293 /* Save the net sequence index before destroying the object */
294 netidx = relayd->net_seq_idx;
295
296 /*
297 * Delete the relayd from the relayd hash table, close the sockets and free
298 * the object in a RCU call.
299 */
300 destroy_relayd(relayd);
301
302 /* Set inactive endpoint to all streams */
303 update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
304
305 /*
306 * With a local data context, notify the threads that the streams' state
307 * have changed. The write() action on the pipe acts as an "implicit"
308 * memory barrier ordering the updates of the end point status from the
309 * read of this status which happens AFTER receiving this notify.
310 */
311 if (ctx) {
312 notify_thread_pipe(ctx->consumer_data_pipe[1]);
313 notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
314 }
315 }
316
317 /*
318 * Flag a relayd socket pair for destruction. Destroy it if the refcount
319 * reaches zero.
320 *
321 * RCU read side lock MUST be aquired before calling this function.
322 */
323 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
324 {
325 assert(relayd);
326
327 /* Set destroy flag for this object */
328 uatomic_set(&relayd->destroy_flag, 1);
329
330 /* Destroy the relayd if refcount is 0 */
331 if (uatomic_read(&relayd->refcount) == 0) {
332 destroy_relayd(relayd);
333 }
334 }
335
/*
 * Remove a stream from the hash table `ht` and from the per-session stream
 * list, release the resources it owns and schedule its memory for release
 * through call_rcu.
 *
 * When `ht` is NULL the stream is considered never to have been added: no
 * lock is taken and nothing but the deferred free is done.
 *
 * Otherwise, with stream->lock and then consumer_data.lock held, the
 * function:
 *   - runs the consumer-type specific teardown (munmap for the kernel
 *     consumer, lttng_ustconsumer_del_stream() for UST);
 *   - removes the stream from `ht` and from consumer_data.stream_list_ht;
 *   - closes the output, wait and shm file descriptors it owns;
 *   - drops the stream's reference on its relayd, tells the relayd to close
 *     the stream, and destroys the relayd if it was flagged for destruction
 *     and this was the last reference;
 *   - drops the stream's reference on its channel and, once the locks are
 *     released, deletes the channel if it has no reference and no initial
 *     stream left.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream,
		struct lttng_ht *ht)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *free_chan = NULL;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);

	DBG("Consumer del stream %d", stream->wait_fd);

	if (ht == NULL) {
		/* Means the stream was allocated but not successfully added */
		goto free_stream;
	}

	/* Lock order: the stream lock first, then the global consumer lock. */
	pthread_mutex_lock(&stream->lock);
	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (stream->mmap_base != NULL) {
			ret = munmap(stream->mmap_base, stream->mmap_len);
			if (ret != 0) {
				/* Logged only; the teardown carries on. */
				PERROR("munmap");
			}
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_stream(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		/*
		 * Only reached when assert() is compiled out: skip the whole
		 * teardown but still unlock and free the stream.
		 */
		goto end;
	}

	rcu_read_lock();
	/* Build an iterator pointing at the stream's own node to delete it. */
	iter.iter.node = &stream->node.node;
	ret = lttng_ht_del(ht, &iter);
	assert(!ret);

	/* Remove node session id from the consumer_data stream ht */
	iter.iter.node = &stream->node_session_id.node;
	ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	assert(consumer_data.stream_count > 0);
	consumer_data.stream_count--;

	if (stream->out_fd >= 0) {
		ret = close(stream->out_fd);
		if (ret) {
			PERROR("close");
		}
	}
	/* A wait_fd flagged as a copy is not closed here. */
	if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
		ret = close(stream->wait_fd);
		if (ret) {
			PERROR("close");
		}
	}
	/* Skip shm_fd when it is the same descriptor as wait_fd. */
	if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
		ret = close(stream->shm_fd);
		if (ret) {
			PERROR("close");
		}
	}

	/* Check and cleanup relayd */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		/* Drop the reference taken when the stream was added. */
		uatomic_dec(&relayd->refcount);
		assert(uatomic_read(&relayd->refcount) >= 0);

		/* Closing streams requires to lock the control socket. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		/*
		 * next_net_seq_num is post-incremented for each data header sent,
		 * so the last sequence number used is next_net_seq_num - 1.
		 */
		ret = relayd_send_close_stream(&relayd->control_sock,
				stream->relayd_stream_id,
				stream->next_net_seq_num - 1);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			DBG("Unable to close stream on the relayd. Continuing");
			/*
			 * Continue here. There is nothing we can do for the relayd.
			 * Chances are that the relayd has closed the socket so we just
			 * continue cleaning up.
			 */
		}

		/* Both conditions are met, we destroy the relayd. */
		if (uatomic_read(&relayd->refcount) == 0 &&
				uatomic_read(&relayd->destroy_flag)) {
			destroy_relayd(relayd);
		}
	}
	rcu_read_unlock();

	/*
	 * Drop the channel reference. The channel is only remembered here; it is
	 * deleted after the locks are released.
	 */
	uatomic_dec(&stream->chan->refcount);
	if (!uatomic_read(&stream->chan->refcount)
			&& !uatomic_read(&stream->chan->nb_init_streams)) {
		free_chan = stream->chan;
	}

end:
	consumer_data.need_update = 1;
	pthread_mutex_unlock(&consumer_data.lock);
	pthread_mutex_unlock(&stream->lock);

	/*
	 * consumer_del_channel() acquires consumer_data.lock itself, hence the
	 * call only once the lock has been released above.
	 */
	if (free_chan) {
		consumer_del_channel(free_chan);
	}

free_stream:
	/* The stream memory is released by consumer_free_stream() via RCU. */
	call_rcu(&stream->node.head, consumer_free_stream);
}
460
461 struct lttng_consumer_stream *consumer_allocate_stream(
462 int channel_key, int stream_key,
463 int shm_fd, int wait_fd,
464 enum lttng_consumer_stream_state state,
465 uint64_t mmap_len,
466 enum lttng_event_output output,
467 const char *path_name,
468 uid_t uid,
469 gid_t gid,
470 int net_index,
471 int metadata_flag,
472 uint64_t session_id,
473 int *alloc_ret)
474 {
475 struct lttng_consumer_stream *stream;
476
477 stream = zmalloc(sizeof(*stream));
478 if (stream == NULL) {
479 PERROR("malloc struct lttng_consumer_stream");
480 *alloc_ret = -ENOMEM;
481 goto end;
482 }
483
484 /*
485 * Get stream's channel reference. Needed when adding the stream to the
486 * global hash table.
487 */
488 stream->chan = consumer_find_channel(channel_key);
489 if (!stream->chan) {
490 *alloc_ret = -ENOENT;
491 ERR("Unable to find channel for stream %d", stream_key);
492 goto error;
493 }
494
495 stream->key = stream_key;
496 stream->shm_fd = shm_fd;
497 stream->wait_fd = wait_fd;
498 stream->out_fd = -1;
499 stream->out_fd_offset = 0;
500 stream->state = state;
501 stream->mmap_len = mmap_len;
502 stream->mmap_base = NULL;
503 stream->output = output;
504 stream->uid = uid;
505 stream->gid = gid;
506 stream->net_seq_idx = net_index;
507 stream->metadata_flag = metadata_flag;
508 stream->session_id = session_id;
509 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
510 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
511 pthread_mutex_init(&stream->lock, NULL);
512
513 /*
514 * Index differently the metadata node because the thread is using an
515 * internal hash table to match streams in the metadata_ht to the epoll set
516 * file descriptor.
517 */
518 if (metadata_flag) {
519 lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
520 } else {
521 lttng_ht_node_init_ulong(&stream->node, stream->key);
522 }
523
524 /* Init session id node with the stream session id */
525 lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
526
527 /*
528 * The cpu number is needed before using any ustctl_* actions. Ignored for
529 * the kernel so the value does not matter.
530 */
531 pthread_mutex_lock(&consumer_data.lock);
532 stream->cpu = stream->chan->cpucount++;
533 pthread_mutex_unlock(&consumer_data.lock);
534
535 DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
536 " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
537 stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
538 (unsigned long long) stream->mmap_len, stream->out_fd,
539 stream->net_seq_idx, stream->session_id);
540 return stream;
541
542 error:
543 free(stream);
544 end:
545 return NULL;
546 }
547
548 /*
549 * Add a stream to the global list protected by a mutex.
550 */
551 static int consumer_add_stream(struct lttng_consumer_stream *stream,
552 struct lttng_ht *ht)
553 {
554 int ret = 0;
555 struct consumer_relayd_sock_pair *relayd;
556
557 assert(stream);
558 assert(ht);
559
560 DBG3("Adding consumer stream %d", stream->key);
561
562 pthread_mutex_lock(&consumer_data.lock);
563 rcu_read_lock();
564
565 /* Steal stream identifier to avoid having streams with the same key */
566 consumer_steal_stream_key(stream->key, ht);
567
568 lttng_ht_add_unique_ulong(ht, &stream->node);
569
570 /*
571 * Add stream to the stream_list_ht of the consumer data. No need to steal
572 * the key since the HT does not use it and we allow to add redundant keys
573 * into this table.
574 */
575 lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
576
577 /* Check and cleanup relayd */
578 relayd = consumer_find_relayd(stream->net_seq_idx);
579 if (relayd != NULL) {
580 uatomic_inc(&relayd->refcount);
581 }
582
583 /* Update channel refcount once added without error(s). */
584 uatomic_inc(&stream->chan->refcount);
585
586 /*
587 * When nb_init_streams reaches 0, we don't need to trigger any action in
588 * terms of destroying the associated channel, because the action that
589 * causes the count to become 0 also causes a stream to be added. The
590 * channel deletion will thus be triggered by the following removal of this
591 * stream.
592 */
593 if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
594 uatomic_dec(&stream->chan->nb_init_streams);
595 }
596
597 /* Update consumer data once the node is inserted. */
598 consumer_data.stream_count++;
599 consumer_data.need_update = 1;
600
601 rcu_read_unlock();
602 pthread_mutex_unlock(&consumer_data.lock);
603
604 return ret;
605 }
606
607 /*
608 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
609 * be acquired before calling this.
610 */
611 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
612 {
613 int ret = 0;
614 struct lttng_ht_node_ulong *node;
615 struct lttng_ht_iter iter;
616
617 if (relayd == NULL) {
618 ret = -1;
619 goto end;
620 }
621
622 lttng_ht_lookup(consumer_data.relayd_ht,
623 (void *)((unsigned long) relayd->net_seq_idx), &iter);
624 node = lttng_ht_iter_get_node_ulong(&iter);
625 if (node != NULL) {
626 /* Relayd already exist. Ignore the insertion */
627 goto end;
628 }
629 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
630
631 end:
632 return ret;
633 }
634
635 /*
636 * Allocate and return a consumer relayd socket.
637 */
638 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
639 int net_seq_idx)
640 {
641 struct consumer_relayd_sock_pair *obj = NULL;
642
643 /* Negative net sequence index is a failure */
644 if (net_seq_idx < 0) {
645 goto error;
646 }
647
648 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
649 if (obj == NULL) {
650 PERROR("zmalloc relayd sock");
651 goto error;
652 }
653
654 obj->net_seq_idx = net_seq_idx;
655 obj->refcount = 0;
656 obj->destroy_flag = 0;
657 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
658 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
659
660 error:
661 return obj;
662 }
663
664 /*
665 * Find a relayd socket pair in the global consumer data.
666 *
667 * Return the object if found else NULL.
668 * RCU read-side lock must be held across this call and while using the
669 * returned object.
670 */
671 struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
672 {
673 struct lttng_ht_iter iter;
674 struct lttng_ht_node_ulong *node;
675 struct consumer_relayd_sock_pair *relayd = NULL;
676
677 /* Negative keys are lookup failures */
678 if (key < 0) {
679 goto error;
680 }
681
682 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
683 &iter);
684 node = lttng_ht_iter_get_node_ulong(&iter);
685 if (node != NULL) {
686 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
687 }
688
689 error:
690 return relayd;
691 }
692
693 /*
694 * Handle stream for relayd transmission if the stream applies for network
695 * streaming where the net sequence index is set.
696 *
697 * Return destination file descriptor or negative value on error.
698 */
699 static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
700 size_t data_size, unsigned long padding,
701 struct consumer_relayd_sock_pair *relayd)
702 {
703 int outfd = -1, ret;
704 struct lttcomm_relayd_data_hdr data_hdr;
705
706 /* Safety net */
707 assert(stream);
708 assert(relayd);
709
710 /* Reset data header */
711 memset(&data_hdr, 0, sizeof(data_hdr));
712
713 if (stream->metadata_flag) {
714 /* Caller MUST acquire the relayd control socket lock */
715 ret = relayd_send_metadata(&relayd->control_sock, data_size);
716 if (ret < 0) {
717 goto error;
718 }
719
720 /* Metadata are always sent on the control socket. */
721 outfd = relayd->control_sock.fd;
722 } else {
723 /* Set header with stream information */
724 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
725 data_hdr.data_size = htobe32(data_size);
726 data_hdr.padding_size = htobe32(padding);
727 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
728 /* Other fields are zeroed previously */
729
730 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
731 sizeof(data_hdr));
732 if (ret < 0) {
733 goto error;
734 }
735
736 /* Set to go on data socket */
737 outfd = relayd->data_sock.fd;
738 }
739
740 error:
741 return outfd;
742 }
743
744 static
745 void consumer_free_channel(struct rcu_head *head)
746 {
747 struct lttng_ht_node_ulong *node =
748 caa_container_of(head, struct lttng_ht_node_ulong, head);
749 struct lttng_consumer_channel *channel =
750 caa_container_of(node, struct lttng_consumer_channel, node);
751
752 free(channel);
753 }
754
755 /*
756 * Remove a channel from the global list protected by a mutex. This
757 * function is also responsible for freeing its data structures.
758 */
759 void consumer_del_channel(struct lttng_consumer_channel *channel)
760 {
761 int ret;
762 struct lttng_ht_iter iter;
763
764 pthread_mutex_lock(&consumer_data.lock);
765
766 switch (consumer_data.type) {
767 case LTTNG_CONSUMER_KERNEL:
768 break;
769 case LTTNG_CONSUMER32_UST:
770 case LTTNG_CONSUMER64_UST:
771 lttng_ustconsumer_del_channel(channel);
772 break;
773 default:
774 ERR("Unknown consumer_data type");
775 assert(0);
776 goto end;
777 }
778
779 rcu_read_lock();
780 iter.iter.node = &channel->node.node;
781 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
782 assert(!ret);
783 rcu_read_unlock();
784
785 if (channel->mmap_base != NULL) {
786 ret = munmap(channel->mmap_base, channel->mmap_len);
787 if (ret != 0) {
788 PERROR("munmap");
789 }
790 }
791 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
792 ret = close(channel->wait_fd);
793 if (ret) {
794 PERROR("close");
795 }
796 }
797 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
798 ret = close(channel->shm_fd);
799 if (ret) {
800 PERROR("close");
801 }
802 }
803
804 call_rcu(&channel->node.head, consumer_free_channel);
805 end:
806 pthread_mutex_unlock(&consumer_data.lock);
807 }
808
809 struct lttng_consumer_channel *consumer_allocate_channel(
810 int channel_key,
811 int shm_fd, int wait_fd,
812 uint64_t mmap_len,
813 uint64_t max_sb_size,
814 unsigned int nb_init_streams)
815 {
816 struct lttng_consumer_channel *channel;
817 int ret;
818
819 channel = zmalloc(sizeof(*channel));
820 if (channel == NULL) {
821 PERROR("malloc struct lttng_consumer_channel");
822 goto end;
823 }
824 channel->key = channel_key;
825 channel->shm_fd = shm_fd;
826 channel->wait_fd = wait_fd;
827 channel->mmap_len = mmap_len;
828 channel->max_sb_size = max_sb_size;
829 channel->refcount = 0;
830 channel->nb_init_streams = nb_init_streams;
831 lttng_ht_node_init_ulong(&channel->node, channel->key);
832
833 switch (consumer_data.type) {
834 case LTTNG_CONSUMER_KERNEL:
835 channel->mmap_base = NULL;
836 channel->mmap_len = 0;
837 break;
838 case LTTNG_CONSUMER32_UST:
839 case LTTNG_CONSUMER64_UST:
840 ret = lttng_ustconsumer_allocate_channel(channel);
841 if (ret) {
842 free(channel);
843 return NULL;
844 }
845 break;
846 default:
847 ERR("Unknown consumer_data type");
848 assert(0);
849 goto end;
850 }
851 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
852 channel->key, channel->shm_fd, channel->wait_fd,
853 (unsigned long long) channel->mmap_len,
854 (unsigned long long) channel->max_sb_size);
855 end:
856 return channel;
857 }
858
859 /*
860 * Add a channel to the global list protected by a mutex.
861 */
862 int consumer_add_channel(struct lttng_consumer_channel *channel)
863 {
864 struct lttng_ht_node_ulong *node;
865 struct lttng_ht_iter iter;
866
867 pthread_mutex_lock(&consumer_data.lock);
868 /* Steal channel identifier, for UST */
869 consumer_steal_channel_key(channel->key);
870 rcu_read_lock();
871
872 lttng_ht_lookup(consumer_data.channel_ht,
873 (void *)((unsigned long) channel->key), &iter);
874 node = lttng_ht_iter_get_node_ulong(&iter);
875 if (node != NULL) {
876 /* Channel already exist. Ignore the insertion */
877 goto end;
878 }
879
880 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
881
882 end:
883 rcu_read_unlock();
884 pthread_mutex_unlock(&consumer_data.lock);
885
886 return 0;
887 }
888
889 /*
890 * Allocate the pollfd structure and the local view of the out fds to avoid
891 * doing a lookup in the linked list and concurrency issues when writing is
892 * needed. Called with consumer_data.lock held.
893 *
894 * Returns the number of fds in the structures.
895 */
896 static int consumer_update_poll_array(
897 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
898 struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
899 {
900 int i = 0;
901 struct lttng_ht_iter iter;
902 struct lttng_consumer_stream *stream;
903
904 DBG("Updating poll fd array");
905 rcu_read_lock();
906 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
907 /*
908 * Only active streams with an active end point can be added to the
909 * poll set and local stream storage of the thread.
910 *
911 * There is a potential race here for endpoint_status to be updated
912 * just after the check. However, this is OK since the stream(s) will
913 * be deleted once the thread is notified that the end point state has
914 * changed where this function will be called back again.
915 */
916 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
917 stream->endpoint_status) {
918 continue;
919 }
920 DBG("Active FD %d", stream->wait_fd);
921 (*pollfd)[i].fd = stream->wait_fd;
922 (*pollfd)[i].events = POLLIN | POLLPRI;
923 local_stream[i] = stream;
924 i++;
925 }
926 rcu_read_unlock();
927
928 /*
929 * Insert the consumer_data_pipe at the end of the array and don't
930 * increment i so nb_fd is the number of real FD.
931 */
932 (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
933 (*pollfd)[i].events = POLLIN | POLLPRI;
934 return i;
935 }
936
/*
 * Block until one of the two descriptors of consumer_sockpoll is ready:
 * entry 0 is the should_quit pipe and entry 1 the command socket.
 *
 * A poll() interrupted by a signal is restarted.
 *
 * Returns -1 when poll() fails or when the should_quit pipe is readable
 * (the caller should exit), 0 otherwise, meaning the command socket is the
 * one that reported an event.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int ready;

	do {
		ready = poll(consumer_sockpoll, 2, -1);
	} while (ready == -1 && errno == EINTR);

	if (ready == -1) {
		PERROR("Poll error");
		return -1;
	}

	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return -1;
	}

	return 0;
}
966
/*
 * Record in the context the socket on which lttng_consumer_send_error()
 * reports return codes to the session daemon. The value is stored as given,
 * without any validation.
 */
void lttng_consumer_set_error_sock(
		struct lttng_consumer_local_data *ctx, int sock)
{
	ctx->consumer_error_socket = sock;
}
975
/*
 * Record the command socket path in the context. Only the pointer is
 * stored, the string is not copied, so it must stay valid for as long as
 * the context refers to it.
 */
void lttng_consumer_set_command_sock_path(
		struct lttng_consumer_local_data *ctx, char *sock)
{
	ctx->consumer_command_sock_path = sock;
}
984
985 /*
986 * Send return code to the session daemon.
987 * If the socket is not defined, we return 0, it is not a fatal error
988 */
989 int lttng_consumer_send_error(
990 struct lttng_consumer_local_data *ctx, int cmd)
991 {
992 if (ctx->consumer_error_socket > 0) {
993 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
994 sizeof(enum lttcomm_sessiond_command));
995 }
996
997 return 0;
998 }
999
1000 /*
1001 * Close all the tracefiles and stream fds, should be called when all instances
1002 * are destroyed.
1003 */
1004 void lttng_consumer_cleanup(void)
1005 {
1006 struct lttng_ht_iter iter;
1007 struct lttng_ht_node_ulong *node;
1008
1009 rcu_read_lock();
1010
1011 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
1012 node) {
1013 struct lttng_consumer_channel *channel =
1014 caa_container_of(node, struct lttng_consumer_channel, node);
1015 consumer_del_channel(channel);
1016 }
1017
1018 rcu_read_unlock();
1019
1020 lttng_ht_destroy(consumer_data.channel_ht);
1021 }
1022
1023 /*
1024 * Called from signal handler.
1025 */
1026 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
1027 {
1028 int ret;
1029 consumer_quit = 1;
1030 do {
1031 ret = write(ctx->consumer_should_quit[1], "4", 1);
1032 } while (ret < 0 && errno == EINTR);
1033 if (ret < 0) {
1034 PERROR("write consumer quit");
1035 }
1036
1037 DBG("Consumer flag that it should quit");
1038 }
1039
1040 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
1041 off_t orig_offset)
1042 {
1043 int outfd = stream->out_fd;
1044
1045 /*
1046 * This does a blocking write-and-wait on any page that belongs to the
1047 * subbuffer prior to the one we just wrote.
1048 * Don't care about error values, as these are just hints and ways to
1049 * limit the amount of page cache used.
1050 */
1051 if (orig_offset < stream->chan->max_sb_size) {
1052 return;
1053 }
1054 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
1055 stream->chan->max_sb_size,
1056 SYNC_FILE_RANGE_WAIT_BEFORE
1057 | SYNC_FILE_RANGE_WRITE
1058 | SYNC_FILE_RANGE_WAIT_AFTER);
1059 /*
1060 * Give hints to the kernel about how we access the file:
1061 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1062 * we write it.
1063 *
1064 * We need to call fadvise again after the file grows because the
1065 * kernel does not seem to apply fadvise to non-existing parts of the
1066 * file.
1067 *
1068 * Call fadvise _after_ having waited for the page writeback to
1069 * complete because the dirty page writeback semantic is not well
1070 * defined. So it can be expected to lead to lower throughput in
1071 * streaming.
1072 */
1073 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
1074 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
1075 }
1076
1077 /*
1078 * Initialise the necessary environnement :
1079 * - create a new context
1080 * - create the poll_pipe
1081 * - create the should_quit pipe (for signal handler)
1082 * - create the thread pipe (for splice)
1083 *
1084 * Takes a function pointer as argument, this function is called when data is
1085 * available on a buffer. This function is responsible to do the
1086 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1087 * buffer configuration and then kernctl_put_next_subbuf at the end.
1088 *
1089 * Returns a pointer to the new context or NULL on error.
1090 */
1091 struct lttng_consumer_local_data *lttng_consumer_create(
1092 enum lttng_consumer_type type,
1093 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
1094 struct lttng_consumer_local_data *ctx),
1095 int (*recv_channel)(struct lttng_consumer_channel *channel),
1096 int (*recv_stream)(struct lttng_consumer_stream *stream),
1097 int (*update_stream)(int stream_key, uint32_t state))
1098 {
1099 int ret, i;
1100 struct lttng_consumer_local_data *ctx;
1101
1102 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
1103 consumer_data.type == type);
1104 consumer_data.type = type;
1105
1106 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
1107 if (ctx == NULL) {
1108 PERROR("allocating context");
1109 goto error;
1110 }
1111
1112 ctx->consumer_error_socket = -1;
1113 /* assign the callbacks */
1114 ctx->on_buffer_ready = buffer_ready;
1115 ctx->on_recv_channel = recv_channel;
1116 ctx->on_recv_stream = recv_stream;
1117 ctx->on_update_stream = update_stream;
1118
1119 ret = pipe(ctx->consumer_data_pipe);
1120 if (ret < 0) {
1121 PERROR("Error creating poll pipe");
1122 goto error_poll_pipe;
1123 }
1124
1125 /* set read end of the pipe to non-blocking */
1126 ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
1127 if (ret < 0) {
1128 PERROR("fcntl O_NONBLOCK");
1129 goto error_poll_fcntl;
1130 }
1131
1132 /* set write end of the pipe to non-blocking */
1133 ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
1134 if (ret < 0) {
1135 PERROR("fcntl O_NONBLOCK");
1136 goto error_poll_fcntl;
1137 }
1138
1139 ret = pipe(ctx->consumer_should_quit);
1140 if (ret < 0) {
1141 PERROR("Error creating recv pipe");
1142 goto error_quit_pipe;
1143 }
1144
1145 ret = pipe(ctx->consumer_thread_pipe);
1146 if (ret < 0) {
1147 PERROR("Error creating thread pipe");
1148 goto error_thread_pipe;
1149 }
1150
1151 ret = utils_create_pipe(ctx->consumer_metadata_pipe);
1152 if (ret < 0) {
1153 goto error_metadata_pipe;
1154 }
1155
1156 ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
1157 if (ret < 0) {
1158 goto error_splice_pipe;
1159 }
1160
1161 return ctx;
1162
1163 error_splice_pipe:
1164 utils_close_pipe(ctx->consumer_metadata_pipe);
1165 error_metadata_pipe:
1166 utils_close_pipe(ctx->consumer_thread_pipe);
1167 error_thread_pipe:
1168 for (i = 0; i < 2; i++) {
1169 int err;
1170
1171 err = close(ctx->consumer_should_quit[i]);
1172 if (err) {
1173 PERROR("close");
1174 }
1175 }
1176 error_poll_fcntl:
1177 error_quit_pipe:
1178 for (i = 0; i < 2; i++) {
1179 int err;
1180
1181 err = close(ctx->consumer_data_pipe[i]);
1182 if (err) {
1183 PERROR("close");
1184 }
1185 }
1186 error_poll_pipe:
1187 free(ctx);
1188 error:
1189 return NULL;
1190 }
1191
1192 /*
1193 * Close all fds associated with the instance and free the context.
1194 */
1195 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1196 {
1197 int ret;
1198
1199 DBG("Consumer destroying it. Closing everything.");
1200
1201 ret = close(ctx->consumer_error_socket);
1202 if (ret) {
1203 PERROR("close");
1204 }
1205 ret = close(ctx->consumer_thread_pipe[0]);
1206 if (ret) {
1207 PERROR("close");
1208 }
1209 ret = close(ctx->consumer_thread_pipe[1]);
1210 if (ret) {
1211 PERROR("close");
1212 }
1213 ret = close(ctx->consumer_data_pipe[0]);
1214 if (ret) {
1215 PERROR("close");
1216 }
1217 ret = close(ctx->consumer_data_pipe[1]);
1218 if (ret) {
1219 PERROR("close");
1220 }
1221 ret = close(ctx->consumer_should_quit[0]);
1222 if (ret) {
1223 PERROR("close");
1224 }
1225 ret = close(ctx->consumer_should_quit[1]);
1226 if (ret) {
1227 PERROR("close");
1228 }
1229 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1230
1231 unlink(ctx->consumer_command_sock_path);
1232 free(ctx);
1233 }
1234
1235 /*
1236 * Write the metadata stream id on the specified file descriptor.
1237 */
1238 static int write_relayd_metadata_id(int fd,
1239 struct lttng_consumer_stream *stream,
1240 struct consumer_relayd_sock_pair *relayd,
1241 unsigned long padding)
1242 {
1243 int ret;
1244 struct lttcomm_relayd_metadata_payload hdr;
1245
1246 hdr.stream_id = htobe64(stream->relayd_stream_id);
1247 hdr.padding_size = htobe32(padding);
1248 do {
1249 ret = write(fd, (void *) &hdr, sizeof(hdr));
1250 } while (ret < 0 && errno == EINTR);
1251 if (ret < 0) {
1252 PERROR("write metadata stream id");
1253 goto end;
1254 }
1255 DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
1256 stream->relayd_stream_id, padding);
1257
1258 end:
1259 return ret;
1260 }
1261
1262 /*
1263 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1264 * core function for writing trace buffers to either the local filesystem or
1265 * the network.
1266 *
1267 * Careful review MUST be put if any changes occur!
1268 *
1269 * Returns the number of bytes written
1270 */
1271 ssize_t lttng_consumer_on_read_subbuffer_mmap(
1272 struct lttng_consumer_local_data *ctx,
1273 struct lttng_consumer_stream *stream, unsigned long len,
1274 unsigned long padding)
1275 {
1276 unsigned long mmap_offset;
1277 ssize_t ret = 0, written = 0;
1278 off_t orig_offset = stream->out_fd_offset;
1279 /* Default is on the disk */
1280 int outfd = stream->out_fd;
1281 struct consumer_relayd_sock_pair *relayd = NULL;
1282 unsigned int relayd_hang_up = 0;
1283
1284 /* RCU lock for the relayd pointer */
1285 rcu_read_lock();
1286
1287 pthread_mutex_lock(&stream->lock);
1288
1289 /* Flag that the current stream if set for network streaming. */
1290 if (stream->net_seq_idx != -1) {
1291 relayd = consumer_find_relayd(stream->net_seq_idx);
1292 if (relayd == NULL) {
1293 goto end;
1294 }
1295 }
1296
1297 /* get the offset inside the fd to mmap */
1298 switch (consumer_data.type) {
1299 case LTTNG_CONSUMER_KERNEL:
1300 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
1301 break;
1302 case LTTNG_CONSUMER32_UST:
1303 case LTTNG_CONSUMER64_UST:
1304 ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
1305 stream->buf, &mmap_offset);
1306 break;
1307 default:
1308 ERR("Unknown consumer_data type");
1309 assert(0);
1310 }
1311 if (ret != 0) {
1312 errno = -ret;
1313 PERROR("tracer ctl get_mmap_read_offset");
1314 written = ret;
1315 goto end;
1316 }
1317
1318 /* Handle stream on the relayd if the output is on the network */
1319 if (relayd) {
1320 unsigned long netlen = len;
1321
1322 /*
1323 * Lock the control socket for the complete duration of the function
1324 * since from this point on we will use the socket.
1325 */
1326 if (stream->metadata_flag) {
1327 /* Metadata requires the control socket. */
1328 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1329 netlen += sizeof(struct lttcomm_relayd_metadata_payload);
1330 }
1331
1332 ret = write_relayd_stream_header(stream, netlen, padding, relayd);
1333 if (ret >= 0) {
1334 /* Use the returned socket. */
1335 outfd = ret;
1336
1337 /* Write metadata stream id before payload */
1338 if (stream->metadata_flag) {
1339 ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
1340 if (ret < 0) {
1341 written = ret;
1342 /* Socket operation failed. We consider the relayd dead */
1343 if (ret == -EPIPE || ret == -EINVAL) {
1344 relayd_hang_up = 1;
1345 goto write_error;
1346 }
1347 goto end;
1348 }
1349 }
1350 } else {
1351 /* Socket operation failed. We consider the relayd dead */
1352 if (ret == -EPIPE || ret == -EINVAL) {
1353 relayd_hang_up = 1;
1354 goto write_error;
1355 }
1356 /* Else, use the default set before which is the filesystem. */
1357 }
1358 } else {
1359 /* No streaming, we have to set the len with the full padding */
1360 len += padding;
1361 }
1362
1363 while (len > 0) {
1364 do {
1365 ret = write(outfd, stream->mmap_base + mmap_offset, len);
1366 } while (ret < 0 && errno == EINTR);
1367 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
1368 if (ret < 0) {
1369 PERROR("Error in file write");
1370 if (written == 0) {
1371 written = ret;
1372 }
1373 /* Socket operation failed. We consider the relayd dead */
1374 if (errno == EPIPE || errno == EINVAL) {
1375 relayd_hang_up = 1;
1376 goto write_error;
1377 }
1378 goto end;
1379 } else if (ret > len) {
1380 PERROR("Error in file write (ret %zd > len %lu)", ret, len);
1381 written += ret;
1382 goto end;
1383 } else {
1384 len -= ret;
1385 mmap_offset += ret;
1386 }
1387
1388 /* This call is useless on a socket so better save a syscall. */
1389 if (!relayd) {
1390 /* This won't block, but will start writeout asynchronously */
1391 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
1392 SYNC_FILE_RANGE_WRITE);
1393 stream->out_fd_offset += ret;
1394 }
1395 written += ret;
1396 }
1397 lttng_consumer_sync_trace_file(stream, orig_offset);
1398
1399 write_error:
1400 /*
1401 * This is a special case that the relayd has closed its socket. Let's
1402 * cleanup the relayd object and all associated streams.
1403 */
1404 if (relayd && relayd_hang_up) {
1405 cleanup_relayd(relayd, ctx);
1406 }
1407
1408 end:
1409 /* Unlock only if ctrl socket used */
1410 if (relayd && stream->metadata_flag) {
1411 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1412 }
1413 pthread_mutex_unlock(&stream->lock);
1414
1415 rcu_read_unlock();
1416 return written;
1417 }
1418
1419 /*
1420 * Splice the data from the ring buffer to the tracefile.
1421 *
1422 * Returns the number of bytes spliced.
1423 */
1424 ssize_t lttng_consumer_on_read_subbuffer_splice(
1425 struct lttng_consumer_local_data *ctx,
1426 struct lttng_consumer_stream *stream, unsigned long len,
1427 unsigned long padding)
1428 {
1429 ssize_t ret = 0, written = 0, ret_splice = 0;
1430 loff_t offset = 0;
1431 off_t orig_offset = stream->out_fd_offset;
1432 int fd = stream->wait_fd;
1433 /* Default is on the disk */
1434 int outfd = stream->out_fd;
1435 struct consumer_relayd_sock_pair *relayd = NULL;
1436 int *splice_pipe;
1437 unsigned int relayd_hang_up = 0;
1438
1439 switch (consumer_data.type) {
1440 case LTTNG_CONSUMER_KERNEL:
1441 break;
1442 case LTTNG_CONSUMER32_UST:
1443 case LTTNG_CONSUMER64_UST:
1444 /* Not supported for user space tracing */
1445 return -ENOSYS;
1446 default:
1447 ERR("Unknown consumer_data type");
1448 assert(0);
1449 }
1450
1451 /* RCU lock for the relayd pointer */
1452 rcu_read_lock();
1453
1454 pthread_mutex_lock(&stream->lock);
1455
1456 /* Flag that the current stream if set for network streaming. */
1457 if (stream->net_seq_idx != -1) {
1458 relayd = consumer_find_relayd(stream->net_seq_idx);
1459 if (relayd == NULL) {
1460 goto end;
1461 }
1462 }
1463
1464 /*
1465 * Choose right pipe for splice. Metadata and trace data are handled by
1466 * different threads hence the use of two pipes in order not to race or
1467 * corrupt the written data.
1468 */
1469 if (stream->metadata_flag) {
1470 splice_pipe = ctx->consumer_splice_metadata_pipe;
1471 } else {
1472 splice_pipe = ctx->consumer_thread_pipe;
1473 }
1474
1475 /* Write metadata stream id before payload */
1476 if (relayd) {
1477 int total_len = len;
1478
1479 if (stream->metadata_flag) {
1480 /*
1481 * Lock the control socket for the complete duration of the function
1482 * since from this point on we will use the socket.
1483 */
1484 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1485
1486 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
1487 padding);
1488 if (ret < 0) {
1489 written = ret;
1490 /* Socket operation failed. We consider the relayd dead */
1491 if (ret == -EBADF) {
1492 WARN("Remote relayd disconnected. Stopping");
1493 relayd_hang_up = 1;
1494 goto write_error;
1495 }
1496 goto end;
1497 }
1498
1499 total_len += sizeof(struct lttcomm_relayd_metadata_payload);
1500 }
1501
1502 ret = write_relayd_stream_header(stream, total_len, padding, relayd);
1503 if (ret >= 0) {
1504 /* Use the returned socket. */
1505 outfd = ret;
1506 } else {
1507 /* Socket operation failed. We consider the relayd dead */
1508 if (ret == -EBADF) {
1509 WARN("Remote relayd disconnected. Stopping");
1510 relayd_hang_up = 1;
1511 goto write_error;
1512 }
1513 goto end;
1514 }
1515 } else {
1516 /* No streaming, we have to set the len with the full padding */
1517 len += padding;
1518 }
1519
1520 while (len > 0) {
1521 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1522 (unsigned long)offset, len, fd, splice_pipe[1]);
1523 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
1524 SPLICE_F_MOVE | SPLICE_F_MORE);
1525 DBG("splice chan to pipe, ret %zd", ret_splice);
1526 if (ret_splice < 0) {
1527 PERROR("Error in relay splice");
1528 if (written == 0) {
1529 written = ret_splice;
1530 }
1531 ret = errno;
1532 goto splice_error;
1533 }
1534
1535 /* Handle stream on the relayd if the output is on the network */
1536 if (relayd) {
1537 if (stream->metadata_flag) {
1538 size_t metadata_payload_size =
1539 sizeof(struct lttcomm_relayd_metadata_payload);
1540
1541 /* Update counter to fit the spliced data */
1542 ret_splice += metadata_payload_size;
1543 len += metadata_payload_size;
1544 /*
1545 * We do this so the return value can match the len passed as
1546 * argument to this function.
1547 */
1548 written -= metadata_payload_size;
1549 }
1550 }
1551
1552 /* Splice data out */
1553 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
1554 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1555 DBG("Consumer splice pipe to file, ret %zd", ret_splice);
1556 if (ret_splice < 0) {
1557 PERROR("Error in file splice");
1558 if (written == 0) {
1559 written = ret_splice;
1560 }
1561 /* Socket operation failed. We consider the relayd dead */
1562 if (errno == EBADF) {
1563 WARN("Remote relayd disconnected. Stopping");
1564 relayd_hang_up = 1;
1565 goto write_error;
1566 }
1567 ret = errno;
1568 goto splice_error;
1569 } else if (ret_splice > len) {
1570 errno = EINVAL;
1571 PERROR("Wrote more data than requested %zd (len: %lu)",
1572 ret_splice, len);
1573 written += ret_splice;
1574 ret = errno;
1575 goto splice_error;
1576 }
1577 len -= ret_splice;
1578
1579 /* This call is useless on a socket so better save a syscall. */
1580 if (!relayd) {
1581 /* This won't block, but will start writeout asynchronously */
1582 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1583 SYNC_FILE_RANGE_WRITE);
1584 stream->out_fd_offset += ret_splice;
1585 }
1586 written += ret_splice;
1587 }
1588 lttng_consumer_sync_trace_file(stream, orig_offset);
1589
1590 ret = ret_splice;
1591
1592 goto end;
1593
1594 write_error:
1595 /*
1596 * This is a special case that the relayd has closed its socket. Let's
1597 * cleanup the relayd object and all associated streams.
1598 */
1599 if (relayd && relayd_hang_up) {
1600 cleanup_relayd(relayd, ctx);
1601 /* Skip splice error so the consumer does not fail */
1602 goto end;
1603 }
1604
1605 splice_error:
1606 /* send the appropriate error description to sessiond */
1607 switch (ret) {
1608 case EINVAL:
1609 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
1610 break;
1611 case ENOMEM:
1612 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
1613 break;
1614 case ESPIPE:
1615 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
1616 break;
1617 }
1618
1619 end:
1620 if (relayd && stream->metadata_flag) {
1621 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1622 }
1623 pthread_mutex_unlock(&stream->lock);
1624
1625 rcu_read_unlock();
1626 return written;
1627 }
1628
1629 /*
1630 * Take a snapshot for a specific fd
1631 *
1632 * Returns 0 on success, < 0 on error
1633 */
1634 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1635 struct lttng_consumer_stream *stream)
1636 {
1637 switch (consumer_data.type) {
1638 case LTTNG_CONSUMER_KERNEL:
1639 return lttng_kconsumer_take_snapshot(ctx, stream);
1640 case LTTNG_CONSUMER32_UST:
1641 case LTTNG_CONSUMER64_UST:
1642 return lttng_ustconsumer_take_snapshot(ctx, stream);
1643 default:
1644 ERR("Unknown consumer_data type");
1645 assert(0);
1646 return -ENOSYS;
1647 }
1648
1649 }
1650
1651 /*
1652 * Get the produced position
1653 *
1654 * Returns 0 on success, < 0 on error
1655 */
1656 int lttng_consumer_get_produced_snapshot(
1657 struct lttng_consumer_local_data *ctx,
1658 struct lttng_consumer_stream *stream,
1659 unsigned long *pos)
1660 {
1661 switch (consumer_data.type) {
1662 case LTTNG_CONSUMER_KERNEL:
1663 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
1664 case LTTNG_CONSUMER32_UST:
1665 case LTTNG_CONSUMER64_UST:
1666 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1667 default:
1668 ERR("Unknown consumer_data type");
1669 assert(0);
1670 return -ENOSYS;
1671 }
1672 }
1673
1674 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1675 int sock, struct pollfd *consumer_sockpoll)
1676 {
1677 switch (consumer_data.type) {
1678 case LTTNG_CONSUMER_KERNEL:
1679 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1680 case LTTNG_CONSUMER32_UST:
1681 case LTTNG_CONSUMER64_UST:
1682 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1683 default:
1684 ERR("Unknown consumer_data type");
1685 assert(0);
1686 return -ENOSYS;
1687 }
1688 }
1689
1690 /*
1691 * Iterate over all streams of the hashtable and free them properly.
1692 *
1693 * WARNING: *MUST* be used with data stream only.
1694 */
1695 static void destroy_data_stream_ht(struct lttng_ht *ht)
1696 {
1697 int ret;
1698 struct lttng_ht_iter iter;
1699 struct lttng_consumer_stream *stream;
1700
1701 if (ht == NULL) {
1702 return;
1703 }
1704
1705 rcu_read_lock();
1706 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1707 ret = lttng_ht_del(ht, &iter);
1708 assert(!ret);
1709
1710 call_rcu(&stream->node.head, consumer_free_stream);
1711 }
1712 rcu_read_unlock();
1713
1714 lttng_ht_destroy(ht);
1715 }
1716
1717 /*
1718 * Iterate over all streams of the hashtable and free them properly.
1719 *
1720 * XXX: Should not be only for metadata stream or else use an other name.
1721 */
1722 static void destroy_stream_ht(struct lttng_ht *ht)
1723 {
1724 int ret;
1725 struct lttng_ht_iter iter;
1726 struct lttng_consumer_stream *stream;
1727
1728 if (ht == NULL) {
1729 return;
1730 }
1731
1732 rcu_read_lock();
1733 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1734 ret = lttng_ht_del(ht, &iter);
1735 assert(!ret);
1736
1737 call_rcu(&stream->node.head, consumer_free_stream);
1738 }
1739 rcu_read_unlock();
1740
1741 lttng_ht_destroy(ht);
1742 }
1743
1744 /*
1745 * Clean up a metadata stream and free its memory.
1746 */
1747 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
1748 struct lttng_ht *ht)
1749 {
1750 int ret;
1751 struct lttng_ht_iter iter;
1752 struct lttng_consumer_channel *free_chan = NULL;
1753 struct consumer_relayd_sock_pair *relayd;
1754
1755 assert(stream);
1756 /*
1757 * This call should NEVER receive regular stream. It must always be
1758 * metadata stream and this is crucial for data structure synchronization.
1759 */
1760 assert(stream->metadata_flag);
1761
1762 DBG3("Consumer delete metadata stream %d", stream->wait_fd);
1763
1764 if (ht == NULL) {
1765 /* Means the stream was allocated but not successfully added */
1766 goto free_stream;
1767 }
1768
1769 pthread_mutex_lock(&stream->lock);
1770
1771 pthread_mutex_lock(&consumer_data.lock);
1772 switch (consumer_data.type) {
1773 case LTTNG_CONSUMER_KERNEL:
1774 if (stream->mmap_base != NULL) {
1775 ret = munmap(stream->mmap_base, stream->mmap_len);
1776 if (ret != 0) {
1777 PERROR("munmap metadata stream");
1778 }
1779 }
1780 break;
1781 case LTTNG_CONSUMER32_UST:
1782 case LTTNG_CONSUMER64_UST:
1783 lttng_ustconsumer_del_stream(stream);
1784 break;
1785 default:
1786 ERR("Unknown consumer_data type");
1787 assert(0);
1788 goto end;
1789 }
1790
1791 rcu_read_lock();
1792 iter.iter.node = &stream->node.node;
1793 ret = lttng_ht_del(ht, &iter);
1794 assert(!ret);
1795
1796 /* Remove node session id from the consumer_data stream ht */
1797 iter.iter.node = &stream->node_session_id.node;
1798 ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
1799 assert(!ret);
1800 rcu_read_unlock();
1801
1802 if (stream->out_fd >= 0) {
1803 ret = close(stream->out_fd);
1804 if (ret) {
1805 PERROR("close");
1806 }
1807 }
1808
1809 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
1810 ret = close(stream->wait_fd);
1811 if (ret) {
1812 PERROR("close");
1813 }
1814 }
1815
1816 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
1817 ret = close(stream->shm_fd);
1818 if (ret) {
1819 PERROR("close");
1820 }
1821 }
1822
1823 /* Check and cleanup relayd */
1824 rcu_read_lock();
1825 relayd = consumer_find_relayd(stream->net_seq_idx);
1826 if (relayd != NULL) {
1827 uatomic_dec(&relayd->refcount);
1828 assert(uatomic_read(&relayd->refcount) >= 0);
1829
1830 /* Closing streams requires to lock the control socket. */
1831 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1832 ret = relayd_send_close_stream(&relayd->control_sock,
1833 stream->relayd_stream_id, stream->next_net_seq_num - 1);
1834 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1835 if (ret < 0) {
1836 DBG("Unable to close stream on the relayd. Continuing");
1837 /*
1838 * Continue here. There is nothing we can do for the relayd.
1839 * Chances are that the relayd has closed the socket so we just
1840 * continue cleaning up.
1841 */
1842 }
1843
1844 /* Both conditions are met, we destroy the relayd. */
1845 if (uatomic_read(&relayd->refcount) == 0 &&
1846 uatomic_read(&relayd->destroy_flag)) {
1847 destroy_relayd(relayd);
1848 }
1849 }
1850 rcu_read_unlock();
1851
1852 /* Atomically decrement channel refcount since other threads can use it. */
1853 uatomic_dec(&stream->chan->refcount);
1854 if (!uatomic_read(&stream->chan->refcount)
1855 && !uatomic_read(&stream->chan->nb_init_streams)) {
1856 /* Go for channel deletion! */
1857 free_chan = stream->chan;
1858 }
1859
1860 end:
1861 pthread_mutex_unlock(&consumer_data.lock);
1862 pthread_mutex_unlock(&stream->lock);
1863
1864 if (free_chan) {
1865 consumer_del_channel(free_chan);
1866 }
1867
1868 free_stream:
1869 call_rcu(&stream->node.head, consumer_free_stream);
1870 }
1871
1872 /*
1873 * Action done with the metadata stream when adding it to the consumer internal
1874 * data structures to handle it.
1875 */
1876 static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
1877 struct lttng_ht *ht)
1878 {
1879 int ret = 0;
1880 struct consumer_relayd_sock_pair *relayd;
1881
1882 assert(stream);
1883 assert(ht);
1884
1885 DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
1886
1887 pthread_mutex_lock(&consumer_data.lock);
1888
1889 /*
1890 * From here, refcounts are updated so be _careful_ when returning an error
1891 * after this point.
1892 */
1893
1894 rcu_read_lock();
1895 /* Find relayd and, if one is found, increment refcount. */
1896 relayd = consumer_find_relayd(stream->net_seq_idx);
1897 if (relayd != NULL) {
1898 uatomic_inc(&relayd->refcount);
1899 }
1900
1901 /* Update channel refcount once added without error(s). */
1902 uatomic_inc(&stream->chan->refcount);
1903
1904 /*
1905 * When nb_init_streams reaches 0, we don't need to trigger any action in
1906 * terms of destroying the associated channel, because the action that
1907 * causes the count to become 0 also causes a stream to be added. The
1908 * channel deletion will thus be triggered by the following removal of this
1909 * stream.
1910 */
1911 if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
1912 uatomic_dec(&stream->chan->nb_init_streams);
1913 }
1914
1915 /* Steal stream identifier to avoid having streams with the same key */
1916 consumer_steal_stream_key(stream->key, ht);
1917
1918 lttng_ht_add_unique_ulong(ht, &stream->node);
1919
1920 /*
1921 * Add stream to the stream_list_ht of the consumer data. No need to steal
1922 * the key since the HT does not use it and we allow to add redundant keys
1923 * into this table.
1924 */
1925 lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
1926
1927 rcu_read_unlock();
1928
1929 pthread_mutex_unlock(&consumer_data.lock);
1930 return ret;
1931 }
1932
1933 /*
1934 * Delete data stream that are flagged for deletion (endpoint_status).
1935 */
1936 static void validate_endpoint_status_data_stream(void)
1937 {
1938 struct lttng_ht_iter iter;
1939 struct lttng_consumer_stream *stream;
1940
1941 DBG("Consumer delete flagged data stream");
1942
1943 rcu_read_lock();
1944 cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
1945 /* Validate delete flag of the stream */
1946 if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
1947 continue;
1948 }
1949 /* Delete it right now */
1950 consumer_del_stream(stream, data_ht);
1951 }
1952 rcu_read_unlock();
1953 }
1954
1955 /*
1956 * Delete metadata stream that are flagged for deletion (endpoint_status).
1957 */
1958 static void validate_endpoint_status_metadata_stream(
1959 struct lttng_poll_event *pollset)
1960 {
1961 struct lttng_ht_iter iter;
1962 struct lttng_consumer_stream *stream;
1963
1964 DBG("Consumer delete flagged metadata stream");
1965
1966 assert(pollset);
1967
1968 rcu_read_lock();
1969 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
1970 /* Validate delete flag of the stream */
1971 if (!stream->endpoint_status) {
1972 continue;
1973 }
1974 /*
1975 * Remove from pollset so the metadata thread can continue without
1976 * blocking on a deleted stream.
1977 */
1978 lttng_poll_del(pollset, stream->wait_fd);
1979
1980 /* Delete it right now */
1981 consumer_del_metadata_stream(stream, metadata_ht);
1982 }
1983 rcu_read_unlock();
1984 }
1985
1986 /*
1987 * Thread polls on metadata file descriptor and write them on disk or on the
1988 * network.
1989 */
1990 void *consumer_thread_metadata_poll(void *data)
1991 {
1992 int ret, i, pollfd;
1993 uint32_t revents, nb_fd;
1994 struct lttng_consumer_stream *stream = NULL;
1995 struct lttng_ht_iter iter;
1996 struct lttng_ht_node_ulong *node;
1997 struct lttng_poll_event events;
1998 struct lttng_consumer_local_data *ctx = data;
1999 ssize_t len;
2000
2001 rcu_register_thread();
2002
2003 DBG("Thread metadata poll started");
2004
2005 /* Size is set to 1 for the consumer_metadata pipe */
2006 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
2007 if (ret < 0) {
2008 ERR("Poll set creation failed");
2009 goto end;
2010 }
2011
2012 ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
2013 if (ret < 0) {
2014 goto end;
2015 }
2016
2017 /* Main loop */
2018 DBG("Metadata main loop started");
2019
2020 while (1) {
2021 lttng_poll_reset(&events);
2022
2023 nb_fd = LTTNG_POLL_GETNB(&events);
2024
2025 /* Only the metadata pipe is set */
2026 if (nb_fd == 0 && consumer_quit == 1) {
2027 goto end;
2028 }
2029
2030 restart:
2031 DBG("Metadata poll wait with %d fd(s)", nb_fd);
2032 ret = lttng_poll_wait(&events, -1);
2033 DBG("Metadata event catched in thread");
2034 if (ret < 0) {
2035 if (errno == EINTR) {
2036 ERR("Poll EINTR catched");
2037 goto restart;
2038 }
2039 goto error;
2040 }
2041
2042 /* From here, the event is a metadata wait fd */
2043 for (i = 0; i < nb_fd; i++) {
2044 revents = LTTNG_POLL_GETEV(&events, i);
2045 pollfd = LTTNG_POLL_GETFD(&events, i);
2046
2047 /* Just don't waste time if no returned events for the fd */
2048 if (!revents) {
2049 continue;
2050 }
2051
2052 if (pollfd == ctx->consumer_metadata_pipe[0]) {
2053 if (revents & (LPOLLERR | LPOLLHUP )) {
2054 DBG("Metadata thread pipe hung up");
2055 /*
2056 * Remove the pipe from the poll set and continue the loop
2057 * since their might be data to consume.
2058 */
2059 lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
2060 close(ctx->consumer_metadata_pipe[0]);
2061 continue;
2062 } else if (revents & LPOLLIN) {
2063 do {
2064 /* Get the stream pointer received */
2065 ret = read(pollfd, &stream, sizeof(stream));
2066 } while (ret < 0 && errno == EINTR);
2067 if (ret < 0 ||
2068 ret < sizeof(struct lttng_consumer_stream *)) {
2069 PERROR("read metadata stream");
2070 /*
2071 * Let's continue here and hope we can still work
2072 * without stopping the consumer. XXX: Should we?
2073 */
2074 continue;
2075 }
2076
2077 /* A NULL stream means that the state has changed. */
2078 if (stream == NULL) {
2079 /* Check for deleted streams. */
2080 validate_endpoint_status_metadata_stream(&events);
2081 continue;
2082 }
2083
2084 DBG("Adding metadata stream %d to poll set",
2085 stream->wait_fd);
2086
2087 ret = consumer_add_metadata_stream(stream, metadata_ht);
2088 if (ret) {
2089 ERR("Unable to add metadata stream");
2090 /* Stream was not setup properly. Continuing. */
2091 consumer_del_metadata_stream(stream, NULL);
2092 continue;
2093 }
2094
2095 /* Add metadata stream to the global poll events list */
2096 lttng_poll_add(&events, stream->wait_fd,
2097 LPOLLIN | LPOLLPRI);
2098 }
2099
2100 /* Handle other stream */
2101 continue;
2102 }
2103
2104 rcu_read_lock();
2105 lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
2106 &iter);
2107 node = lttng_ht_iter_get_node_ulong(&iter);
2108 assert(node);
2109
2110 stream = caa_container_of(node, struct lttng_consumer_stream,
2111 node);
2112
2113 /* Check for error event */
2114 if (revents & (LPOLLERR | LPOLLHUP)) {
2115 DBG("Metadata fd %d is hup|err.", pollfd);
2116 if (!stream->hangup_flush_done
2117 && (consumer_data.type == LTTNG_CONSUMER32_UST
2118 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2119 DBG("Attempting to flush and consume the UST buffers");
2120 lttng_ustconsumer_on_stream_hangup(stream);
2121
2122 /* We just flushed the stream now read it. */
2123 do {
2124 len = ctx->on_buffer_ready(stream, ctx);
2125 /*
2126 * We don't check the return value here since if we get
2127 * a negative len, it means an error occured thus we
2128 * simply remove it from the poll set and free the
2129 * stream.
2130 */
2131 } while (len > 0);
2132 }
2133
2134 lttng_poll_del(&events, stream->wait_fd);
2135 /*
2136 * This call update the channel states, closes file descriptors
2137 * and securely free the stream.
2138 */
2139 consumer_del_metadata_stream(stream, metadata_ht);
2140 } else if (revents & (LPOLLIN | LPOLLPRI)) {
2141 /* Get the data out of the metadata file descriptor */
2142 DBG("Metadata available on fd %d", pollfd);
2143 assert(stream->wait_fd == pollfd);
2144
2145 len = ctx->on_buffer_ready(stream, ctx);
2146 /* It's ok to have an unavailable sub-buffer */
2147 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2148 /* Clean up stream from consumer and free it. */
2149 lttng_poll_del(&events, stream->wait_fd);
2150 consumer_del_metadata_stream(stream, metadata_ht);
2151 } else if (len > 0) {
2152 stream->data_read = 1;
2153 }
2154 }
2155
2156 /* Release RCU lock for the stream looked up */
2157 rcu_read_unlock();
2158 }
2159 }
2160
2161 error:
2162 end:
2163 DBG("Metadata poll thread exiting");
2164 lttng_poll_clean(&events);
2165
2166 if (metadata_ht) {
2167 destroy_stream_ht(metadata_ht);
2168 }
2169
2170 rcu_unregister_thread();
2171 return NULL;
2172 }
2173
2174 /*
2175 * This thread polls the fds in the set to consume the data and write
2176 * it to tracefile if necessary.
2177 */
2178 void *consumer_thread_data_poll(void *data)
2179 {
2180 int num_rdy, num_hup, high_prio, ret, i;
2181 struct pollfd *pollfd = NULL;
2182 /* local view of the streams */
2183 struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
2184 /* local view of consumer_data.fds_count */
2185 int nb_fd = 0;
2186 struct lttng_consumer_local_data *ctx = data;
2187 ssize_t len;
2188
2189 rcu_register_thread();
2190
2191 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2192 if (data_ht == NULL) {
2193 goto end;
2194 }
2195
2196 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
2197
2198 while (1) {
2199 high_prio = 0;
2200 num_hup = 0;
2201
2202 /*
2203 * the fds set has been updated, we need to update our
2204 * local array as well
2205 */
2206 pthread_mutex_lock(&consumer_data.lock);
2207 if (consumer_data.need_update) {
2208 if (pollfd != NULL) {
2209 free(pollfd);
2210 pollfd = NULL;
2211 }
2212 if (local_stream != NULL) {
2213 free(local_stream);
2214 local_stream = NULL;
2215 }
2216
2217 /* allocate for all fds + 1 for the consumer_data_pipe */
2218 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
2219 if (pollfd == NULL) {
2220 PERROR("pollfd malloc");
2221 pthread_mutex_unlock(&consumer_data.lock);
2222 goto end;
2223 }
2224
2225 /* allocate for all fds + 1 for the consumer_data_pipe */
2226 local_stream = zmalloc((consumer_data.stream_count + 1) *
2227 sizeof(struct lttng_consumer_stream));
2228 if (local_stream == NULL) {
2229 PERROR("local_stream malloc");
2230 pthread_mutex_unlock(&consumer_data.lock);
2231 goto end;
2232 }
2233 ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
2234 data_ht);
2235 if (ret < 0) {
2236 ERR("Error in allocating pollfd or local_outfds");
2237 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2238 pthread_mutex_unlock(&consumer_data.lock);
2239 goto end;
2240 }
2241 nb_fd = ret;
2242 consumer_data.need_update = 0;
2243 }
2244 pthread_mutex_unlock(&consumer_data.lock);
2245
2246 /* No FDs and consumer_quit, consumer_cleanup the thread */
2247 if (nb_fd == 0 && consumer_quit == 1) {
2248 goto end;
2249 }
2250 /* poll on the array of fds */
2251 restart:
2252 DBG("polling on %d fd", nb_fd + 1);
2253 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
2254 DBG("poll num_rdy : %d", num_rdy);
2255 if (num_rdy == -1) {
2256 /*
2257 * Restart interrupted system call.
2258 */
2259 if (errno == EINTR) {
2260 goto restart;
2261 }
2262 PERROR("Poll error");
2263 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
2264 goto end;
2265 } else if (num_rdy == 0) {
2266 DBG("Polling thread timed out");
2267 goto end;
2268 }
2269
2270 /*
2271 * If the consumer_data_pipe triggered poll go directly to the
2272 * beginning of the loop to update the array. We want to prioritize
2273 * array update over low-priority reads.
2274 */
2275 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
2276 size_t pipe_readlen;
2277
2278 DBG("consumer_data_pipe wake up");
2279 /* Consume 1 byte of pipe data */
2280 do {
2281 pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
2282 sizeof(new_stream));
2283 } while (pipe_readlen == -1 && errno == EINTR);
2284
2285 /*
2286 * If the stream is NULL, just ignore it. It's also possible that
2287 * the sessiond poll thread changed the consumer_quit state and is
2288 * waking us up to test it.
2289 */
2290 if (new_stream == NULL) {
2291 validate_endpoint_status_data_stream();
2292 continue;
2293 }
2294
2295 ret = consumer_add_stream(new_stream, data_ht);
2296 if (ret) {
2297 ERR("Consumer add stream %d failed. Continuing",
2298 new_stream->key);
2299 /*
2300 * At this point, if the add_stream fails, it is not in the
2301 * hash table thus passing the NULL value here.
2302 */
2303 consumer_del_stream(new_stream, NULL);
2304 }
2305
2306 /* Continue to update the local streams and handle prio ones */
2307 continue;
2308 }
2309
2310 /* Take care of high priority channels first. */
2311 for (i = 0; i < nb_fd; i++) {
2312 if (local_stream[i] == NULL) {
2313 continue;
2314 }
2315 if (pollfd[i].revents & POLLPRI) {
2316 DBG("Urgent read on fd %d", pollfd[i].fd);
2317 high_prio = 1;
2318 len = ctx->on_buffer_ready(local_stream[i], ctx);
2319 /* it's ok to have an unavailable sub-buffer */
2320 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2321 /* Clean the stream and free it. */
2322 consumer_del_stream(local_stream[i], data_ht);
2323 local_stream[i] = NULL;
2324 } else if (len > 0) {
2325 local_stream[i]->data_read = 1;
2326 }
2327 }
2328 }
2329
2330 /*
2331 * If we read high prio channel in this loop, try again
2332 * for more high prio data.
2333 */
2334 if (high_prio) {
2335 continue;
2336 }
2337
2338 /* Take care of low priority channels. */
2339 for (i = 0; i < nb_fd; i++) {
2340 if (local_stream[i] == NULL) {
2341 continue;
2342 }
2343 if ((pollfd[i].revents & POLLIN) ||
2344 local_stream[i]->hangup_flush_done) {
2345 DBG("Normal read on fd %d", pollfd[i].fd);
2346 len = ctx->on_buffer_ready(local_stream[i], ctx);
2347 /* it's ok to have an unavailable sub-buffer */
2348 if (len < 0 && len != -EAGAIN && len != -ENODATA) {
2349 /* Clean the stream and free it. */
2350 consumer_del_stream(local_stream[i], data_ht);
2351 local_stream[i] = NULL;
2352 } else if (len > 0) {
2353 local_stream[i]->data_read = 1;
2354 }
2355 }
2356 }
2357
2358 /* Handle hangup and errors */
2359 for (i = 0; i < nb_fd; i++) {
2360 if (local_stream[i] == NULL) {
2361 continue;
2362 }
2363 if (!local_stream[i]->hangup_flush_done
2364 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
2365 && (consumer_data.type == LTTNG_CONSUMER32_UST
2366 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
2367 DBG("fd %d is hup|err|nval. Attempting flush and read.",
2368 pollfd[i].fd);
2369 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
2370 /* Attempt read again, for the data we just flushed. */
2371 local_stream[i]->data_read = 1;
2372 }
2373 /*
2374 * If the poll flag is HUP/ERR/NVAL and we have
2375 * read no data in this pass, we can remove the
2376 * stream from its hash table.
2377 */
2378 if ((pollfd[i].revents & POLLHUP)) {
2379 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
2380 if (!local_stream[i]->data_read) {
2381 consumer_del_stream(local_stream[i], data_ht);
2382 local_stream[i] = NULL;
2383 num_hup++;
2384 }
2385 } else if (pollfd[i].revents & POLLERR) {
2386 ERR("Error returned in polling fd %d.", pollfd[i].fd);
2387 if (!local_stream[i]->data_read) {
2388 consumer_del_stream(local_stream[i], data_ht);
2389 local_stream[i] = NULL;
2390 num_hup++;
2391 }
2392 } else if (pollfd[i].revents & POLLNVAL) {
2393 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
2394 if (!local_stream[i]->data_read) {
2395 consumer_del_stream(local_stream[i], data_ht);
2396 local_stream[i] = NULL;
2397 num_hup++;
2398 }
2399 }
2400 if (local_stream[i] != NULL) {
2401 local_stream[i]->data_read = 0;
2402 }
2403 }
2404 }
2405 end:
2406 DBG("polling thread exiting");
2407 if (pollfd != NULL) {
2408 free(pollfd);
2409 pollfd = NULL;
2410 }
2411 if (local_stream != NULL) {
2412 free(local_stream);
2413 local_stream = NULL;
2414 }
2415
2416 /*
2417 * Close the write side of the pipe so epoll_wait() in
2418 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2419 * read side of the pipe. If we close them both, epoll_wait strangely does
2420 * not return and could create a endless wait period if the pipe is the
2421 * only tracked fd in the poll set. The thread will take care of closing
2422 * the read side.
2423 */
2424 close(ctx->consumer_metadata_pipe[1]);
2425
2426 if (data_ht) {
2427 destroy_data_stream_ht(data_ht);
2428 }
2429
2430 rcu_unregister_thread();
2431 return NULL;
2432 }
2433
2434 /*
2435 * This thread listens on the consumerd socket and receives the file
2436 * descriptors from the session daemon.
2437 */
2438 void *consumer_thread_sessiond_poll(void *data)
2439 {
2440 int sock, client_socket, ret;
2441 /*
2442 * structure to poll for incoming data on communication socket avoids
2443 * making blocking sockets.
2444 */
2445 struct pollfd consumer_sockpoll[2];
2446 struct lttng_consumer_local_data *ctx = data;
2447
2448 rcu_register_thread();
2449
2450 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
2451 unlink(ctx->consumer_command_sock_path);
2452 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
2453 if (client_socket < 0) {
2454 ERR("Cannot create command socket");
2455 goto end;
2456 }
2457
2458 ret = lttcomm_listen_unix_sock(client_socket);
2459 if (ret < 0) {
2460 goto end;
2461 }
2462
2463 DBG("Sending ready command to lttng-sessiond");
2464 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
2465 /* return < 0 on error, but == 0 is not fatal */
2466 if (ret < 0) {
2467 ERR("Error sending ready command to lttng-sessiond");
2468 goto end;
2469 }
2470
2471 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
2472 if (ret < 0) {
2473 PERROR("fcntl O_NONBLOCK");
2474 goto end;
2475 }
2476
2477 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2478 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
2479 consumer_sockpoll[0].events = POLLIN | POLLPRI;
2480 consumer_sockpoll[1].fd = client_socket;
2481 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2482
2483 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2484 goto end;
2485 }
2486 DBG("Connection on client_socket");
2487
2488 /* Blocking call, waiting for transmission */
2489 sock = lttcomm_accept_unix_sock(client_socket);
2490 if (sock <= 0) {
2491 WARN("On accept");
2492 goto end;
2493 }
2494 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
2495 if (ret < 0) {
2496 PERROR("fcntl O_NONBLOCK");
2497 goto end;
2498 }
2499
2500 /* update the polling structure to poll on the established socket */
2501 consumer_sockpoll[1].fd = sock;
2502 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2503
2504 while (1) {
2505 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2506 goto end;
2507 }
2508 DBG("Incoming command on sock");
2509 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
2510 if (ret == -ENOENT) {
2511 DBG("Received STOP command");
2512 goto end;
2513 }
2514 if (ret <= 0) {
2515 /*
2516 * This could simply be a session daemon quitting. Don't output
2517 * ERR() here.
2518 */
2519 DBG("Communication interrupted on command socket");
2520 goto end;
2521 }
2522 if (consumer_quit) {
2523 DBG("consumer_thread_receive_fds received quit from signal");
2524 goto end;
2525 }
2526 DBG("received fds on sock");
2527 }
2528 end:
2529 DBG("consumer_thread_receive_fds exiting");
2530
2531 /*
2532 * when all fds have hung up, the polling thread
2533 * can exit cleanly
2534 */
2535 consumer_quit = 1;
2536
2537 /*
2538 * 2s of grace period, if no polling events occur during
2539 * this period, the polling thread will exit even if there
2540 * are still open FDs (should not happen, but safety mechanism).
2541 */
2542 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
2543
2544 /*
2545 * Notify the data poll thread to poll back again and test the
2546 * consumer_quit state that we just set so to quit gracefully.
2547 */
2548 notify_thread_pipe(ctx->consumer_data_pipe[1]);
2549
2550 rcu_unregister_thread();
2551 return NULL;
2552 }
2553
2554 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
2555 struct lttng_consumer_local_data *ctx)
2556 {
2557 switch (consumer_data.type) {
2558 case LTTNG_CONSUMER_KERNEL:
2559 return lttng_kconsumer_read_subbuffer(stream, ctx);
2560 case LTTNG_CONSUMER32_UST:
2561 case LTTNG_CONSUMER64_UST:
2562 return lttng_ustconsumer_read_subbuffer(stream, ctx);
2563 default:
2564 ERR("Unknown consumer_data type");
2565 assert(0);
2566 return -ENOSYS;
2567 }
2568 }
2569
2570 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
2571 {
2572 switch (consumer_data.type) {
2573 case LTTNG_CONSUMER_KERNEL:
2574 return lttng_kconsumer_on_recv_stream(stream);
2575 case LTTNG_CONSUMER32_UST:
2576 case LTTNG_CONSUMER64_UST:
2577 return lttng_ustconsumer_on_recv_stream(stream);
2578 default:
2579 ERR("Unknown consumer_data type");
2580 assert(0);
2581 return -ENOSYS;
2582 }
2583 }
2584
2585 /*
2586 * Allocate and set consumer data hash tables.
2587 */
2588 void lttng_consumer_init(void)
2589 {
2590 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2591 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2592 consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2593
2594 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2595 assert(metadata_ht);
2596 data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2597 assert(data_ht);
2598 }
2599
2600 /*
2601 * Process the ADD_RELAYD command receive by a consumer.
2602 *
2603 * This will create a relayd socket pair and add it to the relayd hash table.
2604 * The caller MUST acquire a RCU read side lock before calling it.
2605 */
2606 int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
2607 struct lttng_consumer_local_data *ctx, int sock,
2608 struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
2609 {
2610 int fd, ret = -1;
2611 struct consumer_relayd_sock_pair *relayd;
2612
2613 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
2614
2615 /* Get relayd reference if exists. */
2616 relayd = consumer_find_relayd(net_seq_idx);
2617 if (relayd == NULL) {
2618 /* Not found. Allocate one. */
2619 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
2620 if (relayd == NULL) {
2621 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
2622 goto error;
2623 }
2624 }
2625
2626 /* Poll on consumer socket. */
2627 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2628 ret = -EINTR;
2629 goto error;
2630 }
2631
2632 /* Get relayd socket from session daemon */
2633 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
2634 if (ret != sizeof(fd)) {
2635 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
2636 ret = -1;
2637 goto error;
2638 }
2639
2640 /* Copy socket information and received FD */
2641 switch (sock_type) {
2642 case LTTNG_STREAM_CONTROL:
2643 /* Copy received lttcomm socket */
2644 lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
2645 ret = lttcomm_create_sock(&relayd->control_sock);
2646 if (ret < 0) {
2647 goto error;
2648 }
2649
2650 /* Close the created socket fd which is useless */
2651 close(relayd->control_sock.fd);
2652
2653 /* Assign new file descriptor */
2654 relayd->control_sock.fd = fd;
2655 break;
2656 case LTTNG_STREAM_DATA:
2657 /* Copy received lttcomm socket */
2658 lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
2659 ret = lttcomm_create_sock(&relayd->data_sock);
2660 if (ret < 0) {
2661 goto error;
2662 }
2663
2664 /* Close the created socket fd which is useless */
2665 close(relayd->data_sock.fd);
2666
2667 /* Assign new file descriptor */
2668 relayd->data_sock.fd = fd;
2669 break;
2670 default:
2671 ERR("Unknown relayd socket type (%d)", sock_type);
2672 goto error;
2673 }
2674
2675 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
2676 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
2677 relayd->net_seq_idx, fd);
2678
2679 /*
2680 * Add relayd socket pair to consumer data hashtable. If object already
2681 * exists or on error, the function gracefully returns.
2682 */
2683 add_relayd(relayd);
2684
2685 /* All good! */
2686 ret = 0;
2687
2688 error:
2689 return ret;
2690 }
2691
2692 /*
2693 * Try to lock the stream mutex.
2694 *
2695 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
2696 */
2697 static int stream_try_lock(struct lttng_consumer_stream *stream)
2698 {
2699 int ret;
2700
2701 assert(stream);
2702
2703 /*
2704 * Try to lock the stream mutex. On failure, we know that the stream is
2705 * being used else where hence there is data still being extracted.
2706 */
2707 ret = pthread_mutex_trylock(&stream->lock);
2708 if (ret) {
2709 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
2710 ret = 0;
2711 goto end;
2712 }
2713
2714 ret = 1;
2715
2716 end:
2717 return ret;
2718 }
2719
2720 /*
2721 * Check if for a given session id there is still data needed to be extract
2722 * from the buffers.
2723 *
2724 * Return 1 if data is pending or else 0 meaning ready to be read.
2725 */
2726 int consumer_data_pending(uint64_t id)
2727 {
2728 int ret;
2729 struct lttng_ht_iter iter;
2730 struct lttng_ht *ht;
2731 struct lttng_consumer_stream *stream;
2732 struct consumer_relayd_sock_pair *relayd;
2733 int (*data_pending)(struct lttng_consumer_stream *);
2734
2735 DBG("Consumer data pending command on session id %" PRIu64, id);
2736
2737 rcu_read_lock();
2738 pthread_mutex_lock(&consumer_data.lock);
2739
2740 switch (consumer_data.type) {
2741 case LTTNG_CONSUMER_KERNEL:
2742 data_pending = lttng_kconsumer_data_pending;
2743 break;
2744 case LTTNG_CONSUMER32_UST:
2745 case LTTNG_CONSUMER64_UST:
2746 data_pending = lttng_ustconsumer_data_pending;
2747 break;
2748 default:
2749 ERR("Unknown consumer data type");
2750 assert(0);
2751 }
2752
2753 /* Ease our life a bit */
2754 ht = consumer_data.stream_list_ht;
2755
2756 cds_lfht_for_each_entry_duplicate(ht->ht,
2757 ht->hash_fct((void *)((unsigned long) id), 0x42UL),
2758 ht->match_fct, (void *)((unsigned long) id),
2759 &iter.iter, stream, node_session_id.node) {
2760 /* If this call fails, the stream is being used hence data pending. */
2761 ret = stream_try_lock(stream);
2762 if (!ret) {
2763 goto data_not_pending;
2764 }
2765
2766 /*
2767 * A removed node from the hash table indicates that the stream has
2768 * been deleted thus having a guarantee that the buffers are closed
2769 * on the consumer side. However, data can still be transmitted
2770 * over the network so don't skip the relayd check.
2771 */
2772 ret = cds_lfht_is_node_deleted(&stream->node.node);
2773 if (!ret) {
2774 /* Check the stream if there is data in the buffers. */
2775 ret = data_pending(stream);
2776 if (ret == 1) {
2777 pthread_mutex_unlock(&stream->lock);
2778 goto data_not_pending;
2779 }
2780 }
2781
2782 /* Relayd check */
2783 if (stream->net_seq_idx != -1) {
2784 relayd = consumer_find_relayd(stream->net_seq_idx);
2785 if (!relayd) {
2786 /*
2787 * At this point, if the relayd object is not available for the
2788 * given stream, it is because the relayd is being cleaned up
2789 * so every stream associated with it (for a session id value)
2790 * are or will be marked for deletion hence no data pending.
2791 */
2792 pthread_mutex_unlock(&stream->lock);
2793 goto data_not_pending;
2794 }
2795
2796 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
2797 if (stream->metadata_flag) {
2798 ret = relayd_quiescent_control(&relayd->control_sock);
2799 } else {
2800 ret = relayd_data_pending(&relayd->control_sock,
2801 stream->relayd_stream_id, stream->next_net_seq_num);
2802 }
2803 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
2804 if (ret == 1) {
2805 pthread_mutex_unlock(&stream->lock);
2806 goto data_not_pending;
2807 }
2808 }
2809 pthread_mutex_unlock(&stream->lock);
2810 }
2811
2812 /*
2813 * Finding _no_ node in the hash table means that the stream(s) have been
2814 * removed thus data is guaranteed to be available for analysis from the
2815 * trace files. This is *only* true for local consumer and not network
2816 * streaming.
2817 */
2818
2819 /* Data is available to be read by a viewer. */
2820 pthread_mutex_unlock(&consumer_data.lock);
2821 rcu_read_unlock();
2822 return 0;
2823
2824 data_not_pending:
2825 /* Data is still being extracted from buffers. */
2826 pthread_mutex_unlock(&consumer_data.lock);
2827 rcu_read_unlock();
2828 return 1;
2829 }
This page took 0.158073 seconds and 4 git commands to generate.