Fix: consumer relay sender RCU usage
src/common/consumer.c (lttng-tools.git)
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/relayd.h>
34 #include <common/sessiond-comm/sessiond-comm.h>
35 #include <common/kernel-consumer/kernel-consumer.h>
36 #include <common/relayd/relayd.h>
37 #include <common/ust-consumer/ust-consumer.h>
38
39 #include "consumer.h"
40
41 struct lttng_consumer_global_data consumer_data = {
42 .stream_count = 0,
43 .need_update = 1,
44 .type = LTTNG_CONSUMER_UNKNOWN,
45 };
46
47 /* timeout parameter, to control the polling thread grace period. */
48 int consumer_poll_timeout = -1;
49
50 /*
51 * Flag to inform the polling thread to quit when all fds have hung up. Updated by
52 * the consumer_thread_receive_fds thread when it notices that all fds have hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
56 volatile int consumer_quit = 0;
57
58 /*
59 * Find a stream. The consumer_data.lock must be locked during this
60 * call.
61 */
62 static struct lttng_consumer_stream *consumer_find_stream(int key)
63 {
64 struct lttng_ht_iter iter;
65 struct lttng_ht_node_ulong *node;
66 struct lttng_consumer_stream *stream = NULL;
67
68 /* Negative keys are lookup failures */
69 if (key < 0)
70 return NULL;
71
72 rcu_read_lock();
73
74 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
75 &iter);
76 node = lttng_ht_iter_get_node_ulong(&iter);
77 if (node != NULL) {
78 stream = caa_container_of(node, struct lttng_consumer_stream, node);
79 }
80
81 rcu_read_unlock();
82
83 return stream;
84 }
85
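/*
 * Steal the stream key so that a later lookup on the same key no longer
 * matches this stream, while keeping the node in the hash table for
 * iteration purposes.
 */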
86 static void consumer_steal_stream_key(int key)
87 {
88 struct lttng_consumer_stream *stream;
89
90 rcu_read_lock();
91 stream = consumer_find_stream(key);
92 if (stream) {
93 stream->key = -1;
94 /*
95 * We don't want the lookup to match, but we still need
96 * to iterate on this stream when iterating over the hash table. Just
97 * change the node key.
98 */
99 stream->node.key = -1;
100 }
101 rcu_read_unlock();
102 }
103
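/*
 * Find a channel by key. Return NULL if not found.
 */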
104 static struct lttng_consumer_channel *consumer_find_channel(int key)
105 {
106 struct lttng_ht_iter iter;
107 struct lttng_ht_node_ulong *node;
108 struct lttng_consumer_channel *channel = NULL;
109
110 /* Negative keys are lookup failures */
111 if (key < 0)
112 return NULL;
113
114 rcu_read_lock();
115
116 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
117 &iter);
118 node = lttng_ht_iter_get_node_ulong(&iter);
119 if (node != NULL) {
120 channel = caa_container_of(node, struct lttng_consumer_channel, node);
121 }
122
123 rcu_read_unlock();
124
125 return channel;
126 }
127
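/*
 * Steal the channel key so that a later lookup on the same key no longer
 * matches this channel, while keeping the node in the hash table for
 * iteration purposes.
 */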
128 static void consumer_steal_channel_key(int key)
129 {
130 struct lttng_consumer_channel *channel;
131
132 rcu_read_lock();
133 channel = consumer_find_channel(key);
134 if (channel) {
135 channel->key = -1;
136 /*
137 * We don't want the lookup to match, but we still need
138 * to iterate on this channel when iterating over the hash table. Just
139 * change the node key.
140 */
141 channel->node.key = -1;
142 }
143 rcu_read_unlock();
144 }
145
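/*
 * RCU callback freeing a stream object once a grace period has elapsed.
 * Scheduled with call_rcu() by consumer_del_stream().
 */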
146 static
147 void consumer_free_stream(struct rcu_head *head)
148 {
149 struct lttng_ht_node_ulong *node =
150 caa_container_of(head, struct lttng_ht_node_ulong, head);
151 struct lttng_consumer_stream *stream =
152 caa_container_of(node, struct lttng_consumer_stream, node);
153
154 free(stream);
155 }
156
157 /*
158 * RCU protected relayd socket pair free.
159 */
160 static void consumer_rcu_free_relayd(struct rcu_head *head)
161 {
162 struct lttng_ht_node_ulong *node =
163 caa_container_of(head, struct lttng_ht_node_ulong, head);
164 struct consumer_relayd_sock_pair *relayd =
165 caa_container_of(node, struct consumer_relayd_sock_pair, node);
166
167 free(relayd);
168 }
169
170 /*
171 * Destroy and free relayd socket pair object.
172 *
173 * This function MUST be called with the consumer_data lock acquired.
174 */
175 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
176 {
177 int ret;
178 struct lttng_ht_iter iter;
179
180 DBG("Consumer destroy and close relayd socket pair");
181
182 iter.iter.node = &relayd->node.node;
183 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
184 assert(!ret);
185
186 /* Close all sockets */
187 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
188 (void) relayd_close(&relayd->control_sock);
189 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
190 (void) relayd_close(&relayd->data_sock);
191
192 /* RCU free() call */
193 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
194 }
195
196 /*
197 * Remove a stream from the global list protected by a mutex. This
198 * function is also responsible for freeing its data structures.
199 */
200 void consumer_del_stream(struct lttng_consumer_stream *stream)
201 {
202 int ret;
203 struct lttng_ht_iter iter;
204 struct lttng_consumer_channel *free_chan = NULL;
205 struct consumer_relayd_sock_pair *relayd;
206
207 assert(stream);
208
209 pthread_mutex_lock(&consumer_data.lock);
210
211 switch (consumer_data.type) {
212 case LTTNG_CONSUMER_KERNEL:
213 if (stream->mmap_base != NULL) {
214 ret = munmap(stream->mmap_base, stream->mmap_len);
215 if (ret != 0) {
216 perror("munmap");
217 }
218 }
219 break;
220 case LTTNG_CONSUMER32_UST:
221 case LTTNG_CONSUMER64_UST:
222 lttng_ustconsumer_del_stream(stream);
223 break;
224 default:
225 ERR("Unknown consumer_data type");
226 assert(0);
227 goto end;
228 }
229
230 rcu_read_lock();
231 iter.iter.node = &stream->node.node;
232 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
233 assert(!ret);
234
235 rcu_read_unlock();
236
237 if (consumer_data.stream_count <= 0) {
238 goto end;
239 }
240 consumer_data.stream_count--;
241 if (!stream) {
242 goto end;
243 }
244 if (stream->out_fd >= 0) {
245 ret = close(stream->out_fd);
246 if (ret) {
247 PERROR("close");
248 }
249 }
250 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
251 ret = close(stream->wait_fd);
252 if (ret) {
253 PERROR("close");
254 }
255 }
256 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
257 ret = close(stream->shm_fd);
258 if (ret) {
259 PERROR("close");
260 }
261 }
262
263 /* Check and cleanup relayd */
264 rcu_read_lock();
265 relayd = consumer_find_relayd(stream->net_seq_idx);
266 if (relayd != NULL) {
267 uatomic_dec(&relayd->refcount);
268 assert(uatomic_read(&relayd->refcount) >= 0);
269 if (uatomic_read(&relayd->refcount) == 0) {
270 /* Refcount of the relayd struct is 0, destroy it */
271 consumer_destroy_relayd(relayd);
272 }
273 }
274 rcu_read_unlock();
275
276 if (!--stream->chan->refcount) {
277 free_chan = stream->chan;
278 }
279
280
281 call_rcu(&stream->node.head, consumer_free_stream);
282 end:
283 consumer_data.need_update = 1;
284 pthread_mutex_unlock(&consumer_data.lock);
285
286 if (free_chan)
287 consumer_del_channel(free_chan);
288 }
289
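/*
 * Allocate and initialize a stream object for the given channel and stream
 * keys.
 */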
290 struct lttng_consumer_stream *consumer_allocate_stream(
291 int channel_key, int stream_key,
292 int shm_fd, int wait_fd,
293 enum lttng_consumer_stream_state state,
294 uint64_t mmap_len,
295 enum lttng_event_output output,
296 const char *path_name,
297 uid_t uid,
298 gid_t gid,
299 int net_index,
300 int metadata_flag)
301 {
302 struct lttng_consumer_stream *stream;
303 int ret;
304
305 stream = zmalloc(sizeof(*stream));
306 if (stream == NULL) {
307 perror("malloc struct lttng_consumer_stream");
308 goto end;
309 }
310 stream->chan = consumer_find_channel(channel_key);
311 if (!stream->chan) {
312 perror("Unable to find channel key");
313 goto end;
314 }
315 stream->chan->refcount++;
316 stream->key = stream_key;
317 stream->shm_fd = shm_fd;
318 stream->wait_fd = wait_fd;
319 stream->out_fd = -1;
320 stream->out_fd_offset = 0;
321 stream->state = state;
322 stream->mmap_len = mmap_len;
323 stream->mmap_base = NULL;
324 stream->output = output;
325 stream->uid = uid;
326 stream->gid = gid;
327 stream->net_seq_idx = net_index;
328 stream->metadata_flag = metadata_flag;
329 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
330 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
331 lttng_ht_node_init_ulong(&stream->node, stream->key);
332 lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
333
334 switch (consumer_data.type) {
335 case LTTNG_CONSUMER_KERNEL:
336 break;
337 case LTTNG_CONSUMER32_UST:
338 case LTTNG_CONSUMER64_UST:
339 stream->cpu = stream->chan->cpucount++;
340 ret = lttng_ustconsumer_allocate_stream(stream);
341 if (ret) {
342 free(stream);
343 return NULL;
344 }
345 break;
346 default:
347 ERR("Unknown consumer_data type");
348 assert(0);
349 goto end;
350 }
351 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
352 stream->path_name, stream->key,
353 stream->shm_fd,
354 stream->wait_fd,
355 (unsigned long long) stream->mmap_len,
356 stream->out_fd,
357 stream->net_seq_idx);
358 end:
359 return stream;
360 }
361
362 /*
363 * Add a stream to the global list protected by a mutex.
364 */
365 int consumer_add_stream(struct lttng_consumer_stream *stream)
366 {
367 int ret = 0;
368 struct lttng_ht_node_ulong *node;
369 struct lttng_ht_iter iter;
370 struct consumer_relayd_sock_pair *relayd;
371
372 pthread_mutex_lock(&consumer_data.lock);
373 /* Steal stream identifier, for UST */
374 consumer_steal_stream_key(stream->key);
375
376 rcu_read_lock();
377 lttng_ht_lookup(consumer_data.stream_ht,
378 (void *)((unsigned long) stream->key), &iter);
379 node = lttng_ht_iter_get_node_ulong(&iter);
380 if (node != NULL) {
381 rcu_read_unlock();
382 /* Stream already exists. Ignore the insertion */
383 goto end;
384 }
385
386 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
387
388 /* Take a reference on the relayd used by this stream, if any */
389 relayd = consumer_find_relayd(stream->net_seq_idx);
390 if (relayd != NULL) {
391 uatomic_inc(&relayd->refcount);
392 }
393 rcu_read_unlock();
394
395 /* Update consumer data */
396 consumer_data.stream_count++;
397 consumer_data.need_update = 1;
398
399 switch (consumer_data.type) {
400 case LTTNG_CONSUMER_KERNEL:
401 break;
402 case LTTNG_CONSUMER32_UST:
403 case LTTNG_CONSUMER64_UST:
404 /* Streams are in CPU number order (we rely on this) */
405 stream->cpu = stream->chan->nr_streams++;
406 break;
407 default:
408 ERR("Unknown consumer_data type");
409 assert(0);
410 goto end;
411 }
412
413 end:
414 pthread_mutex_unlock(&consumer_data.lock);
415
416 return ret;
417 }
418
419 /*
420 * Add relayd socket to global consumer data hashtable.
421 */
422 int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
423 {
424 int ret = 0;
425 struct lttng_ht_node_ulong *node;
426 struct lttng_ht_iter iter;
427
428 if (relayd == NULL) {
429 ret = -1;
430 goto end;
431 }
432
433 rcu_read_lock();
434
435 lttng_ht_lookup(consumer_data.relayd_ht,
436 (void *)((unsigned long) relayd->net_seq_idx), &iter);
437 node = lttng_ht_iter_get_node_ulong(&iter);
438 if (node != NULL) {
439 rcu_read_unlock();
440 /* Relayd already exists. Ignore the insertion */
441 goto end;
442 }
443 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
444
445 rcu_read_unlock();
446
447 end:
448 return ret;
449 }
450
451 /*
452 * Allocate and return a consumer relayd socket.
453 */
454 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
455 int net_seq_idx)
456 {
457 struct consumer_relayd_sock_pair *obj = NULL;
458
459 /* Negative net sequence index is a failure */
460 if (net_seq_idx < 0) {
461 goto error;
462 }
463
464 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
465 if (obj == NULL) {
466 PERROR("zmalloc relayd sock");
467 goto error;
468 }
469
470 obj->net_seq_idx = net_seq_idx;
471 obj->refcount = 0;
472 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
473 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
474
475 error:
476 return obj;
477 }
478
479 /*
480 * Find a relayd socket pair in the global consumer data.
481 *
482 * Return the object if found else NULL.
483 * RCU read-side lock must be held across this call and while using the
484 * returned object.
485 */
486 struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
487 {
488 struct lttng_ht_iter iter;
489 struct lttng_ht_node_ulong *node;
490 struct consumer_relayd_sock_pair *relayd = NULL;
491
492 /* Negative keys are lookup failures */
493 if (key < 0) {
494 goto error;
495 }
496
497 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
498 &iter);
499 node = lttng_ht_iter_get_node_ulong(&iter);
500 if (node != NULL) {
501 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
502 }
503
504 error:
505 return relayd;
506 }
507
508 /*
509 * Handle a stream destined for relayd transmission, i.e. a stream used for
510 * network streaming whose net sequence index is set.
511 *
512 * Return destination file descriptor or negative value on error.
513 */
514 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
515 size_t data_size)
516 {
517 int outfd = -1, ret;
518 struct consumer_relayd_sock_pair *relayd;
519 struct lttcomm_relayd_data_hdr data_hdr;
520
521 /* Safety net */
522 assert(stream);
523
524 /* Reset data header */
525 memset(&data_hdr, 0, sizeof(data_hdr));
526
527 rcu_read_lock();
528 /* Get relayd reference of the stream. */
529 relayd = consumer_find_relayd(stream->net_seq_idx);
530 if (relayd == NULL) {
531 /* Stream is either local or corrupted */
532 goto error;
533 }
534
535 DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
536 if (stream->metadata_flag) {
537 /* Caller MUST acquire the relayd control socket lock */
538 ret = relayd_send_metadata(&relayd->control_sock, data_size);
539 if (ret < 0) {
540 goto error;
541 }
542
543 /* Metadata are always sent on the control socket. */
544 outfd = relayd->control_sock.fd;
545 } else {
546 /* Set header with stream information */
547 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
548 data_hdr.data_size = htobe32(data_size);
549 /* Other fields were zeroed by the memset above */
550
551 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
552 sizeof(data_hdr));
553 if (ret < 0) {
554 goto error;
555 }
556
557 /* Set to go on data socket */
558 outfd = relayd->data_sock.fd;
559 }
560
561 error:
562 rcu_read_unlock();
563 return outfd;
564 }
565
566 /*
567 * Update a stream according to what we just received.
568 */
569 void consumer_change_stream_state(int stream_key,
570 enum lttng_consumer_stream_state state)
571 {
572 struct lttng_consumer_stream *stream;
573
574 pthread_mutex_lock(&consumer_data.lock);
575 stream = consumer_find_stream(stream_key);
576 if (stream) {
577 stream->state = state;
578 }
579 consumer_data.need_update = 1;
580 pthread_mutex_unlock(&consumer_data.lock);
581 }
582
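/*
 * RCU callback freeing a channel object once a grace period has elapsed.
 * Scheduled with call_rcu() by consumer_del_channel().
 */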
583 static
584 void consumer_free_channel(struct rcu_head *head)
585 {
586 struct lttng_ht_node_ulong *node =
587 caa_container_of(head, struct lttng_ht_node_ulong, head);
588 struct lttng_consumer_channel *channel =
589 caa_container_of(node, struct lttng_consumer_channel, node);
590
591 free(channel);
592 }
593
594 /*
595 * Remove a channel from the global list protected by a mutex. This
596 * function is also responsible for freeing its data structures.
597 */
598 void consumer_del_channel(struct lttng_consumer_channel *channel)
599 {
600 int ret;
601 struct lttng_ht_iter iter;
602
603 pthread_mutex_lock(&consumer_data.lock);
604
605 switch (consumer_data.type) {
606 case LTTNG_CONSUMER_KERNEL:
607 break;
608 case LTTNG_CONSUMER32_UST:
609 case LTTNG_CONSUMER64_UST:
610 lttng_ustconsumer_del_channel(channel);
611 break;
612 default:
613 ERR("Unknown consumer_data type");
614 assert(0);
615 goto end;
616 }
617
618 rcu_read_lock();
619 iter.iter.node = &channel->node.node;
620 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
621 assert(!ret);
622 rcu_read_unlock();
623
624 if (channel->mmap_base != NULL) {
625 ret = munmap(channel->mmap_base, channel->mmap_len);
626 if (ret != 0) {
627 perror("munmap");
628 }
629 }
630 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
631 ret = close(channel->wait_fd);
632 if (ret) {
633 PERROR("close");
634 }
635 }
636 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
637 ret = close(channel->shm_fd);
638 if (ret) {
639 PERROR("close");
640 }
641 }
642
643 call_rcu(&channel->node.head, consumer_free_channel);
644 end:
645 pthread_mutex_unlock(&consumer_data.lock);
646 }
647
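/*
 * Allocate and initialize a channel object. Return the channel, or NULL on
 * error.
 */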
648 struct lttng_consumer_channel *consumer_allocate_channel(
649 int channel_key,
650 int shm_fd, int wait_fd,
651 uint64_t mmap_len,
652 uint64_t max_sb_size)
653 {
654 struct lttng_consumer_channel *channel;
655 int ret;
656
657 channel = zmalloc(sizeof(*channel));
658 if (channel == NULL) {
659 perror("malloc struct lttng_consumer_channel");
660 goto end;
661 }
662 channel->key = channel_key;
663 channel->shm_fd = shm_fd;
664 channel->wait_fd = wait_fd;
665 channel->mmap_len = mmap_len;
666 channel->max_sb_size = max_sb_size;
667 channel->refcount = 0;
668 channel->nr_streams = 0;
669 lttng_ht_node_init_ulong(&channel->node, channel->key);
670
671 switch (consumer_data.type) {
672 case LTTNG_CONSUMER_KERNEL:
673 channel->mmap_base = NULL;
674 channel->mmap_len = 0;
675 break;
676 case LTTNG_CONSUMER32_UST:
677 case LTTNG_CONSUMER64_UST:
678 ret = lttng_ustconsumer_allocate_channel(channel);
679 if (ret) {
680 free(channel);
681 return NULL;
682 }
683 break;
684 default:
685 ERR("Unknown consumer_data type");
686 assert(0);
687 goto end;
688 }
689 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
690 channel->key, channel->shm_fd, channel->wait_fd,
691 (unsigned long long) channel->mmap_len,
692 (unsigned long long) channel->max_sb_size);
693 end:
694 return channel;
695 }
696
697 /*
698 * Add a channel to the global list protected by a mutex.
699 */
700 int consumer_add_channel(struct lttng_consumer_channel *channel)
701 {
702 struct lttng_ht_node_ulong *node;
703 struct lttng_ht_iter iter;
704
705 pthread_mutex_lock(&consumer_data.lock);
706 /* Steal channel identifier, for UST */
707 consumer_steal_channel_key(channel->key);
708 rcu_read_lock();
709
710 lttng_ht_lookup(consumer_data.channel_ht,
711 (void *)((unsigned long) channel->key), &iter);
712 node = lttng_ht_iter_get_node_ulong(&iter);
713 if (node != NULL) {
714 /* Channel already exists. Ignore the insertion */
715 goto end;
716 }
717
718 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
719
720 end:
721 rcu_read_unlock();
722 pthread_mutex_unlock(&consumer_data.lock);
723
724 return 0;
725 }
726
727 /*
728 * Allocate the pollfd structure and the local view of the out fds to avoid
729 * doing a lookup in the stream hash table and concurrency issues when writing is
730 * needed. Called with consumer_data.lock held.
731 *
732 * Returns the number of fds in the structures.
733 */
734 int consumer_update_poll_array(
735 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
736 struct lttng_consumer_stream **local_stream,
737 struct lttng_ht *metadata_ht)
738 {
739 int i = 0;
740 struct lttng_ht_iter iter;
741 struct lttng_consumer_stream *stream;
742
743 DBG("Updating poll fd array");
744 rcu_read_lock();
745 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
746 node.node) {
747 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
748 continue;
749 }
750 DBG("Active FD %d", stream->wait_fd);
751 (*pollfd)[i].fd = stream->wait_fd;
752 (*pollfd)[i].events = POLLIN | POLLPRI;
753 if (stream->metadata_flag && metadata_ht) {
754 lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node);
755 DBG("Active FD added to metadata hash table");
756 }
757 local_stream[i] = stream;
758 i++;
759 }
760 rcu_read_unlock();
761
762 /*
763 * Insert the consumer_poll_pipe at the end of the array and don't
764 * increment i so that nb_fd is the number of real FDs.
765 */
766 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
767 (*pollfd)[i].events = POLLIN | POLLPRI;
768 return i;
769 }
770
771 /*
772 * Poll on the should_quit pipe and the command socket. Return -1 on error
773 * (caller should exit), 0 if data is available on the command socket.
774 */
775 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
776 {
777 int num_rdy;
778
779 restart:
780 num_rdy = poll(consumer_sockpoll, 2, -1);
781 if (num_rdy == -1) {
782 /*
783 * Restart interrupted system call.
784 */
785 if (errno == EINTR) {
786 goto restart;
787 }
788 perror("Poll error");
789 goto exit;
790 }
791 if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
792 DBG("consumer_should_quit wake up");
793 goto exit;
794 }
795 return 0;
796
797 exit:
798 return -1;
799 }
800
801 /*
802 * Set the error socket.
803 */
804 void lttng_consumer_set_error_sock(
805 struct lttng_consumer_local_data *ctx, int sock)
806 {
807 ctx->consumer_error_socket = sock;
808 }
809
810 /*
811 * Set the command socket path.
812 */
813 void lttng_consumer_set_command_sock_path(
814 struct lttng_consumer_local_data *ctx, char *sock)
815 {
816 ctx->consumer_command_sock_path = sock;
817 }
818
819 /*
820 * Send return code to the session daemon.
821 * If the socket is not defined, we return 0, it is not a fatal error
822 */
823 int lttng_consumer_send_error(
824 struct lttng_consumer_local_data *ctx, int cmd)
825 {
826 if (ctx->consumer_error_socket > 0) {
827 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
828 sizeof(enum lttcomm_sessiond_command));
829 }
830
831 return 0;
832 }
833
834 /*
835 * Close all the tracefiles and stream fds. Should be called when all instances
836 * are destroyed.
837 */
838 void lttng_consumer_cleanup(void)
839 {
840 struct lttng_ht_iter iter;
841 struct lttng_ht_node_ulong *node;
842
843 rcu_read_lock();
844
845 /*
846 * Close all outfd. Called when there are no more threads running (after
847 * joining on the threads); no need to protect the iteration with a mutex.
848 */
849 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
850 node) {
851 struct lttng_consumer_stream *stream =
852 caa_container_of(node, struct lttng_consumer_stream, node);
853 consumer_del_stream(stream);
854 }
855
856 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
857 node) {
858 struct lttng_consumer_channel *channel =
859 caa_container_of(node, struct lttng_consumer_channel, node);
860 consumer_del_channel(channel);
861 }
862
863 rcu_read_unlock();
864
865 lttng_ht_destroy(consumer_data.stream_ht);
866 lttng_ht_destroy(consumer_data.channel_ht);
867 }
868
869 /*
870 * Called from signal handler.
871 */
872 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
873 {
874 int ret;
875 consumer_quit = 1;
876 do {
877 ret = write(ctx->consumer_should_quit[1], "4", 1);
878 } while (ret < 0 && errno == EINTR);
879 if (ret < 0) {
880 perror("write consumer quit");
881 }
882 }
883
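/*
 * Write back the pages of the previous sub-buffer and hint the kernel to
 * drop them from the page cache, limiting the amount of page cache used by
 * the trace files.
 */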
884 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
885 off_t orig_offset)
886 {
887 int outfd = stream->out_fd;
888
889 /*
890 * This does a blocking write-and-wait on any page that belongs to the
891 * subbuffer prior to the one we just wrote.
892 * Don't care about error values, as these are just hints and ways to
893 * limit the amount of page cache used.
894 */
895 if (orig_offset < stream->chan->max_sb_size) {
896 return;
897 }
898 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
899 stream->chan->max_sb_size,
900 SYNC_FILE_RANGE_WAIT_BEFORE
901 | SYNC_FILE_RANGE_WRITE
902 | SYNC_FILE_RANGE_WAIT_AFTER);
903 /*
904 * Give hints to the kernel about how we access the file:
905 * POSIX_FADV_DONTNEED: we won't re-access the data in the near future after
906 * we write it.
907 *
908 * We need to call fadvise again after the file grows because the
909 * kernel does not seem to apply fadvise to non-existing parts of the
910 * file.
911 *
912 * Call fadvise _after_ having waited for the page writeback to
913 * complete because the dirty page writeback semantic is not well
914 * defined. So it can be expected to lead to lower throughput in
915 * streaming.
916 */
917 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
918 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
919 }
920
921 /*
922 * Initialise the necessary environment:
923 * - create a new context
924 * - create the poll_pipe
925 * - create the should_quit pipe (for signal handler)
926 * - create the thread pipe (for splice)
927 *
928 * Takes a function pointer as argument; this function is called when data is
929 * available on a buffer. The callback is responsible for calling
930 * kernctl_get_next_subbuf, reading the data with mmap or splice depending on
931 * the buffer configuration, and then calling kernctl_put_next_subbuf at the end.
932 *
933 * Returns a pointer to the new context or NULL on error.
934 */
935 struct lttng_consumer_local_data *lttng_consumer_create(
936 enum lttng_consumer_type type,
937 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
938 struct lttng_consumer_local_data *ctx),
939 int (*recv_channel)(struct lttng_consumer_channel *channel),
940 int (*recv_stream)(struct lttng_consumer_stream *stream),
941 int (*update_stream)(int stream_key, uint32_t state))
942 {
943 int ret, i;
944 struct lttng_consumer_local_data *ctx;
945
946 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
947 consumer_data.type == type);
948 consumer_data.type = type;
949
950 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
951 if (ctx == NULL) {
952 perror("allocating context");
953 goto error;
954 }
955
956 ctx->consumer_error_socket = -1;
957 /* assign the callbacks */
958 ctx->on_buffer_ready = buffer_ready;
959 ctx->on_recv_channel = recv_channel;
960 ctx->on_recv_stream = recv_stream;
961 ctx->on_update_stream = update_stream;
962
963 ret = pipe(ctx->consumer_poll_pipe);
964 if (ret < 0) {
965 perror("Error creating poll pipe");
966 goto error_poll_pipe;
967 }
968
969 /* set read end of the pipe to non-blocking */
970 ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
971 if (ret < 0) {
972 perror("fcntl O_NONBLOCK");
973 goto error_poll_fcntl;
974 }
975
976 /* set write end of the pipe to non-blocking */
977 ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
978 if (ret < 0) {
979 perror("fcntl O_NONBLOCK");
980 goto error_poll_fcntl;
981 }
982
983 ret = pipe(ctx->consumer_should_quit);
984 if (ret < 0) {
985 perror("Error creating recv pipe");
986 goto error_quit_pipe;
987 }
988
989 ret = pipe(ctx->consumer_thread_pipe);
990 if (ret < 0) {
991 perror("Error creating thread pipe");
992 goto error_thread_pipe;
993 }
994
995 return ctx;
996
997
998 error_thread_pipe:
999 for (i = 0; i < 2; i++) {
1000 int err;
1001
1002 err = close(ctx->consumer_should_quit[i]);
1003 if (err) {
1004 PERROR("close");
1005 }
1006 }
1007 error_poll_fcntl:
1008 error_quit_pipe:
1009 for (i = 0; i < 2; i++) {
1010 int err;
1011
1012 err = close(ctx->consumer_poll_pipe[i]);
1013 if (err) {
1014 PERROR("close");
1015 }
1016 }
1017 error_poll_pipe:
1018 free(ctx);
1019 error:
1020 return NULL;
1021 }
1022
1023 /*
1024 * Close all fds associated with the instance and free the context.
1025 */
1026 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1027 {
1028 int ret;
1029
1030 ret = close(ctx->consumer_error_socket);
1031 if (ret) {
1032 PERROR("close");
1033 }
1034 ret = close(ctx->consumer_thread_pipe[0]);
1035 if (ret) {
1036 PERROR("close");
1037 }
1038 ret = close(ctx->consumer_thread_pipe[1]);
1039 if (ret) {
1040 PERROR("close");
1041 }
1042 ret = close(ctx->consumer_poll_pipe[0]);
1043 if (ret) {
1044 PERROR("close");
1045 }
1046 ret = close(ctx->consumer_poll_pipe[1]);
1047 if (ret) {
1048 PERROR("close");
1049 }
1050 ret = close(ctx->consumer_should_quit[0]);
1051 if (ret) {
1052 PERROR("close");
1053 }
1054 ret = close(ctx->consumer_should_quit[1]);
1055 if (ret) {
1056 PERROR("close");
1057 }
1058 unlink(ctx->consumer_command_sock_path);
1059 free(ctx);
1060 }
1061
1062 /*
1063 * Mmap the ring buffer, read it and write the data to the tracefile.
1064 *
1065 * Returns the number of bytes written
1066 */
1067 ssize_t lttng_consumer_on_read_subbuffer_mmap(
1068 struct lttng_consumer_local_data *ctx,
1069 struct lttng_consumer_stream *stream, unsigned long len)
1070 {
1071 switch (consumer_data.type) {
1072 case LTTNG_CONSUMER_KERNEL:
1073 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
1074 case LTTNG_CONSUMER32_UST:
1075 case LTTNG_CONSUMER64_UST:
1076 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
1077 default:
1078 ERR("Unknown consumer_data type");
1079 assert(0);
1080 }
1081
1082 return 0;
1083 }
1084
1085 /*
1086 * Splice the data from the ring buffer to the tracefile.
1087 *
1088 * Returns the number of bytes spliced.
1089 */
1090 ssize_t lttng_consumer_on_read_subbuffer_splice(
1091 struct lttng_consumer_local_data *ctx,
1092 struct lttng_consumer_stream *stream, unsigned long len)
1093 {
1094 switch (consumer_data.type) {
1095 case LTTNG_CONSUMER_KERNEL:
1096 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
1097 case LTTNG_CONSUMER32_UST:
1098 case LTTNG_CONSUMER64_UST:
1099 return -ENOSYS;
1100 default:
1101 ERR("Unknown consumer_data type");
1102 assert(0);
1103 return -ENOSYS;
1104 }
1105
1106 }
1107
1108 /*
1109 * Take a snapshot for a specific fd
1110 *
1111 * Returns 0 on success, < 0 on error
1112 */
1113 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1114 struct lttng_consumer_stream *stream)
1115 {
1116 switch (consumer_data.type) {
1117 case LTTNG_CONSUMER_KERNEL:
1118 return lttng_kconsumer_take_snapshot(ctx, stream);
1119 case LTTNG_CONSUMER32_UST:
1120 case LTTNG_CONSUMER64_UST:
1121 return lttng_ustconsumer_take_snapshot(ctx, stream);
1122 default:
1123 ERR("Unknown consumer_data type");
1124 assert(0);
1125 return -ENOSYS;
1126 }
1127
1128 }
1129
1130 /*
1131 * Get the produced position
1132 *
1133 * Returns 0 on success, < 0 on error
1134 */
1135 int lttng_consumer_get_produced_snapshot(
1136 struct lttng_consumer_local_data *ctx,
1137 struct lttng_consumer_stream *stream,
1138 unsigned long *pos)
1139 {
1140 switch (consumer_data.type) {
1141 case LTTNG_CONSUMER_KERNEL:
1142 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
1143 case LTTNG_CONSUMER32_UST:
1144 case LTTNG_CONSUMER64_UST:
1145 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1146 default:
1147 ERR("Unknown consumer_data type");
1148 assert(0);
1149 return -ENOSYS;
1150 }
1151 }
1152
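/*
 * Receive a command from the session daemon on the given socket and dispatch
 * it to the kernel or UST consumer implementation.
 */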
1153 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1154 int sock, struct pollfd *consumer_sockpoll)
1155 {
1156 switch (consumer_data.type) {
1157 case LTTNG_CONSUMER_KERNEL:
1158 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1159 case LTTNG_CONSUMER32_UST:
1160 case LTTNG_CONSUMER64_UST:
1161 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1162 default:
1163 ERR("Unknown consumer_data type");
1164 assert(0);
1165 return -ENOSYS;
1166 }
1167 }
1168
1169 /*
1170 * This thread polls the fds in the set to consume the data and write
1171 * it to tracefile if necessary.
1172 */
1173 void *lttng_consumer_thread_poll_fds(void *data)
1174 {
1175 int num_rdy, num_hup, high_prio, ret, i;
1176 struct pollfd *pollfd = NULL;
1177 /* local view of the streams */
1178 struct lttng_consumer_stream **local_stream = NULL;
1179 /* local view of consumer_data.fds_count */
1180 int nb_fd = 0;
1181 struct lttng_consumer_local_data *ctx = data;
1182 struct lttng_ht *metadata_ht;
1183 struct lttng_ht_iter iter;
1184 struct lttng_ht_node_ulong *node;
1185 struct lttng_consumer_stream *metadata_stream;
1186 ssize_t len;
1187
1188 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1189
1190 rcu_register_thread();
1191
1192 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
1193
1194 while (1) {
1195 high_prio = 0;
1196 num_hup = 0;
1197
1198 /*
1199 * the fds set has been updated, we need to update our
1200 * local array as well
1201 */
1202 pthread_mutex_lock(&consumer_data.lock);
1203 if (consumer_data.need_update) {
1204 if (pollfd != NULL) {
1205 free(pollfd);
1206 pollfd = NULL;
1207 }
1208 if (local_stream != NULL) {
1209 free(local_stream);
1210 local_stream = NULL;
1211 }
1212
1213 /* allocate for all fds + 1 for the consumer_poll_pipe */
1214 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
1215 if (pollfd == NULL) {
1216 perror("pollfd malloc");
1217 pthread_mutex_unlock(&consumer_data.lock);
1218 goto end;
1219 }
1220
1221 /* allocate for all fds + 1 for the consumer_poll_pipe */
1222 local_stream = zmalloc((consumer_data.stream_count + 1) *
1223 sizeof(struct lttng_consumer_stream *));
1224 if (local_stream == NULL) {
1225 perror("local_stream malloc");
1226 pthread_mutex_unlock(&consumer_data.lock);
1227 goto end;
1228 }
1229 ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
1230 metadata_ht);
1231 if (ret < 0) {
1232 ERR("Error in allocating pollfd or local_outfds");
1233 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1234 pthread_mutex_unlock(&consumer_data.lock);
1235 goto end;
1236 }
1237 nb_fd = ret;
1238 consumer_data.need_update = 0;
1239 }
1240 pthread_mutex_unlock(&consumer_data.lock);
1241
1242 /* No FDs left and consumer_quit is set: clean up and exit the thread */
1243 if (nb_fd == 0 && consumer_quit == 1) {
1244 goto end;
1245 }
1246 /* poll on the array of fds */
1247 restart:
1248 DBG("polling on %d fd", nb_fd + 1);
1249 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
1250 DBG("poll num_rdy : %d", num_rdy);
1251 if (num_rdy == -1) {
1252 /*
1253 * Restart interrupted system call.
1254 */
1255 if (errno == EINTR) {
1256 goto restart;
1257 }
1258 perror("Poll error");
1259 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1260 goto end;
1261 } else if (num_rdy == 0) {
1262 DBG("Polling thread timed out");
1263 goto end;
1264 }
1265
1266 /*
1267 * If the consumer_poll_pipe triggered poll go directly to the
1268 * beginning of the loop to update the array. We want to prioritize
1269 * array update over low-priority reads.
1270 */
1271 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
1272 size_t pipe_readlen;
1273 char tmp;
1274
1275 DBG("consumer_poll_pipe wake up");
1276 /* Consume 1 byte of pipe data */
1277 do {
1278 pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
1279 } while (pipe_readlen == -1 && errno == EINTR);
1280 continue;
1281 }
1282
1283 /* Take care of high priority channels first. */
1284 for (i = 0; i < nb_fd; i++) {
1285 /* Look up metadata, which has the highest priority */
1286 lttng_ht_lookup(metadata_ht,
1287 (void *)((unsigned long) pollfd[i].fd), &iter);
1288 node = lttng_ht_iter_get_node_ulong(&iter);
1289 if (node != NULL &&
1290 (pollfd[i].revents & (POLLIN | POLLPRI))) {
1291 DBG("Urgent metadata read on fd %d", pollfd[i].fd);
1292 metadata_stream = caa_container_of(node,
1293 struct lttng_consumer_stream, waitfd_node);
1294 high_prio = 1;
1295 len = ctx->on_buffer_ready(metadata_stream, ctx);
1296 /* it's ok to have an unavailable sub-buffer */
1297 if (len < 0 && len != -EAGAIN) {
1298 goto end;
1299 } else if (len > 0) {
1300 metadata_stream->data_read = 1;
1301 }
1302 } else if (pollfd[i].revents & POLLPRI) {
1303 DBG("Urgent read on fd %d", pollfd[i].fd);
1304 high_prio = 1;
1305 len = ctx->on_buffer_ready(local_stream[i], ctx);
1306 /* it's ok to have an unavailable sub-buffer */
1307 if (len < 0 && len != -EAGAIN) {
1308 goto end;
1309 } else if (len > 0) {
1310 local_stream[i]->data_read = 1;
1311 }
1312 }
1313 }
1314
1315 /*
1316 * If we read a high prio channel in this loop, try again
1317 * for more high prio data.
1318 */
1319 if (high_prio) {
1320 continue;
1321 }
1322
1323 /* Take care of low priority channels. */
1324 for (i = 0; i < nb_fd; i++) {
1325 if ((pollfd[i].revents & POLLIN) ||
1326 local_stream[i]->hangup_flush_done) {
1327 DBG("Normal read on fd %d", pollfd[i].fd);
1328 len = ctx->on_buffer_ready(local_stream[i], ctx);
1329 /* it's ok to have an unavailable sub-buffer */
1330 if (len < 0 && len != -EAGAIN) {
1331 goto end;
1332 } else if (len > 0) {
1333 local_stream[i]->data_read = 1;
1334 }
1335 }
1336 }
1337
1338 /* Handle hangup and errors */
1339 for (i = 0; i < nb_fd; i++) {
1340 if (!local_stream[i]->hangup_flush_done
1341 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1342 && (consumer_data.type == LTTNG_CONSUMER32_UST
1343 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1344 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1345 pollfd[i].fd);
1346 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1347 /* Attempt read again, for the data we just flushed. */
1348 local_stream[i]->data_read = 1;
1349 }
1350 /*
1351 * If the poll flag is HUP/ERR/NVAL and we have
1352 * read no data in this pass, we can remove the
1353 * stream from its hash table.
1354 */
1355 if ((pollfd[i].revents & POLLHUP)) {
1356 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1357 if (!local_stream[i]->data_read) {
1358 if (local_stream[i]->metadata_flag) {
1359 iter.iter.node = &local_stream[i]->waitfd_node.node;
1360 ret = lttng_ht_del(metadata_ht, &iter);
1361 assert(!ret);
1362 }
1363 consumer_del_stream(local_stream[i]);
1364 num_hup++;
1365 }
1366 } else if (pollfd[i].revents & POLLERR) {
1367 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1368 if (!local_stream[i]->data_read) {
1369 if (local_stream[i]->metadata_flag) {
1370 iter.iter.node = &local_stream[i]->waitfd_node.node;
1371 ret = lttng_ht_del(metadata_ht, &iter);
1372 assert(!ret);
1373 }
1374 consumer_del_stream(local_stream[i]);
1375 num_hup++;
1376 }
1377 } else if (pollfd[i].revents & POLLNVAL) {
1378 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1379 if (!local_stream[i]->data_read) {
1380 if (local_stream[i]->metadata_flag) {
1381 iter.iter.node = &local_stream[i]->waitfd_node.node;
1382 ret = lttng_ht_del(metadata_ht, &iter);
1383 assert(!ret);
1384 }
1385 consumer_del_stream(local_stream[i]);
1386 num_hup++;
1387 }
1388 }
1389 local_stream[i]->data_read = 0;
1390 }
1391 }
1392 end:
1393 DBG("polling thread exiting");
1394 if (pollfd != NULL) {
1395 free(pollfd);
1396 pollfd = NULL;
1397 }
1398 if (local_stream != NULL) {
1399 free(local_stream);
1400 local_stream = NULL;
1401 }
1402 rcu_unregister_thread();
1403 return NULL;
1404 }
1405
1406 /*
1407 * This thread listens on the consumerd socket and receives the file
1408 * descriptors from the session daemon.
1409 */
1410 void *lttng_consumer_thread_receive_fds(void *data)
1411 {
1412 int sock, client_socket, ret;
1413 /*
1414 * Structure to poll for incoming data on the communication socket; avoids
1415 * blocking on the socket.
1416 */
1417 struct pollfd consumer_sockpoll[2];
1418 struct lttng_consumer_local_data *ctx = data;
1419
1420 rcu_register_thread();
1421
1422 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
1423 unlink(ctx->consumer_command_sock_path);
1424 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
1425 if (client_socket < 0) {
1426 ERR("Cannot create command socket");
1427 goto end;
1428 }
1429
1430 ret = lttcomm_listen_unix_sock(client_socket);
1431 if (ret < 0) {
1432 goto end;
1433 }
1434
1435 DBG("Sending ready command to lttng-sessiond");
1436 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
1437 /* return < 0 on error, but == 0 is not fatal */
1438 if (ret < 0) {
1439 ERR("Error sending ready command to lttng-sessiond");
1440 goto end;
1441 }
1442
1443 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
1444 if (ret < 0) {
1445 perror("fcntl O_NONBLOCK");
1446 goto end;
1447 }
1448
1449 /* Prepare the FDs to poll: the client socket and the should_quit pipe */
1450 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1451 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1452 consumer_sockpoll[1].fd = client_socket;
1453 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1454
1455 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1456 goto end;
1457 }
1458 DBG("Connection on client_socket");
1459
1460 /* Blocking call, waiting for transmission */
1461 sock = lttcomm_accept_unix_sock(client_socket);
1462 if (sock <= 0) {
1463 WARN("On accept");
1464 goto end;
1465 }
1466 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1467 if (ret < 0) {
1468 perror("fcntl O_NONBLOCK");
1469 goto end;
1470 }
1471
1472 /* update the polling structure to poll on the established socket */
1473 consumer_sockpoll[1].fd = sock;
1474 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1475
1476 while (1) {
1477 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1478 goto end;
1479 }
1480 DBG("Incoming command on sock");
1481 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1482 if (ret == -ENOENT) {
1483 DBG("Received STOP command");
1484 goto end;
1485 }
1486 if (ret < 0) {
1487 ERR("Communication interrupted on command socket");
1488 goto end;
1489 }
1490 if (consumer_quit) {
1491 DBG("consumer_thread_receive_fds received quit from signal");
1492 goto end;
1493 }
1494 DBG("received fds on sock");
1495 }
1496 end:
1497 DBG("consumer_thread_receive_fds exiting");
1498
1499 /*
1500 * when all fds have hung up, the polling thread
1501 * can exit cleanly
1502 */
1503 consumer_quit = 1;
1504
1505 /*
1506 * 2s of grace period: if no polling events occur during
1507 * this period, the polling thread will exit even if there
1508 * are still open FDs (should not happen, but safety mechanism).
1509 */
1510 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1511
1512 /*
1513 * Wake-up the other end by writing a null byte in the pipe
1514 * (non-blocking). Important note: Because writing into the
1515 * pipe is non-blocking (and therefore we allow dropping wakeup
1516 * data, as long as there is wakeup data present in the pipe
1517 * buffer to wake up the other end), the other end should
1518 * perform the following sequence for waiting:
1519 * 1) empty the pipe (reads).
1520 * 2) perform update operation.
1521 * 3) wait on the pipe (poll).
1522 */
1523 do {
1524 ret = write(ctx->consumer_poll_pipe[1], "", 1);
1525 } while (ret < 0 && errno == EINTR);
1526 rcu_unregister_thread();
1527 return NULL;
1528 }
1529
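/*
 * Read a ready sub-buffer of the given stream, dispatching to the kernel or
 * UST consumer implementation.
 */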
1530 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
1531 struct lttng_consumer_local_data *ctx)
1532 {
1533 switch (consumer_data.type) {
1534 case LTTNG_CONSUMER_KERNEL:
1535 return lttng_kconsumer_read_subbuffer(stream, ctx);
1536 case LTTNG_CONSUMER32_UST:
1537 case LTTNG_CONSUMER64_UST:
1538 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1539 default:
1540 ERR("Unknown consumer_data type");
1541 assert(0);
1542 return -ENOSYS;
1543 }
1544 }
1545
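/*
 * Dispatch the on_recv_stream hook to the kernel or UST consumer
 * implementation when a new stream is received from the session daemon.
 */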
1546 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1547 {
1548 switch (consumer_data.type) {
1549 case LTTNG_CONSUMER_KERNEL:
1550 return lttng_kconsumer_on_recv_stream(stream);
1551 case LTTNG_CONSUMER32_UST:
1552 case LTTNG_CONSUMER64_UST:
1553 return lttng_ustconsumer_on_recv_stream(stream);
1554 default:
1555 ERR("Unknown consumer_data type");
1556 assert(0);
1557 return -ENOSYS;
1558 }
1559 }
1560
1561 /*
1562 * Allocate and set consumer data hash tables.
1563 */
1564 void lttng_consumer_init(void)
1565 {
1566 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1567 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1568 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1569 }