Add new thread in consumer for metadata handling
[lttng-tools.git] / src / common / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30 #include <inttypes.h>
31
32 #include <common/common.h>
33 #include <common/utils.h>
34 #include <common/compat/poll.h>
35 #include <common/kernel-ctl/kernel-ctl.h>
36 #include <common/sessiond-comm/relayd.h>
37 #include <common/sessiond-comm/sessiond-comm.h>
38 #include <common/kernel-consumer/kernel-consumer.h>
39 #include <common/relayd/relayd.h>
40 #include <common/ust-consumer/ust-consumer.h>
41
42 #include "consumer.h"
43
44 struct lttng_consumer_global_data consumer_data = {
45 .stream_count = 0,
46 .need_update = 1,
47 .type = LTTNG_CONSUMER_UNKNOWN,
48 };
49
50 /* timeout parameter, to control the polling thread grace period. */
51 int consumer_poll_timeout = -1;
52
53 /*
54 * Flag to inform the polling thread to quit when all fd hung up. Updated by
55 * the consumer_thread_receive_fds when it notices that all fds has hung up.
56 * Also updated by the signal handler (consumer_should_exit()). Read by the
57 * polling threads.
58 */
59 volatile int consumer_quit = 0;
60
61 /*
62 * Find a stream. The consumer_data.lock must be locked during this
63 * call.
64 */
65 static struct lttng_consumer_stream *consumer_find_stream(int key)
66 {
67 struct lttng_ht_iter iter;
68 struct lttng_ht_node_ulong *node;
69 struct lttng_consumer_stream *stream = NULL;
70
71 /* Negative keys are lookup failures */
72 if (key < 0)
73 return NULL;
74
75 rcu_read_lock();
76
77 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
78 &iter);
79 node = lttng_ht_iter_get_node_ulong(&iter);
80 if (node != NULL) {
81 stream = caa_container_of(node, struct lttng_consumer_stream, node);
82 }
83
84 rcu_read_unlock();
85
86 return stream;
87 }
88
89 static void consumer_steal_stream_key(int key)
90 {
91 struct lttng_consumer_stream *stream;
92
93 rcu_read_lock();
94 stream = consumer_find_stream(key);
95 if (stream) {
96 stream->key = -1;
97 /*
98 * We don't want the lookup to match, but we still need
99 * to iterate on this stream when iterating over the hash table. Just
100 * change the node key.
101 */
102 stream->node.key = -1;
103 }
104 rcu_read_unlock();
105 }
106
107 static struct lttng_consumer_channel *consumer_find_channel(int key)
108 {
109 struct lttng_ht_iter iter;
110 struct lttng_ht_node_ulong *node;
111 struct lttng_consumer_channel *channel = NULL;
112
113 /* Negative keys are lookup failures */
114 if (key < 0)
115 return NULL;
116
117 rcu_read_lock();
118
119 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
120 &iter);
121 node = lttng_ht_iter_get_node_ulong(&iter);
122 if (node != NULL) {
123 channel = caa_container_of(node, struct lttng_consumer_channel, node);
124 }
125
126 rcu_read_unlock();
127
128 return channel;
129 }
130
131 static void consumer_steal_channel_key(int key)
132 {
133 struct lttng_consumer_channel *channel;
134
135 rcu_read_lock();
136 channel = consumer_find_channel(key);
137 if (channel) {
138 channel->key = -1;
139 /*
140 * We don't want the lookup to match, but we still need
141 * to iterate on this channel when iterating over the hash table. Just
142 * change the node key.
143 */
144 channel->node.key = -1;
145 }
146 rcu_read_unlock();
147 }
148
149 static
150 void consumer_free_stream(struct rcu_head *head)
151 {
152 struct lttng_ht_node_ulong *node =
153 caa_container_of(head, struct lttng_ht_node_ulong, head);
154 struct lttng_consumer_stream *stream =
155 caa_container_of(node, struct lttng_consumer_stream, node);
156
157 free(stream);
158 }
159
160 /*
161 * RCU protected relayd socket pair free.
162 */
163 static void consumer_rcu_free_relayd(struct rcu_head *head)
164 {
165 struct lttng_ht_node_ulong *node =
166 caa_container_of(head, struct lttng_ht_node_ulong, head);
167 struct consumer_relayd_sock_pair *relayd =
168 caa_container_of(node, struct consumer_relayd_sock_pair, node);
169
170 free(relayd);
171 }
172
173 /*
174 * Destroy and free relayd socket pair object.
175 *
176 * This function MUST be called with the consumer_data lock acquired.
177 */
178 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
179 {
180 int ret;
181 struct lttng_ht_iter iter;
182
183 if (relayd == NULL) {
184 return;
185 }
186
187 DBG("Consumer destroy and close relayd socket pair");
188
189 iter.iter.node = &relayd->node.node;
190 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
191 if (ret != 0) {
192 /* We assume the relayd was already destroyed */
193 return;
194 }
195
196 /* Close all sockets */
197 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
198 (void) relayd_close(&relayd->control_sock);
199 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
200 (void) relayd_close(&relayd->data_sock);
201
202 /* RCU free() call */
203 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
204 }
205
206 /*
207 * Flag a relayd socket pair for destruction. Destroy it if the refcount
208 * reaches zero.
209 *
210 * RCU read side lock MUST be aquired before calling this function.
211 */
212 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
213 {
214 assert(relayd);
215
216 /* Set destroy flag for this object */
217 uatomic_set(&relayd->destroy_flag, 1);
218
219 /* Destroy the relayd if refcount is 0 */
220 if (uatomic_read(&relayd->refcount) == 0) {
221 consumer_destroy_relayd(relayd);
222 }
223 }
224
225 /*
226 * Remove a stream from the global list protected by a mutex. This
227 * function is also responsible for freeing its data structures.
228 */
229 void consumer_del_stream(struct lttng_consumer_stream *stream)
230 {
231 int ret;
232 struct lttng_ht_iter iter;
233 struct lttng_consumer_channel *free_chan = NULL;
234 struct consumer_relayd_sock_pair *relayd;
235
236 assert(stream);
237
238 pthread_mutex_lock(&consumer_data.lock);
239
240 switch (consumer_data.type) {
241 case LTTNG_CONSUMER_KERNEL:
242 if (stream->mmap_base != NULL) {
243 ret = munmap(stream->mmap_base, stream->mmap_len);
244 if (ret != 0) {
245 perror("munmap");
246 }
247 }
248 break;
249 case LTTNG_CONSUMER32_UST:
250 case LTTNG_CONSUMER64_UST:
251 lttng_ustconsumer_del_stream(stream);
252 break;
253 default:
254 ERR("Unknown consumer_data type");
255 assert(0);
256 goto end;
257 }
258
259 rcu_read_lock();
260 iter.iter.node = &stream->node.node;
261 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
262 assert(!ret);
263
264 rcu_read_unlock();
265
266 if (consumer_data.stream_count <= 0) {
267 goto end;
268 }
269 consumer_data.stream_count--;
270 if (!stream) {
271 goto end;
272 }
273 if (stream->out_fd >= 0) {
274 ret = close(stream->out_fd);
275 if (ret) {
276 PERROR("close");
277 }
278 }
279 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
280 ret = close(stream->wait_fd);
281 if (ret) {
282 PERROR("close");
283 }
284 }
285 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
286 ret = close(stream->shm_fd);
287 if (ret) {
288 PERROR("close");
289 }
290 }
291
292 /* Check and cleanup relayd */
293 rcu_read_lock();
294 relayd = consumer_find_relayd(stream->net_seq_idx);
295 if (relayd != NULL) {
296 uatomic_dec(&relayd->refcount);
297 assert(uatomic_read(&relayd->refcount) >= 0);
298
299 /* Closing streams requires to lock the control socket. */
300 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
301 ret = relayd_send_close_stream(&relayd->control_sock,
302 stream->relayd_stream_id,
303 stream->next_net_seq_num - 1);
304 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
305 if (ret < 0) {
306 DBG("Unable to close stream on the relayd. Continuing");
307 /*
308 * Continue here. There is nothing we can do for the relayd.
309 * Chances are that the relayd has closed the socket so we just
310 * continue cleaning up.
311 */
312 }
313
314 /* Both conditions are met, we destroy the relayd. */
315 if (uatomic_read(&relayd->refcount) == 0 &&
316 uatomic_read(&relayd->destroy_flag)) {
317 consumer_destroy_relayd(relayd);
318 }
319 }
320 rcu_read_unlock();
321
322 if (!--stream->chan->refcount) {
323 free_chan = stream->chan;
324 }
325
326
327 call_rcu(&stream->node.head, consumer_free_stream);
328 end:
329 consumer_data.need_update = 1;
330 pthread_mutex_unlock(&consumer_data.lock);
331
332 if (free_chan)
333 consumer_del_channel(free_chan);
334 }
335
336 struct lttng_consumer_stream *consumer_allocate_stream(
337 int channel_key, int stream_key,
338 int shm_fd, int wait_fd,
339 enum lttng_consumer_stream_state state,
340 uint64_t mmap_len,
341 enum lttng_event_output output,
342 const char *path_name,
343 uid_t uid,
344 gid_t gid,
345 int net_index,
346 int metadata_flag)
347 {
348 struct lttng_consumer_stream *stream;
349 int ret;
350
351 stream = zmalloc(sizeof(*stream));
352 if (stream == NULL) {
353 perror("malloc struct lttng_consumer_stream");
354 goto end;
355 }
356 stream->chan = consumer_find_channel(channel_key);
357 if (!stream->chan) {
358 perror("Unable to find channel key");
359 goto end;
360 }
361 stream->chan->refcount++;
362 stream->key = stream_key;
363 stream->shm_fd = shm_fd;
364 stream->wait_fd = wait_fd;
365 stream->out_fd = -1;
366 stream->out_fd_offset = 0;
367 stream->state = state;
368 stream->mmap_len = mmap_len;
369 stream->mmap_base = NULL;
370 stream->output = output;
371 stream->uid = uid;
372 stream->gid = gid;
373 stream->net_seq_idx = net_index;
374 stream->metadata_flag = metadata_flag;
375 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
376 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
377 lttng_ht_node_init_ulong(&stream->node, stream->key);
378 lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
379
380 switch (consumer_data.type) {
381 case LTTNG_CONSUMER_KERNEL:
382 break;
383 case LTTNG_CONSUMER32_UST:
384 case LTTNG_CONSUMER64_UST:
385 stream->cpu = stream->chan->cpucount++;
386 ret = lttng_ustconsumer_allocate_stream(stream);
387 if (ret) {
388 free(stream);
389 return NULL;
390 }
391 break;
392 default:
393 ERR("Unknown consumer_data type");
394 assert(0);
395 goto end;
396 }
397 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
398 stream->path_name, stream->key,
399 stream->shm_fd,
400 stream->wait_fd,
401 (unsigned long long) stream->mmap_len,
402 stream->out_fd,
403 stream->net_seq_idx);
404 end:
405 return stream;
406 }
407
408 /*
409 * Add a stream to the global list protected by a mutex.
410 */
411 int consumer_add_stream(struct lttng_consumer_stream *stream)
412 {
413 int ret = 0;
414 struct lttng_ht_node_ulong *node;
415 struct lttng_ht_iter iter;
416 struct consumer_relayd_sock_pair *relayd;
417
418 pthread_mutex_lock(&consumer_data.lock);
419 /* Steal stream identifier, for UST */
420 consumer_steal_stream_key(stream->key);
421
422 rcu_read_lock();
423 lttng_ht_lookup(consumer_data.stream_ht,
424 (void *)((unsigned long) stream->key), &iter);
425 node = lttng_ht_iter_get_node_ulong(&iter);
426 if (node != NULL) {
427 rcu_read_unlock();
428 /* Stream already exist. Ignore the insertion */
429 goto end;
430 }
431
432 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
433
434 /* Check and cleanup relayd */
435 relayd = consumer_find_relayd(stream->net_seq_idx);
436 if (relayd != NULL) {
437 uatomic_inc(&relayd->refcount);
438 }
439 rcu_read_unlock();
440
441 /* Update consumer data */
442 consumer_data.stream_count++;
443 consumer_data.need_update = 1;
444
445 end:
446 pthread_mutex_unlock(&consumer_data.lock);
447
448 return ret;
449 }
450
451 /*
452 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
453 * be acquired before calling this.
454 */
455
456 int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
457 {
458 int ret = 0;
459 struct lttng_ht_node_ulong *node;
460 struct lttng_ht_iter iter;
461
462 if (relayd == NULL) {
463 ret = -1;
464 goto end;
465 }
466
467 lttng_ht_lookup(consumer_data.relayd_ht,
468 (void *)((unsigned long) relayd->net_seq_idx), &iter);
469 node = lttng_ht_iter_get_node_ulong(&iter);
470 if (node != NULL) {
471 /* Relayd already exist. Ignore the insertion */
472 goto end;
473 }
474 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
475
476 end:
477 return ret;
478 }
479
480 /*
481 * Allocate and return a consumer relayd socket.
482 */
483 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
484 int net_seq_idx)
485 {
486 struct consumer_relayd_sock_pair *obj = NULL;
487
488 /* Negative net sequence index is a failure */
489 if (net_seq_idx < 0) {
490 goto error;
491 }
492
493 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
494 if (obj == NULL) {
495 PERROR("zmalloc relayd sock");
496 goto error;
497 }
498
499 obj->net_seq_idx = net_seq_idx;
500 obj->refcount = 0;
501 obj->destroy_flag = 0;
502 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
503 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
504
505 error:
506 return obj;
507 }
508
509 /*
510 * Find a relayd socket pair in the global consumer data.
511 *
512 * Return the object if found else NULL.
513 * RCU read-side lock must be held across this call and while using the
514 * returned object.
515 */
516 struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
517 {
518 struct lttng_ht_iter iter;
519 struct lttng_ht_node_ulong *node;
520 struct consumer_relayd_sock_pair *relayd = NULL;
521
522 /* Negative keys are lookup failures */
523 if (key < 0) {
524 goto error;
525 }
526
527 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
528 &iter);
529 node = lttng_ht_iter_get_node_ulong(&iter);
530 if (node != NULL) {
531 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
532 }
533
534 error:
535 return relayd;
536 }
537
538 /*
539 * Handle stream for relayd transmission if the stream applies for network
540 * streaming where the net sequence index is set.
541 *
542 * Return destination file descriptor or negative value on error.
543 */
544 static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
545 size_t data_size, struct consumer_relayd_sock_pair *relayd)
546 {
547 int outfd = -1, ret;
548 struct lttcomm_relayd_data_hdr data_hdr;
549
550 /* Safety net */
551 assert(stream);
552 assert(relayd);
553
554 /* Reset data header */
555 memset(&data_hdr, 0, sizeof(data_hdr));
556
557 if (stream->metadata_flag) {
558 /* Caller MUST acquire the relayd control socket lock */
559 ret = relayd_send_metadata(&relayd->control_sock, data_size);
560 if (ret < 0) {
561 goto error;
562 }
563
564 /* Metadata are always sent on the control socket. */
565 outfd = relayd->control_sock.fd;
566 } else {
567 /* Set header with stream information */
568 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
569 data_hdr.data_size = htobe32(data_size);
570 data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
571 /* Other fields are zeroed previously */
572
573 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
574 sizeof(data_hdr));
575 if (ret < 0) {
576 goto error;
577 }
578
579 /* Set to go on data socket */
580 outfd = relayd->data_sock.fd;
581 }
582
583 error:
584 return outfd;
585 }
586
587 /*
588 * Update a stream according to what we just received.
589 */
590 void consumer_change_stream_state(int stream_key,
591 enum lttng_consumer_stream_state state)
592 {
593 struct lttng_consumer_stream *stream;
594
595 pthread_mutex_lock(&consumer_data.lock);
596 stream = consumer_find_stream(stream_key);
597 if (stream) {
598 stream->state = state;
599 }
600 consumer_data.need_update = 1;
601 pthread_mutex_unlock(&consumer_data.lock);
602 }
603
604 static
605 void consumer_free_channel(struct rcu_head *head)
606 {
607 struct lttng_ht_node_ulong *node =
608 caa_container_of(head, struct lttng_ht_node_ulong, head);
609 struct lttng_consumer_channel *channel =
610 caa_container_of(node, struct lttng_consumer_channel, node);
611
612 free(channel);
613 }
614
615 /*
616 * Remove a channel from the global list protected by a mutex. This
617 * function is also responsible for freeing its data structures.
618 */
619 void consumer_del_channel(struct lttng_consumer_channel *channel)
620 {
621 int ret;
622 struct lttng_ht_iter iter;
623
624 pthread_mutex_lock(&consumer_data.lock);
625
626 switch (consumer_data.type) {
627 case LTTNG_CONSUMER_KERNEL:
628 break;
629 case LTTNG_CONSUMER32_UST:
630 case LTTNG_CONSUMER64_UST:
631 lttng_ustconsumer_del_channel(channel);
632 break;
633 default:
634 ERR("Unknown consumer_data type");
635 assert(0);
636 goto end;
637 }
638
639 rcu_read_lock();
640 iter.iter.node = &channel->node.node;
641 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
642 assert(!ret);
643 rcu_read_unlock();
644
645 if (channel->mmap_base != NULL) {
646 ret = munmap(channel->mmap_base, channel->mmap_len);
647 if (ret != 0) {
648 perror("munmap");
649 }
650 }
651 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
652 ret = close(channel->wait_fd);
653 if (ret) {
654 PERROR("close");
655 }
656 }
657 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
658 ret = close(channel->shm_fd);
659 if (ret) {
660 PERROR("close");
661 }
662 }
663
664 call_rcu(&channel->node.head, consumer_free_channel);
665 end:
666 pthread_mutex_unlock(&consumer_data.lock);
667 }
668
669 struct lttng_consumer_channel *consumer_allocate_channel(
670 int channel_key,
671 int shm_fd, int wait_fd,
672 uint64_t mmap_len,
673 uint64_t max_sb_size)
674 {
675 struct lttng_consumer_channel *channel;
676 int ret;
677
678 channel = zmalloc(sizeof(*channel));
679 if (channel == NULL) {
680 perror("malloc struct lttng_consumer_channel");
681 goto end;
682 }
683 channel->key = channel_key;
684 channel->shm_fd = shm_fd;
685 channel->wait_fd = wait_fd;
686 channel->mmap_len = mmap_len;
687 channel->max_sb_size = max_sb_size;
688 channel->refcount = 0;
689 lttng_ht_node_init_ulong(&channel->node, channel->key);
690
691 switch (consumer_data.type) {
692 case LTTNG_CONSUMER_KERNEL:
693 channel->mmap_base = NULL;
694 channel->mmap_len = 0;
695 break;
696 case LTTNG_CONSUMER32_UST:
697 case LTTNG_CONSUMER64_UST:
698 ret = lttng_ustconsumer_allocate_channel(channel);
699 if (ret) {
700 free(channel);
701 return NULL;
702 }
703 break;
704 default:
705 ERR("Unknown consumer_data type");
706 assert(0);
707 goto end;
708 }
709 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
710 channel->key, channel->shm_fd, channel->wait_fd,
711 (unsigned long long) channel->mmap_len,
712 (unsigned long long) channel->max_sb_size);
713 end:
714 return channel;
715 }
716
717 /*
718 * Add a channel to the global list protected by a mutex.
719 */
720 int consumer_add_channel(struct lttng_consumer_channel *channel)
721 {
722 struct lttng_ht_node_ulong *node;
723 struct lttng_ht_iter iter;
724
725 pthread_mutex_lock(&consumer_data.lock);
726 /* Steal channel identifier, for UST */
727 consumer_steal_channel_key(channel->key);
728 rcu_read_lock();
729
730 lttng_ht_lookup(consumer_data.channel_ht,
731 (void *)((unsigned long) channel->key), &iter);
732 node = lttng_ht_iter_get_node_ulong(&iter);
733 if (node != NULL) {
734 /* Channel already exist. Ignore the insertion */
735 goto end;
736 }
737
738 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
739
740 end:
741 rcu_read_unlock();
742 pthread_mutex_unlock(&consumer_data.lock);
743
744 return 0;
745 }
746
747 /*
748 * Allocate the pollfd structure and the local view of the out fds to avoid
749 * doing a lookup in the linked list and concurrency issues when writing is
750 * needed. Called with consumer_data.lock held.
751 *
752 * Returns the number of fds in the structures.
753 */
754 int consumer_update_poll_array(
755 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
756 struct lttng_consumer_stream **local_stream)
757 {
758 int i = 0;
759 struct lttng_ht_iter iter;
760 struct lttng_consumer_stream *stream;
761
762 DBG("Updating poll fd array");
763 rcu_read_lock();
764 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
765 node.node) {
766 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
767 continue;
768 }
769 DBG("Active FD %d", stream->wait_fd);
770 (*pollfd)[i].fd = stream->wait_fd;
771 (*pollfd)[i].events = POLLIN | POLLPRI;
772 local_stream[i] = stream;
773 i++;
774 }
775 rcu_read_unlock();
776
777 /*
778 * Insert the consumer_poll_pipe at the end of the array and don't
779 * increment i so nb_fd is the number of real FD.
780 */
781 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
782 (*pollfd)[i].events = POLLIN | POLLPRI;
783 return i;
784 }
785
786 /*
787 * Poll on the should_quit pipe and the command socket return -1 on error and
788 * should exit, 0 if data is available on the command socket
789 */
790 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
791 {
792 int num_rdy;
793
794 restart:
795 num_rdy = poll(consumer_sockpoll, 2, -1);
796 if (num_rdy == -1) {
797 /*
798 * Restart interrupted system call.
799 */
800 if (errno == EINTR) {
801 goto restart;
802 }
803 perror("Poll error");
804 goto exit;
805 }
806 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
807 DBG("consumer_should_quit wake up");
808 goto exit;
809 }
810 return 0;
811
812 exit:
813 return -1;
814 }
815
816 /*
817 * Set the error socket.
818 */
819 void lttng_consumer_set_error_sock(
820 struct lttng_consumer_local_data *ctx, int sock)
821 {
822 ctx->consumer_error_socket = sock;
823 }
824
825 /*
826 * Set the command socket path.
827 */
828 void lttng_consumer_set_command_sock_path(
829 struct lttng_consumer_local_data *ctx, char *sock)
830 {
831 ctx->consumer_command_sock_path = sock;
832 }
833
834 /*
835 * Send return code to the session daemon.
836 * If the socket is not defined, we return 0, it is not a fatal error
837 */
838 int lttng_consumer_send_error(
839 struct lttng_consumer_local_data *ctx, int cmd)
840 {
841 if (ctx->consumer_error_socket > 0) {
842 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
843 sizeof(enum lttcomm_sessiond_command));
844 }
845
846 return 0;
847 }
848
849 /*
850 * Close all the tracefiles and stream fds, should be called when all instances
851 * are destroyed.
852 */
853 void lttng_consumer_cleanup(void)
854 {
855 struct lttng_ht_iter iter;
856 struct lttng_ht_node_ulong *node;
857
858 rcu_read_lock();
859
860 /*
861 * close all outfd. Called when there are no more threads running (after
862 * joining on the threads), no need to protect list iteration with mutex.
863 */
864 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
865 node) {
866 struct lttng_consumer_stream *stream =
867 caa_container_of(node, struct lttng_consumer_stream, node);
868 consumer_del_stream(stream);
869 }
870
871 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
872 node) {
873 struct lttng_consumer_channel *channel =
874 caa_container_of(node, struct lttng_consumer_channel, node);
875 consumer_del_channel(channel);
876 }
877
878 rcu_read_unlock();
879
880 lttng_ht_destroy(consumer_data.stream_ht);
881 lttng_ht_destroy(consumer_data.channel_ht);
882 }
883
884 /*
885 * Called from signal handler.
886 */
887 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
888 {
889 int ret;
890 consumer_quit = 1;
891 do {
892 ret = write(ctx->consumer_should_quit[1], "4", 1);
893 } while (ret < 0 && errno == EINTR);
894 if (ret < 0) {
895 perror("write consumer quit");
896 }
897 }
898
899 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
900 off_t orig_offset)
901 {
902 int outfd = stream->out_fd;
903
904 /*
905 * This does a blocking write-and-wait on any page that belongs to the
906 * subbuffer prior to the one we just wrote.
907 * Don't care about error values, as these are just hints and ways to
908 * limit the amount of page cache used.
909 */
910 if (orig_offset < stream->chan->max_sb_size) {
911 return;
912 }
913 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
914 stream->chan->max_sb_size,
915 SYNC_FILE_RANGE_WAIT_BEFORE
916 | SYNC_FILE_RANGE_WRITE
917 | SYNC_FILE_RANGE_WAIT_AFTER);
918 /*
919 * Give hints to the kernel about how we access the file:
920 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
921 * we write it.
922 *
923 * We need to call fadvise again after the file grows because the
924 * kernel does not seem to apply fadvise to non-existing parts of the
925 * file.
926 *
927 * Call fadvise _after_ having waited for the page writeback to
928 * complete because the dirty page writeback semantic is not well
929 * defined. So it can be expected to lead to lower throughput in
930 * streaming.
931 */
932 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
933 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
934 }
935
936 /*
937 * Initialise the necessary environnement :
938 * - create a new context
939 * - create the poll_pipe
940 * - create the should_quit pipe (for signal handler)
941 * - create the thread pipe (for splice)
942 *
943 * Takes a function pointer as argument, this function is called when data is
944 * available on a buffer. This function is responsible to do the
945 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
946 * buffer configuration and then kernctl_put_next_subbuf at the end.
947 *
948 * Returns a pointer to the new context or NULL on error.
949 */
950 struct lttng_consumer_local_data *lttng_consumer_create(
951 enum lttng_consumer_type type,
952 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
953 struct lttng_consumer_local_data *ctx),
954 int (*recv_channel)(struct lttng_consumer_channel *channel),
955 int (*recv_stream)(struct lttng_consumer_stream *stream),
956 int (*update_stream)(int stream_key, uint32_t state))
957 {
958 int ret, i;
959 struct lttng_consumer_local_data *ctx;
960
961 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
962 consumer_data.type == type);
963 consumer_data.type = type;
964
965 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
966 if (ctx == NULL) {
967 perror("allocating context");
968 goto error;
969 }
970
971 ctx->consumer_error_socket = -1;
972 /* assign the callbacks */
973 ctx->on_buffer_ready = buffer_ready;
974 ctx->on_recv_channel = recv_channel;
975 ctx->on_recv_stream = recv_stream;
976 ctx->on_update_stream = update_stream;
977
978 ret = pipe(ctx->consumer_poll_pipe);
979 if (ret < 0) {
980 perror("Error creating poll pipe");
981 goto error_poll_pipe;
982 }
983
984 /* set read end of the pipe to non-blocking */
985 ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
986 if (ret < 0) {
987 perror("fcntl O_NONBLOCK");
988 goto error_poll_fcntl;
989 }
990
991 /* set write end of the pipe to non-blocking */
992 ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
993 if (ret < 0) {
994 perror("fcntl O_NONBLOCK");
995 goto error_poll_fcntl;
996 }
997
998 ret = pipe(ctx->consumer_should_quit);
999 if (ret < 0) {
1000 perror("Error creating recv pipe");
1001 goto error_quit_pipe;
1002 }
1003
1004 ret = pipe(ctx->consumer_thread_pipe);
1005 if (ret < 0) {
1006 perror("Error creating thread pipe");
1007 goto error_thread_pipe;
1008 }
1009
1010 ret = utils_create_pipe(ctx->consumer_metadata_pipe);
1011 if (ret < 0) {
1012 goto error_metadata_pipe;
1013 }
1014
1015 ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
1016 if (ret < 0) {
1017 goto error_splice_pipe;
1018 }
1019
1020 return ctx;
1021
1022 error_splice_pipe:
1023 utils_close_pipe(ctx->consumer_metadata_pipe);
1024 error_metadata_pipe:
1025 utils_close_pipe(ctx->consumer_thread_pipe);
1026 error_thread_pipe:
1027 for (i = 0; i < 2; i++) {
1028 int err;
1029
1030 err = close(ctx->consumer_should_quit[i]);
1031 if (err) {
1032 PERROR("close");
1033 }
1034 }
1035 error_poll_fcntl:
1036 error_quit_pipe:
1037 for (i = 0; i < 2; i++) {
1038 int err;
1039
1040 err = close(ctx->consumer_poll_pipe[i]);
1041 if (err) {
1042 PERROR("close");
1043 }
1044 }
1045 error_poll_pipe:
1046 free(ctx);
1047 error:
1048 return NULL;
1049 }
1050
1051 /*
1052 * Close all fds associated with the instance and free the context.
1053 */
1054 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1055 {
1056 int ret;
1057
1058 ret = close(ctx->consumer_error_socket);
1059 if (ret) {
1060 PERROR("close");
1061 }
1062 ret = close(ctx->consumer_thread_pipe[0]);
1063 if (ret) {
1064 PERROR("close");
1065 }
1066 ret = close(ctx->consumer_thread_pipe[1]);
1067 if (ret) {
1068 PERROR("close");
1069 }
1070 ret = close(ctx->consumer_poll_pipe[0]);
1071 if (ret) {
1072 PERROR("close");
1073 }
1074 ret = close(ctx->consumer_poll_pipe[1]);
1075 if (ret) {
1076 PERROR("close");
1077 }
1078 ret = close(ctx->consumer_should_quit[0]);
1079 if (ret) {
1080 PERROR("close");
1081 }
1082 ret = close(ctx->consumer_should_quit[1]);
1083 if (ret) {
1084 PERROR("close");
1085 }
1086 utils_close_pipe(ctx->consumer_splice_metadata_pipe);
1087
1088 unlink(ctx->consumer_command_sock_path);
1089 free(ctx);
1090 }
1091
1092 /*
1093 * Write the metadata stream id on the specified file descriptor.
1094 */
1095 static int write_relayd_metadata_id(int fd,
1096 struct lttng_consumer_stream *stream,
1097 struct consumer_relayd_sock_pair *relayd)
1098 {
1099 int ret;
1100 uint64_t metadata_id;
1101
1102 metadata_id = htobe64(stream->relayd_stream_id);
1103 do {
1104 ret = write(fd, (void *) &metadata_id,
1105 sizeof(stream->relayd_stream_id));
1106 } while (ret < 0 && errno == EINTR);
1107 if (ret < 0) {
1108 PERROR("write metadata stream id");
1109 goto end;
1110 }
1111 DBG("Metadata stream id %" PRIu64 " written before data",
1112 stream->relayd_stream_id);
1113
1114 end:
1115 return ret;
1116 }
1117
1118 /*
1119 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1120 * core function for writing trace buffers to either the local filesystem or
1121 * the network.
1122 *
1123 * Careful review MUST be put if any changes occur!
1124 *
1125 * Returns the number of bytes written
1126 */
1127 ssize_t lttng_consumer_on_read_subbuffer_mmap(
1128 struct lttng_consumer_local_data *ctx,
1129 struct lttng_consumer_stream *stream, unsigned long len)
1130 {
1131 unsigned long mmap_offset;
1132 ssize_t ret = 0, written = 0;
1133 off_t orig_offset = stream->out_fd_offset;
1134 /* Default is on the disk */
1135 int outfd = stream->out_fd;
1136 struct consumer_relayd_sock_pair *relayd = NULL;
1137
1138 /* RCU lock for the relayd pointer */
1139 rcu_read_lock();
1140
1141 /* Flag that the current stream if set for network streaming. */
1142 if (stream->net_seq_idx != -1) {
1143 relayd = consumer_find_relayd(stream->net_seq_idx);
1144 if (relayd == NULL) {
1145 goto end;
1146 }
1147 }
1148
1149 /* get the offset inside the fd to mmap */
1150 switch (consumer_data.type) {
1151 case LTTNG_CONSUMER_KERNEL:
1152 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
1153 break;
1154 case LTTNG_CONSUMER32_UST:
1155 case LTTNG_CONSUMER64_UST:
1156 ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
1157 stream->buf, &mmap_offset);
1158 break;
1159 default:
1160 ERR("Unknown consumer_data type");
1161 assert(0);
1162 }
1163 if (ret != 0) {
1164 errno = -ret;
1165 PERROR("tracer ctl get_mmap_read_offset");
1166 written = ret;
1167 goto end;
1168 }
1169
1170 /* Handle stream on the relayd if the output is on the network */
1171 if (relayd) {
1172 unsigned long netlen = len;
1173
1174 /*
1175 * Lock the control socket for the complete duration of the function
1176 * since from this point on we will use the socket.
1177 */
1178 if (stream->metadata_flag) {
1179 /* Metadata requires the control socket. */
1180 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1181 netlen += sizeof(stream->relayd_stream_id);
1182 }
1183
1184 ret = write_relayd_stream_header(stream, netlen, relayd);
1185 if (ret >= 0) {
1186 /* Use the returned socket. */
1187 outfd = ret;
1188
1189 /* Write metadata stream id before payload */
1190 if (stream->metadata_flag) {
1191 ret = write_relayd_metadata_id(outfd, stream, relayd);
1192 if (ret < 0) {
1193 written = ret;
1194 goto end;
1195 }
1196 }
1197 }
1198 /* Else, use the default set before which is the filesystem. */
1199 }
1200
1201 while (len > 0) {
1202 do {
1203 ret = write(outfd, stream->mmap_base + mmap_offset, len);
1204 } while (ret < 0 && errno == EINTR);
1205 if (ret < 0) {
1206 PERROR("Error in file write");
1207 if (written == 0) {
1208 written = ret;
1209 }
1210 goto end;
1211 } else if (ret > len) {
1212 PERROR("Error in file write (ret %zd > len %lu)", ret, len);
1213 written += ret;
1214 goto end;
1215 } else {
1216 len -= ret;
1217 mmap_offset += ret;
1218 }
1219 DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
1220
1221 /* This call is useless on a socket so better save a syscall. */
1222 if (!relayd) {
1223 /* This won't block, but will start writeout asynchronously */
1224 lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
1225 SYNC_FILE_RANGE_WRITE);
1226 stream->out_fd_offset += ret;
1227 }
1228 written += ret;
1229 }
1230 lttng_consumer_sync_trace_file(stream, orig_offset);
1231
1232 end:
1233 /* Unlock only if ctrl socket used */
1234 if (relayd && stream->metadata_flag) {
1235 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1236 }
1237
1238 rcu_read_unlock();
1239 return written;
1240 }
1241
1242 /*
1243 * Splice the data from the ring buffer to the tracefile.
1244 *
1245 * Returns the number of bytes spliced.
1246 */
1247 ssize_t lttng_consumer_on_read_subbuffer_splice(
1248 struct lttng_consumer_local_data *ctx,
1249 struct lttng_consumer_stream *stream, unsigned long len)
1250 {
1251 ssize_t ret = 0, written = 0, ret_splice = 0;
1252 loff_t offset = 0;
1253 off_t orig_offset = stream->out_fd_offset;
1254 int fd = stream->wait_fd;
1255 /* Default is on the disk */
1256 int outfd = stream->out_fd;
1257 struct consumer_relayd_sock_pair *relayd = NULL;
1258 int *splice_pipe;
1259
1260 switch (consumer_data.type) {
1261 case LTTNG_CONSUMER_KERNEL:
1262 break;
1263 case LTTNG_CONSUMER32_UST:
1264 case LTTNG_CONSUMER64_UST:
1265 /* Not supported for user space tracing */
1266 return -ENOSYS;
1267 default:
1268 ERR("Unknown consumer_data type");
1269 assert(0);
1270 }
1271
1272 /* RCU lock for the relayd pointer */
1273 rcu_read_lock();
1274
1275 /* Flag that the current stream if set for network streaming. */
1276 if (stream->net_seq_idx != -1) {
1277 relayd = consumer_find_relayd(stream->net_seq_idx);
1278 if (relayd == NULL) {
1279 goto end;
1280 }
1281 }
1282
1283 /*
1284 * Choose right pipe for splice. Metadata and trace data are handled by
1285 * different threads hence the use of two pipes in order not to race or
1286 * corrupt the written data.
1287 */
1288 if (stream->metadata_flag) {
1289 splice_pipe = ctx->consumer_splice_metadata_pipe;
1290 } else {
1291 splice_pipe = ctx->consumer_thread_pipe;
1292 }
1293
1294 /* Write metadata stream id before payload */
1295 if (stream->metadata_flag && relayd) {
1296 /*
1297 * Lock the control socket for the complete duration of the function
1298 * since from this point on we will use the socket.
1299 */
1300 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1301
1302 ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd);
1303 if (ret < 0) {
1304 written = ret;
1305 goto end;
1306 }
1307 }
1308
1309 while (len > 0) {
1310 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
1311 (unsigned long)offset, len, fd);
1312 ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
1313 SPLICE_F_MOVE | SPLICE_F_MORE);
1314 DBG("splice chan to pipe, ret %zd", ret_splice);
1315 if (ret_splice < 0) {
1316 PERROR("Error in relay splice");
1317 if (written == 0) {
1318 written = ret_splice;
1319 }
1320 ret = errno;
1321 goto splice_error;
1322 }
1323
1324 /* Handle stream on the relayd if the output is on the network */
1325 if (relayd) {
1326 if (stream->metadata_flag) {
1327 /* Update counter to fit the spliced data */
1328 ret_splice += sizeof(stream->relayd_stream_id);
1329 len += sizeof(stream->relayd_stream_id);
1330 /*
1331 * We do this so the return value can match the len passed as
1332 * argument to this function.
1333 */
1334 written -= sizeof(stream->relayd_stream_id);
1335 }
1336
1337 ret = write_relayd_stream_header(stream, ret_splice, relayd);
1338 if (ret >= 0) {
1339 /* Use the returned socket. */
1340 outfd = ret;
1341 } else {
1342 ERR("Remote relayd disconnected. Stopping");
1343 goto end;
1344 }
1345 }
1346
1347 /* Splice data out */
1348 ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
1349 ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
1350 DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
1351 if (ret_splice < 0) {
1352 PERROR("Error in file splice");
1353 if (written == 0) {
1354 written = ret_splice;
1355 }
1356 ret = errno;
1357 goto splice_error;
1358 } else if (ret_splice > len) {
1359 errno = EINVAL;
1360 PERROR("Wrote more data than requested %zd (len: %lu)",
1361 ret_splice, len);
1362 written += ret_splice;
1363 ret = errno;
1364 goto splice_error;
1365 }
1366 len -= ret_splice;
1367
1368 /* This call is useless on a socket so better save a syscall. */
1369 if (!relayd) {
1370 /* This won't block, but will start writeout asynchronously */
1371 lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
1372 SYNC_FILE_RANGE_WRITE);
1373 stream->out_fd_offset += ret_splice;
1374 }
1375 written += ret_splice;
1376 }
1377 lttng_consumer_sync_trace_file(stream, orig_offset);
1378
1379 ret = ret_splice;
1380
1381 goto end;
1382
1383 splice_error:
1384 /* send the appropriate error description to sessiond */
1385 switch (ret) {
1386 case EBADF:
1387 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
1388 break;
1389 case EINVAL:
1390 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
1391 break;
1392 case ENOMEM:
1393 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
1394 break;
1395 case ESPIPE:
1396 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
1397 break;
1398 }
1399
1400 end:
1401 if (relayd && stream->metadata_flag) {
1402 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1403 }
1404
1405 rcu_read_unlock();
1406 return written;
1407 }
1408
1409 /*
1410 * Take a snapshot for a specific fd
1411 *
1412 * Returns 0 on success, < 0 on error
1413 */
1414 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1415 struct lttng_consumer_stream *stream)
1416 {
1417 switch (consumer_data.type) {
1418 case LTTNG_CONSUMER_KERNEL:
1419 return lttng_kconsumer_take_snapshot(ctx, stream);
1420 case LTTNG_CONSUMER32_UST:
1421 case LTTNG_CONSUMER64_UST:
1422 return lttng_ustconsumer_take_snapshot(ctx, stream);
1423 default:
1424 ERR("Unknown consumer_data type");
1425 assert(0);
1426 return -ENOSYS;
1427 }
1428
1429 }
1430
1431 /*
1432 * Get the produced position
1433 *
1434 * Returns 0 on success, < 0 on error
1435 */
1436 int lttng_consumer_get_produced_snapshot(
1437 struct lttng_consumer_local_data *ctx,
1438 struct lttng_consumer_stream *stream,
1439 unsigned long *pos)
1440 {
1441 switch (consumer_data.type) {
1442 case LTTNG_CONSUMER_KERNEL:
1443 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
1444 case LTTNG_CONSUMER32_UST:
1445 case LTTNG_CONSUMER64_UST:
1446 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1447 default:
1448 ERR("Unknown consumer_data type");
1449 assert(0);
1450 return -ENOSYS;
1451 }
1452 }
1453
1454 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1455 int sock, struct pollfd *consumer_sockpoll)
1456 {
1457 switch (consumer_data.type) {
1458 case LTTNG_CONSUMER_KERNEL:
1459 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1460 case LTTNG_CONSUMER32_UST:
1461 case LTTNG_CONSUMER64_UST:
1462 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1463 default:
1464 ERR("Unknown consumer_data type");
1465 assert(0);
1466 return -ENOSYS;
1467 }
1468 }
1469
1470 /*
1471 * Iterate over all stream element of the hashtable and free them. This is race
1472 * free since the hashtable received MUST be in a race free synchronization
1473 * state. It's the caller responsability to make sure of that.
1474 */
1475 static void destroy_stream_ht(struct lttng_ht *ht)
1476 {
1477 int ret;
1478 struct lttng_ht_iter iter;
1479 struct lttng_consumer_stream *stream;
1480
1481 if (ht == NULL) {
1482 return;
1483 }
1484
1485 cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
1486 ret = lttng_ht_del(ht, &iter);
1487 assert(!ret);
1488
1489 free(stream);
1490 }
1491
1492 lttng_ht_destroy(ht);
1493 }
1494
1495 /*
1496 * Clean up a metadata stream and free its memory.
1497 */
1498 static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
1499 {
1500 int ret;
1501 struct lttng_consumer_channel *free_chan = NULL;
1502 struct consumer_relayd_sock_pair *relayd;
1503
1504 assert(stream);
1505 /*
1506 * This call should NEVER receive regular stream. It must always be
1507 * metadata stream and this is crucial for data structure synchronization.
1508 */
1509 assert(stream->metadata_flag);
1510
1511 pthread_mutex_lock(&consumer_data.lock);
1512 switch (consumer_data.type) {
1513 case LTTNG_CONSUMER_KERNEL:
1514 if (stream->mmap_base != NULL) {
1515 ret = munmap(stream->mmap_base, stream->mmap_len);
1516 if (ret != 0) {
1517 PERROR("munmap metadata stream");
1518 }
1519 }
1520 break;
1521 case LTTNG_CONSUMER32_UST:
1522 case LTTNG_CONSUMER64_UST:
1523 lttng_ustconsumer_del_stream(stream);
1524 break;
1525 default:
1526 ERR("Unknown consumer_data type");
1527 assert(0);
1528 }
1529 pthread_mutex_unlock(&consumer_data.lock);
1530
1531 if (stream->out_fd >= 0) {
1532 ret = close(stream->out_fd);
1533 if (ret) {
1534 PERROR("close");
1535 }
1536 }
1537
1538 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
1539 ret = close(stream->wait_fd);
1540 if (ret) {
1541 PERROR("close");
1542 }
1543 }
1544
1545 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
1546 ret = close(stream->shm_fd);
1547 if (ret) {
1548 PERROR("close");
1549 }
1550 }
1551
1552 /* Check and cleanup relayd */
1553 rcu_read_lock();
1554 relayd = consumer_find_relayd(stream->net_seq_idx);
1555 if (relayd != NULL) {
1556 uatomic_dec(&relayd->refcount);
1557 assert(uatomic_read(&relayd->refcount) >= 0);
1558
1559 /* Closing streams requires to lock the control socket. */
1560 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
1561 ret = relayd_send_close_stream(&relayd->control_sock,
1562 stream->relayd_stream_id, stream->next_net_seq_num - 1);
1563 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
1564 if (ret < 0) {
1565 DBG("Unable to close stream on the relayd. Continuing");
1566 /*
1567 * Continue here. There is nothing we can do for the relayd.
1568 * Chances are that the relayd has closed the socket so we just
1569 * continue cleaning up.
1570 */
1571 }
1572
1573 /* Both conditions are met, we destroy the relayd. */
1574 if (uatomic_read(&relayd->refcount) == 0 &&
1575 uatomic_read(&relayd->destroy_flag)) {
1576 consumer_destroy_relayd(relayd);
1577 }
1578 }
1579 rcu_read_unlock();
1580
1581 /* Atomically decrement channel refcount since other threads can use it. */
1582 uatomic_dec(&stream->chan->refcount);
1583 if (!uatomic_read(&stream->chan->refcount)) {
1584 free_chan = stream->chan;
1585 }
1586
1587 if (free_chan) {
1588 consumer_del_channel(free_chan);
1589 }
1590
1591 free(stream);
1592 }
1593
1594 /*
1595 * Action done with the metadata stream when adding it to the consumer internal
1596 * data structures to handle it.
1597 */
1598 static void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
1599 {
1600 struct consumer_relayd_sock_pair *relayd;
1601
1602 /* Find relayd and, if one is found, increment refcount. */
1603 rcu_read_lock();
1604 relayd = consumer_find_relayd(stream->net_seq_idx);
1605 if (relayd != NULL) {
1606 uatomic_inc(&relayd->refcount);
1607 }
1608 rcu_read_unlock();
1609 }
1610
1611 /*
1612 * Thread polls on metadata file descriptor and write them on disk or on the
1613 * network.
1614 */
1615 void *lttng_consumer_thread_poll_metadata(void *data)
1616 {
1617 int ret, i, pollfd;
1618 uint32_t revents, nb_fd;
1619 struct lttng_consumer_stream *stream;
1620 struct lttng_ht_iter iter;
1621 struct lttng_ht_node_ulong *node;
1622 struct lttng_ht *metadata_ht = NULL;
1623 struct lttng_poll_event events;
1624 struct lttng_consumer_local_data *ctx = data;
1625 ssize_t len;
1626
1627 rcu_register_thread();
1628
1629 DBG("Thread metadata poll started");
1630
1631 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1632 if (metadata_ht == NULL) {
1633 goto end;
1634 }
1635
1636 /* Size is set to 1 for the consumer_metadata pipe */
1637 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
1638 if (ret < 0) {
1639 ERR("Poll set creation failed");
1640 goto end;
1641 }
1642
1643 ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
1644 if (ret < 0) {
1645 goto end;
1646 }
1647
1648 /* Main loop */
1649 DBG("Metadata main loop started");
1650
1651 while (1) {
1652 lttng_poll_reset(&events);
1653
1654 nb_fd = LTTNG_POLL_GETNB(&events);
1655
1656 /* Only the metadata pipe is set */
1657 if (nb_fd == 0 && consumer_quit == 1) {
1658 goto end;
1659 }
1660
1661 restart:
1662 DBG("Metadata poll wait with %d fd(s)", nb_fd);
1663 ret = lttng_poll_wait(&events, -1);
1664 DBG("Metadata event catched in thread");
1665 if (ret < 0) {
1666 if (errno == EINTR) {
1667 goto restart;
1668 }
1669 goto error;
1670 }
1671
1672 for (i = 0; i < nb_fd; i++) {
1673 revents = LTTNG_POLL_GETEV(&events, i);
1674 pollfd = LTTNG_POLL_GETFD(&events, i);
1675
1676 /* Check the metadata pipe for incoming metadata. */
1677 if (pollfd == ctx->consumer_metadata_pipe[0]) {
1678 if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
1679 DBG("Metadata thread pipe hung up");
1680 /*
1681 * Remove the pipe from the poll set and continue the loop
1682 * since their might be data to consume.
1683 */
1684 lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
1685 close(ctx->consumer_metadata_pipe[0]);
1686 continue;
1687 } else if (revents & LPOLLIN) {
1688 stream = zmalloc(sizeof(struct lttng_consumer_stream));
1689 if (stream == NULL) {
1690 PERROR("zmalloc metadata consumer stream");
1691 goto error;
1692 }
1693
1694 do {
1695 /* Get the stream and add it to the local hash table */
1696 ret = read(pollfd, stream,
1697 sizeof(struct lttng_consumer_stream));
1698 } while (ret < 0 && errno == EINTR);
1699 if (ret < 0 || ret < sizeof(struct lttng_consumer_stream)) {
1700 PERROR("read metadata stream");
1701 free(stream);
1702 /*
1703 * Let's continue here and hope we can still work
1704 * without stopping the consumer. XXX: Should we?
1705 */
1706 continue;
1707 }
1708
1709 DBG("Adding metadata stream %d to poll set",
1710 stream->wait_fd);
1711
1712 /* The node should be init at this point */
1713 lttng_ht_add_unique_ulong(metadata_ht,
1714 &stream->waitfd_node);
1715
1716 /* Add metadata stream to the global poll events list */
1717 lttng_poll_add(&events, stream->wait_fd,
1718 LPOLLIN | LPOLLPRI);
1719
1720 consumer_add_metadata_stream(stream);
1721 }
1722
1723 /* Metadata pipe handled. Continue handling the others */
1724 continue;
1725 }
1726
1727 /* From here, the event is a metadata wait fd */
1728
1729 lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
1730 &iter);
1731 node = lttng_ht_iter_get_node_ulong(&iter);
1732 if (node == NULL) {
1733 /* FD not found, continue loop */
1734 continue;
1735 }
1736
1737 stream = caa_container_of(node, struct lttng_consumer_stream,
1738 waitfd_node);
1739
1740 /* Get the data out of the metadata file descriptor */
1741 if (revents & (LPOLLIN | LPOLLPRI)) {
1742 DBG("Metadata available on fd %d", pollfd);
1743 assert(stream->wait_fd == pollfd);
1744
1745 len = ctx->on_buffer_ready(stream, ctx);
1746 /* It's ok to have an unavailable sub-buffer */
1747 if (len < 0 && len != -EAGAIN) {
1748 goto end;
1749 } else if (len > 0) {
1750 stream->data_read = 1;
1751 }
1752 }
1753
1754 /*
1755 * Remove the stream from the hash table since there is no data
1756 * left on the fd because we previously did a read on the buffer.
1757 */
1758 if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
1759 DBG("Metadata fd %d is hup|err|nval.", pollfd);
1760 if (!stream->hangup_flush_done
1761 && (consumer_data.type == LTTNG_CONSUMER32_UST
1762 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1763 DBG("Attempting to flush and consume the UST buffers");
1764 lttng_ustconsumer_on_stream_hangup(stream);
1765
1766 /* We just flushed the stream now read it. */
1767 len = ctx->on_buffer_ready(stream, ctx);
1768 /* It's ok to have an unavailable sub-buffer */
1769 if (len < 0 && len != -EAGAIN) {
1770 goto end;
1771 }
1772 }
1773
1774 /* Removing it from hash table, poll set and free memory */
1775 lttng_ht_del(metadata_ht, &iter);
1776 lttng_poll_del(&events, stream->wait_fd);
1777 consumer_del_metadata_stream(stream);
1778 }
1779 }
1780 }
1781
1782 error:
1783 end:
1784 DBG("Metadata poll thread exiting");
1785 lttng_poll_clean(&events);
1786
1787 if (metadata_ht) {
1788 destroy_stream_ht(metadata_ht);
1789 }
1790
1791 rcu_unregister_thread();
1792 return NULL;
1793 }
1794
1795 /*
1796 * This thread polls the fds in the set to consume the data and write
1797 * it to tracefile if necessary.
1798 */
1799 void *lttng_consumer_thread_poll_fds(void *data)
1800 {
1801 int num_rdy, num_hup, high_prio, ret, i;
1802 struct pollfd *pollfd = NULL;
1803 /* local view of the streams */
1804 struct lttng_consumer_stream **local_stream = NULL;
1805 /* local view of consumer_data.fds_count */
1806 int nb_fd = 0;
1807 struct lttng_consumer_local_data *ctx = data;
1808 ssize_t len;
1809 pthread_t metadata_thread;
1810 void *status;
1811
1812 rcu_register_thread();
1813
1814 /* Start metadata polling thread */
1815 ret = pthread_create(&metadata_thread, NULL,
1816 lttng_consumer_thread_poll_metadata, (void *) ctx);
1817 if (ret < 0) {
1818 PERROR("pthread_create metadata thread");
1819 goto end;
1820 }
1821
1822 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
1823
1824 while (1) {
1825 high_prio = 0;
1826 num_hup = 0;
1827
1828 /*
1829 * the fds set has been updated, we need to update our
1830 * local array as well
1831 */
1832 pthread_mutex_lock(&consumer_data.lock);
1833 if (consumer_data.need_update) {
1834 if (pollfd != NULL) {
1835 free(pollfd);
1836 pollfd = NULL;
1837 }
1838 if (local_stream != NULL) {
1839 free(local_stream);
1840 local_stream = NULL;
1841 }
1842
1843 /* allocate for all fds + 1 for the consumer_poll_pipe */
1844 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
1845 if (pollfd == NULL) {
1846 perror("pollfd malloc");
1847 pthread_mutex_unlock(&consumer_data.lock);
1848 goto end;
1849 }
1850
1851 /* allocate for all fds + 1 for the consumer_poll_pipe */
1852 local_stream = zmalloc((consumer_data.stream_count + 1) *
1853 sizeof(struct lttng_consumer_stream));
1854 if (local_stream == NULL) {
1855 perror("local_stream malloc");
1856 pthread_mutex_unlock(&consumer_data.lock);
1857 goto end;
1858 }
1859 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
1860 if (ret < 0) {
1861 ERR("Error in allocating pollfd or local_outfds");
1862 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
1863 pthread_mutex_unlock(&consumer_data.lock);
1864 goto end;
1865 }
1866 nb_fd = ret;
1867 consumer_data.need_update = 0;
1868 }
1869 pthread_mutex_unlock(&consumer_data.lock);
1870
1871 /* No FDs and consumer_quit, consumer_cleanup the thread */
1872 if (nb_fd == 0 && consumer_quit == 1) {
1873 goto end;
1874 }
1875 /* poll on the array of fds */
1876 restart:
1877 DBG("polling on %d fd", nb_fd + 1);
1878 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
1879 DBG("poll num_rdy : %d", num_rdy);
1880 if (num_rdy == -1) {
1881 /*
1882 * Restart interrupted system call.
1883 */
1884 if (errno == EINTR) {
1885 goto restart;
1886 }
1887 perror("Poll error");
1888 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
1889 goto end;
1890 } else if (num_rdy == 0) {
1891 DBG("Polling thread timed out");
1892 goto end;
1893 }
1894
1895 /*
1896 * If the consumer_poll_pipe triggered poll go directly to the
1897 * beginning of the loop to update the array. We want to prioritize
1898 * array update over low-priority reads.
1899 */
1900 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
1901 size_t pipe_readlen;
1902 char tmp;
1903
1904 DBG("consumer_poll_pipe wake up");
1905 /* Consume 1 byte of pipe data */
1906 do {
1907 pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
1908 } while (pipe_readlen == -1 && errno == EINTR);
1909 continue;
1910 }
1911
1912 /* Take care of high priority channels first. */
1913 for (i = 0; i < nb_fd; i++) {
1914 if (pollfd[i].revents & POLLPRI) {
1915 DBG("Urgent read on fd %d", pollfd[i].fd);
1916 high_prio = 1;
1917 len = ctx->on_buffer_ready(local_stream[i], ctx);
1918 /* it's ok to have an unavailable sub-buffer */
1919 if (len < 0 && len != -EAGAIN) {
1920 goto end;
1921 } else if (len > 0) {
1922 local_stream[i]->data_read = 1;
1923 }
1924 }
1925 }
1926
1927 /*
1928 * If we read high prio channel in this loop, try again
1929 * for more high prio data.
1930 */
1931 if (high_prio) {
1932 continue;
1933 }
1934
1935 /* Take care of low priority channels. */
1936 for (i = 0; i < nb_fd; i++) {
1937 if ((pollfd[i].revents & POLLIN) ||
1938 local_stream[i]->hangup_flush_done) {
1939 DBG("Normal read on fd %d", pollfd[i].fd);
1940 len = ctx->on_buffer_ready(local_stream[i], ctx);
1941 /* it's ok to have an unavailable sub-buffer */
1942 if (len < 0 && len != -EAGAIN) {
1943 goto end;
1944 } else if (len > 0) {
1945 local_stream[i]->data_read = 1;
1946 }
1947 }
1948 }
1949
1950 /* Handle hangup and errors */
1951 for (i = 0; i < nb_fd; i++) {
1952 if (!local_stream[i]->hangup_flush_done
1953 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1954 && (consumer_data.type == LTTNG_CONSUMER32_UST
1955 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1956 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1957 pollfd[i].fd);
1958 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1959 /* Attempt read again, for the data we just flushed. */
1960 local_stream[i]->data_read = 1;
1961 }
1962 /*
1963 * If the poll flag is HUP/ERR/NVAL and we have
1964 * read no data in this pass, we can remove the
1965 * stream from its hash table.
1966 */
1967 if ((pollfd[i].revents & POLLHUP)) {
1968 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1969 if (!local_stream[i]->data_read) {
1970 consumer_del_stream(local_stream[i]);
1971 num_hup++;
1972 }
1973 } else if (pollfd[i].revents & POLLERR) {
1974 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1975 if (!local_stream[i]->data_read) {
1976 consumer_del_stream(local_stream[i]);
1977 num_hup++;
1978 }
1979 } else if (pollfd[i].revents & POLLNVAL) {
1980 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1981 if (!local_stream[i]->data_read) {
1982 consumer_del_stream(local_stream[i]);
1983 num_hup++;
1984 }
1985 }
1986 local_stream[i]->data_read = 0;
1987 }
1988 }
1989 end:
1990 DBG("polling thread exiting");
1991 if (pollfd != NULL) {
1992 free(pollfd);
1993 pollfd = NULL;
1994 }
1995 if (local_stream != NULL) {
1996 free(local_stream);
1997 local_stream = NULL;
1998 }
1999
2000 /*
2001 * Close the write side of the pipe so epoll_wait() in
2002 * lttng_consumer_thread_poll_metadata can catch it. The thread is
2003 * monitoring the read side of the pipe. If we close them both, epoll_wait
2004 * strangely does not return and could create a endless wait period if the
2005 * pipe is the only tracked fd in the poll set. The thread will take care
2006 * of closing the read side.
2007 */
2008 close(ctx->consumer_metadata_pipe[1]);
2009 if (ret) {
2010 ret = pthread_join(metadata_thread, &status);
2011 if (ret < 0) {
2012 PERROR("pthread_join metadata thread");
2013 }
2014 }
2015
2016 rcu_unregister_thread();
2017 return NULL;
2018 }
2019
2020 /*
2021 * This thread listens on the consumerd socket and receives the file
2022 * descriptors from the session daemon.
2023 */
2024 void *lttng_consumer_thread_receive_fds(void *data)
2025 {
2026 int sock, client_socket, ret;
2027 /*
2028 * structure to poll for incoming data on communication socket avoids
2029 * making blocking sockets.
2030 */
2031 struct pollfd consumer_sockpoll[2];
2032 struct lttng_consumer_local_data *ctx = data;
2033
2034 rcu_register_thread();
2035
2036 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
2037 unlink(ctx->consumer_command_sock_path);
2038 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
2039 if (client_socket < 0) {
2040 ERR("Cannot create command socket");
2041 goto end;
2042 }
2043
2044 ret = lttcomm_listen_unix_sock(client_socket);
2045 if (ret < 0) {
2046 goto end;
2047 }
2048
2049 DBG("Sending ready command to lttng-sessiond");
2050 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
2051 /* return < 0 on error, but == 0 is not fatal */
2052 if (ret < 0) {
2053 ERR("Error sending ready command to lttng-sessiond");
2054 goto end;
2055 }
2056
2057 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
2058 if (ret < 0) {
2059 perror("fcntl O_NONBLOCK");
2060 goto end;
2061 }
2062
2063 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2064 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
2065 consumer_sockpoll[0].events = POLLIN | POLLPRI;
2066 consumer_sockpoll[1].fd = client_socket;
2067 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2068
2069 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2070 goto end;
2071 }
2072 DBG("Connection on client_socket");
2073
2074 /* Blocking call, waiting for transmission */
2075 sock = lttcomm_accept_unix_sock(client_socket);
2076 if (sock <= 0) {
2077 WARN("On accept");
2078 goto end;
2079 }
2080 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
2081 if (ret < 0) {
2082 perror("fcntl O_NONBLOCK");
2083 goto end;
2084 }
2085
2086 /* update the polling structure to poll on the established socket */
2087 consumer_sockpoll[1].fd = sock;
2088 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2089
2090 while (1) {
2091 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2092 goto end;
2093 }
2094 DBG("Incoming command on sock");
2095 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
2096 if (ret == -ENOENT) {
2097 DBG("Received STOP command");
2098 goto end;
2099 }
2100 if (ret <= 0) {
2101 /*
2102 * This could simply be a session daemon quitting. Don't output
2103 * ERR() here.
2104 */
2105 DBG("Communication interrupted on command socket");
2106 goto end;
2107 }
2108 if (consumer_quit) {
2109 DBG("consumer_thread_receive_fds received quit from signal");
2110 goto end;
2111 }
2112 DBG("received fds on sock");
2113 }
2114 end:
2115 DBG("consumer_thread_receive_fds exiting");
2116
2117 /*
2118 * when all fds have hung up, the polling thread
2119 * can exit cleanly
2120 */
2121 consumer_quit = 1;
2122
2123 /*
2124 * 2s of grace period, if no polling events occur during
2125 * this period, the polling thread will exit even if there
2126 * are still open FDs (should not happen, but safety mechanism).
2127 */
2128 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
2129
2130 /*
2131 * Wake-up the other end by writing a null byte in the pipe
2132 * (non-blocking). Important note: Because writing into the
2133 * pipe is non-blocking (and therefore we allow dropping wakeup
2134 * data, as long as there is wakeup data present in the pipe
2135 * buffer to wake up the other end), the other end should
2136 * perform the following sequence for waiting:
2137 * 1) empty the pipe (reads).
2138 * 2) perform update operation.
2139 * 3) wait on the pipe (poll).
2140 */
2141 do {
2142 ret = write(ctx->consumer_poll_pipe[1], "", 1);
2143 } while (ret < 0 && errno == EINTR);
2144 rcu_unregister_thread();
2145 return NULL;
2146 }
2147
2148 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
2149 struct lttng_consumer_local_data *ctx)
2150 {
2151 switch (consumer_data.type) {
2152 case LTTNG_CONSUMER_KERNEL:
2153 return lttng_kconsumer_read_subbuffer(stream, ctx);
2154 case LTTNG_CONSUMER32_UST:
2155 case LTTNG_CONSUMER64_UST:
2156 return lttng_ustconsumer_read_subbuffer(stream, ctx);
2157 default:
2158 ERR("Unknown consumer_data type");
2159 assert(0);
2160 return -ENOSYS;
2161 }
2162 }
2163
2164 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
2165 {
2166 switch (consumer_data.type) {
2167 case LTTNG_CONSUMER_KERNEL:
2168 return lttng_kconsumer_on_recv_stream(stream);
2169 case LTTNG_CONSUMER32_UST:
2170 case LTTNG_CONSUMER64_UST:
2171 return lttng_ustconsumer_on_recv_stream(stream);
2172 default:
2173 ERR("Unknown consumer_data type");
2174 assert(0);
2175 return -ENOSYS;
2176 }
2177 }
2178
2179 /*
2180 * Allocate and set consumer data hash tables.
2181 */
2182 void lttng_consumer_init(void)
2183 {
2184 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2185 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2186 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2187 }
2188
2189 /*
2190 * Process the ADD_RELAYD command receive by a consumer.
2191 *
2192 * This will create a relayd socket pair and add it to the relayd hash table.
2193 * The caller MUST acquire a RCU read side lock before calling it.
2194 */
2195 int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
2196 struct lttng_consumer_local_data *ctx, int sock,
2197 struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
2198 {
2199 int fd, ret = -1;
2200 struct consumer_relayd_sock_pair *relayd;
2201
2202 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
2203
2204 /* Get relayd reference if exists. */
2205 relayd = consumer_find_relayd(net_seq_idx);
2206 if (relayd == NULL) {
2207 /* Not found. Allocate one. */
2208 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
2209 if (relayd == NULL) {
2210 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
2211 goto error;
2212 }
2213 }
2214
2215 /* Poll on consumer socket. */
2216 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2217 ret = -EINTR;
2218 goto error;
2219 }
2220
2221 /* Get relayd socket from session daemon */
2222 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
2223 if (ret != sizeof(fd)) {
2224 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
2225 ret = -1;
2226 goto error;
2227 }
2228
2229 /* Copy socket information and received FD */
2230 switch (sock_type) {
2231 case LTTNG_STREAM_CONTROL:
2232 /* Copy received lttcomm socket */
2233 lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
2234 ret = lttcomm_create_sock(&relayd->control_sock);
2235 if (ret < 0) {
2236 goto error;
2237 }
2238
2239 /* Close the created socket fd which is useless */
2240 close(relayd->control_sock.fd);
2241
2242 /* Assign new file descriptor */
2243 relayd->control_sock.fd = fd;
2244 break;
2245 case LTTNG_STREAM_DATA:
2246 /* Copy received lttcomm socket */
2247 lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
2248 ret = lttcomm_create_sock(&relayd->data_sock);
2249 if (ret < 0) {
2250 goto error;
2251 }
2252
2253 /* Close the created socket fd which is useless */
2254 close(relayd->data_sock.fd);
2255
2256 /* Assign new file descriptor */
2257 relayd->data_sock.fd = fd;
2258 break;
2259 default:
2260 ERR("Unknown relayd socket type (%d)", sock_type);
2261 goto error;
2262 }
2263
2264 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
2265 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
2266 relayd->net_seq_idx, fd);
2267
2268 /*
2269 * Add relayd socket pair to consumer data hashtable. If object already
2270 * exists or on error, the function gracefully returns.
2271 */
2272 consumer_add_relayd(relayd);
2273
2274 /* All good! */
2275 ret = 0;
2276
2277 error:
2278 return ret;
2279 }
This page took 0.114518 seconds and 4 git commands to generate.