Add time validation to health check
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
00e2e675 4 * 2012 - David Goulet <dgoulet@efficios.com>
3bd1e081 5 *
d14d33bf
AM
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
3bd1e081 9 *
d14d33bf
AM
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
3bd1e081 14 *
d14d33bf
AM
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30
990570ed 31#include <common/common.h>
10a8a223 32#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 33#include <common/sessiond-comm/relayd.h>
10a8a223
DG
34#include <common/sessiond-comm/sessiond-comm.h>
35#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 36#include <common/relayd/relayd.h>
10a8a223
DG
37#include <common/ust-consumer/ust-consumer.h>
38
39#include "consumer.h"
3bd1e081
MD
40
41struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
42 .stream_count = 0,
43 .need_update = 1,
44 .type = LTTNG_CONSUMER_UNKNOWN,
45};
46
47/* timeout parameter, to control the polling thread grace period. */
48int consumer_poll_timeout = -1;
49
50/*
51 * Flag to inform the polling thread to quit when all fds have hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds have hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
56volatile int consumer_quit = 0;
57
58/*
59 * Find a stream. The consumer_data.lock must be locked during this
60 * call.
61 */
62static struct lttng_consumer_stream *consumer_find_stream(int key)
63{
e4421fec
DG
64 struct lttng_ht_iter iter;
65 struct lttng_ht_node_ulong *node;
66 struct lttng_consumer_stream *stream = NULL;
3bd1e081 67
7ad0a0cb
MD
68 /* Negative keys are lookup failures */
69 if (key < 0)
70 return NULL;
e4421fec 71
6065ceec
DG
72 rcu_read_lock();
73
e4421fec
DG
74 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
75 &iter);
76 node = lttng_ht_iter_get_node_ulong(&iter);
77 if (node != NULL) {
78 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 79 }
e4421fec 80
6065ceec
DG
81 rcu_read_unlock();
82
e4421fec 83 return stream;
3bd1e081
MD
84}
85
7ad0a0cb
MD
86static void consumer_steal_stream_key(int key)
87{
88 struct lttng_consumer_stream *stream;
89
04253271 90 rcu_read_lock();
7ad0a0cb 91 stream = consumer_find_stream(key);
04253271 92 if (stream) {
7ad0a0cb 93 stream->key = -1;
04253271
MD
94 /*
95 * We don't want the lookup to match, but we still need
96 * to iterate on this stream when iterating over the hash table. Just
97 * change the node key.
98 */
99 stream->node.key = -1;
100 }
101 rcu_read_unlock();
7ad0a0cb
MD
102}
103
3bd1e081
MD
104static struct lttng_consumer_channel *consumer_find_channel(int key)
105{
e4421fec
DG
106 struct lttng_ht_iter iter;
107 struct lttng_ht_node_ulong *node;
108 struct lttng_consumer_channel *channel = NULL;
3bd1e081 109
7ad0a0cb
MD
110 /* Negative keys are lookup failures */
111 if (key < 0)
112 return NULL;
e4421fec 113
6065ceec
DG
114 rcu_read_lock();
115
e4421fec
DG
116 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
117 &iter);
118 node = lttng_ht_iter_get_node_ulong(&iter);
119 if (node != NULL) {
120 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 121 }
e4421fec 122
6065ceec
DG
123 rcu_read_unlock();
124
e4421fec 125 return channel;
3bd1e081
MD
126}
127
7ad0a0cb
MD
128static void consumer_steal_channel_key(int key)
129{
130 struct lttng_consumer_channel *channel;
131
04253271 132 rcu_read_lock();
7ad0a0cb 133 channel = consumer_find_channel(key);
04253271 134 if (channel) {
7ad0a0cb 135 channel->key = -1;
04253271
MD
136 /*
137 * We don't want the lookup to match, but we still need
138 * to iterate on this channel when iterating over the hash table. Just
139 * change the node key.
140 */
141 channel->node.key = -1;
142 }
143 rcu_read_unlock();
7ad0a0cb
MD
144}
145
702b1ea4
MD
146static
147void consumer_free_stream(struct rcu_head *head)
148{
149 struct lttng_ht_node_ulong *node =
150 caa_container_of(head, struct lttng_ht_node_ulong, head);
151 struct lttng_consumer_stream *stream =
152 caa_container_of(node, struct lttng_consumer_stream, node);
153
154 free(stream);
155}
156
00e2e675
DG
157/*
158 * RCU protected relayd socket pair free.
159 */
160static void consumer_rcu_free_relayd(struct rcu_head *head)
161{
162 struct lttng_ht_node_ulong *node =
163 caa_container_of(head, struct lttng_ht_node_ulong, head);
164 struct consumer_relayd_sock_pair *relayd =
165 caa_container_of(node, struct consumer_relayd_sock_pair, node);
166
167 free(relayd);
168}
169
170/*
171 * Destroy and free relayd socket pair object.
172 *
173 * This function MUST be called with the consumer_data lock acquired.
174 */
175void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
176{
177 int ret;
178 struct lttng_ht_iter iter;
179
180 DBG("Consumer destroy and close relayd socket pair");
181
182 iter.iter.node = &relayd->node.node;
183 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
184 assert(!ret);
185
186 /* Close all sockets */
187 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
188 (void) relayd_close(&relayd->control_sock);
189 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
190 (void) relayd_close(&relayd->data_sock);
191
192 /* RCU free() call */
193 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
194}
195
3bd1e081
MD
196/*
197 * Remove a stream from the global list protected by a mutex. This
198 * function is also responsible for freeing its data structures.
199 */
200void consumer_del_stream(struct lttng_consumer_stream *stream)
201{
202 int ret;
e4421fec 203 struct lttng_ht_iter iter;
3bd1e081 204 struct lttng_consumer_channel *free_chan = NULL;
00e2e675
DG
205 struct consumer_relayd_sock_pair *relayd;
206
207 assert(stream);
3bd1e081
MD
208
209 pthread_mutex_lock(&consumer_data.lock);
210
211 switch (consumer_data.type) {
212 case LTTNG_CONSUMER_KERNEL:
213 if (stream->mmap_base != NULL) {
214 ret = munmap(stream->mmap_base, stream->mmap_len);
215 if (ret != 0) {
216 perror("munmap");
217 }
218 }
219 break;
7753dea8
MD
220 case LTTNG_CONSUMER32_UST:
221 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
222 lttng_ustconsumer_del_stream(stream);
223 break;
224 default:
225 ERR("Unknown consumer_data type");
226 assert(0);
227 goto end;
228 }
229
6065ceec 230 rcu_read_lock();
04253271
MD
231 iter.iter.node = &stream->node.node;
232 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
233 assert(!ret);
e4421fec 234
6065ceec
DG
235 rcu_read_unlock();
236
3bd1e081
MD
237 if (consumer_data.stream_count <= 0) {
238 goto end;
239 }
240 consumer_data.stream_count--;
241 if (!stream) {
242 goto end;
243 }
244 if (stream->out_fd >= 0) {
4c462e79
MD
245 ret = close(stream->out_fd);
246 if (ret) {
247 PERROR("close");
248 }
3bd1e081 249 }
b5c5fc29 250 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
4c462e79
MD
251 ret = close(stream->wait_fd);
252 if (ret) {
253 PERROR("close");
254 }
3bd1e081 255 }
2c1dd183 256 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
4c462e79
MD
257 ret = close(stream->shm_fd);
258 if (ret) {
259 PERROR("close");
260 }
3bd1e081 261 }
00e2e675
DG
262
263 /* Check and cleanup relayd */
264 relayd = consumer_find_relayd(stream->net_seq_idx);
265 if (relayd != NULL) {
266 /* We are about to modify the relayd refcount */
267 rcu_read_lock();
268 if (!--relayd->refcount) {
269 /* Refcount of the relayd struct is 0, destroy it */
270 consumer_destroy_relayd(relayd);
271 }
272 rcu_read_unlock();
273 }
274
275 if (!--stream->chan->refcount) {
3bd1e081 276 free_chan = stream->chan;
00e2e675
DG
277 }
278
702b1ea4
MD
279
280 call_rcu(&stream->node.head, consumer_free_stream);
3bd1e081
MD
281end:
282 consumer_data.need_update = 1;
283 pthread_mutex_unlock(&consumer_data.lock);
284
285 if (free_chan)
286 consumer_del_channel(free_chan);
287}
288
289struct lttng_consumer_stream *consumer_allocate_stream(
290 int channel_key, int stream_key,
291 int shm_fd, int wait_fd,
292 enum lttng_consumer_stream_state state,
293 uint64_t mmap_len,
294 enum lttng_event_output output,
6df2e2c9
MD
295 const char *path_name,
296 uid_t uid,
00e2e675
DG
297 gid_t gid,
298 int net_index,
299 int metadata_flag)
3bd1e081
MD
300{
301 struct lttng_consumer_stream *stream;
302 int ret;
303
effcf122 304 stream = zmalloc(sizeof(*stream));
3bd1e081
MD
305 if (stream == NULL) {
306 perror("malloc struct lttng_consumer_stream");
307 goto end;
308 }
309 stream->chan = consumer_find_channel(channel_key);
310 if (!stream->chan) {
311 perror("Unable to find channel key");
312 goto end;
313 }
314 stream->chan->refcount++;
315 stream->key = stream_key;
316 stream->shm_fd = shm_fd;
317 stream->wait_fd = wait_fd;
318 stream->out_fd = -1;
319 stream->out_fd_offset = 0;
320 stream->state = state;
321 stream->mmap_len = mmap_len;
322 stream->mmap_base = NULL;
323 stream->output = output;
6df2e2c9
MD
324 stream->uid = uid;
325 stream->gid = gid;
00e2e675
DG
326 stream->net_seq_idx = net_index;
327 stream->metadata_flag = metadata_flag;
328 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
329 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
e4421fec 330 lttng_ht_node_init_ulong(&stream->node, stream->key);
00e2e675 331 lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
3bd1e081
MD
332
333 switch (consumer_data.type) {
334 case LTTNG_CONSUMER_KERNEL:
335 break;
7753dea8
MD
336 case LTTNG_CONSUMER32_UST:
337 case LTTNG_CONSUMER64_UST:
5af2f756 338 stream->cpu = stream->chan->cpucount++;
3bd1e081
MD
339 ret = lttng_ustconsumer_allocate_stream(stream);
340 if (ret) {
341 free(stream);
342 return NULL;
343 }
344 break;
345 default:
346 ERR("Unknown consumer_data type");
347 assert(0);
348 goto end;
349 }
00e2e675 350 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
3bd1e081
MD
351 stream->path_name, stream->key,
352 stream->shm_fd,
353 stream->wait_fd,
354 (unsigned long long) stream->mmap_len,
00e2e675
DG
355 stream->out_fd,
356 stream->net_seq_idx);
3bd1e081
MD
357end:
358 return stream;
359}
360
361/*
362 * Add a stream to the global list protected by a mutex.
363 */
364int consumer_add_stream(struct lttng_consumer_stream *stream)
365{
366 int ret = 0;
c77fc10a
DG
367 struct lttng_ht_node_ulong *node;
368 struct lttng_ht_iter iter;
00e2e675 369 struct consumer_relayd_sock_pair *relayd;
3bd1e081
MD
370
371 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
372 /* Steal stream identifier, for UST */
373 consumer_steal_stream_key(stream->key);
6065ceec 374 rcu_read_lock();
c77fc10a
DG
375
376 lttng_ht_lookup(consumer_data.stream_ht,
377 (void *)((unsigned long) stream->key), &iter);
378 node = lttng_ht_iter_get_node_ulong(&iter);
379 if (node != NULL) {
380 rcu_read_unlock();
381 /* Stream already exist. Ignore the insertion */
382 goto end;
383 }
384
04253271 385 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
6065ceec 386 rcu_read_unlock();
00e2e675
DG
387
388 /* Check and cleanup relayd */
389 relayd = consumer_find_relayd(stream->net_seq_idx);
390 if (relayd != NULL) {
391 /* We are about to modify the relayd refcount */
392 rcu_read_lock();
393 relayd->refcount++;
394 rcu_read_unlock();
395 }
396
397 /* Update consumer data */
3bd1e081
MD
398 consumer_data.stream_count++;
399 consumer_data.need_update = 1;
400
401 switch (consumer_data.type) {
402 case LTTNG_CONSUMER_KERNEL:
403 break;
7753dea8
MD
404 case LTTNG_CONSUMER32_UST:
405 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
406 /* Streams are in CPU number order (we rely on this) */
407 stream->cpu = stream->chan->nr_streams++;
408 break;
409 default:
410 ERR("Unknown consumer_data type");
411 assert(0);
412 goto end;
413 }
414
415end:
416 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 417
3bd1e081
MD
418 return ret;
419}
420
00e2e675
DG
421/*
422 * Add relayd socket to global consumer data hashtable.
423 */
424int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
425{
426 int ret = 0;
427 struct lttng_ht_node_ulong *node;
428 struct lttng_ht_iter iter;
429
430 if (relayd == NULL) {
431 ret = -1;
432 goto end;
433 }
434
435 rcu_read_lock();
436
437 lttng_ht_lookup(consumer_data.relayd_ht,
438 (void *)((unsigned long) relayd->net_seq_idx), &iter);
439 node = lttng_ht_iter_get_node_ulong(&iter);
440 if (node != NULL) {
441 rcu_read_unlock();
442 /* Relayd already exist. Ignore the insertion */
443 goto end;
444 }
445 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
446
447 rcu_read_unlock();
448
449end:
450 return ret;
451}
452
453/*
454 * Allocate and return a consumer relayd socket.
455 */
456struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
457 int net_seq_idx)
458{
459 struct consumer_relayd_sock_pair *obj = NULL;
460
461 /* Negative net sequence index is a failure */
462 if (net_seq_idx < 0) {
463 goto error;
464 }
465
466 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
467 if (obj == NULL) {
468 PERROR("zmalloc relayd sock");
469 goto error;
470 }
471
472 obj->net_seq_idx = net_seq_idx;
473 obj->refcount = 0;
474 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
475 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
476
477error:
478 return obj;
479}
480
481/*
482 * Find a relayd socket pair in the global consumer data.
483 *
484 * Return the object if found else NULL.
485 */
486struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
487{
488 struct lttng_ht_iter iter;
489 struct lttng_ht_node_ulong *node;
490 struct consumer_relayd_sock_pair *relayd = NULL;
491
492 /* Negative keys are lookup failures */
493 if (key < 0) {
494 goto error;
495 }
496
497 rcu_read_lock();
498
499 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
500 &iter);
501 node = lttng_ht_iter_get_node_ulong(&iter);
502 if (node != NULL) {
503 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
504 }
505
506 rcu_read_unlock();
507
508error:
509 return relayd;
510}
511
512/*
513 * Handle stream for relayd transmission if the stream applies for network
514 * streaming where the net sequence index is set.
515 *
516 * Return destination file descriptor or negative value on error.
517 */
518int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
519 size_t data_size)
520{
521 int outfd = -1, ret;
522 struct consumer_relayd_sock_pair *relayd;
523 struct lttcomm_relayd_data_hdr data_hdr;
524
525 /* Safety net */
526 assert(stream);
527
528 /* Reset data header */
529 memset(&data_hdr, 0, sizeof(data_hdr));
530
531 /* Get relayd reference of the stream. */
532 relayd = consumer_find_relayd(stream->net_seq_idx);
533 if (relayd == NULL) {
534 /* Stream is either local or corrupted */
535 goto error;
536 }
537
538 DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
539 if (stream->metadata_flag) {
540 /* Caller MUST acquire the relayd control socket lock */
541 ret = relayd_send_metadata(&relayd->control_sock, data_size);
542 if (ret < 0) {
543 goto error;
544 }
545
546 /* Metadata are always sent on the control socket. */
547 outfd = relayd->control_sock.fd;
548 } else {
549 /* Set header with stream information */
550 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
551 data_hdr.data_size = htobe32(data_size);
552 /* Other fields are zeroed previously */
553
554 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
555 sizeof(data_hdr));
556 if (ret < 0) {
557 goto error;
558 }
559
560 /* Set to go on data socket */
561 outfd = relayd->data_sock.fd;
562 }
563
564error:
565 return outfd;
566}
567
3bd1e081
MD
568/*
569 * Update a stream according to what we just received.
570 */
571void consumer_change_stream_state(int stream_key,
572 enum lttng_consumer_stream_state state)
573{
574 struct lttng_consumer_stream *stream;
575
576 pthread_mutex_lock(&consumer_data.lock);
577 stream = consumer_find_stream(stream_key);
578 if (stream) {
579 stream->state = state;
580 }
581 consumer_data.need_update = 1;
582 pthread_mutex_unlock(&consumer_data.lock);
583}
584
702b1ea4
MD
585static
586void consumer_free_channel(struct rcu_head *head)
587{
588 struct lttng_ht_node_ulong *node =
589 caa_container_of(head, struct lttng_ht_node_ulong, head);
590 struct lttng_consumer_channel *channel =
591 caa_container_of(node, struct lttng_consumer_channel, node);
592
593 free(channel);
594}
595
3bd1e081
MD
596/*
597 * Remove a channel from the global list protected by a mutex. This
598 * function is also responsible for freeing its data structures.
599 */
600void consumer_del_channel(struct lttng_consumer_channel *channel)
601{
602 int ret;
e4421fec 603 struct lttng_ht_iter iter;
3bd1e081
MD
604
605 pthread_mutex_lock(&consumer_data.lock);
606
607 switch (consumer_data.type) {
608 case LTTNG_CONSUMER_KERNEL:
609 break;
7753dea8
MD
610 case LTTNG_CONSUMER32_UST:
611 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
612 lttng_ustconsumer_del_channel(channel);
613 break;
614 default:
615 ERR("Unknown consumer_data type");
616 assert(0);
617 goto end;
618 }
619
6065ceec 620 rcu_read_lock();
04253271
MD
621 iter.iter.node = &channel->node.node;
622 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
623 assert(!ret);
6065ceec
DG
624 rcu_read_unlock();
625
3bd1e081
MD
626 if (channel->mmap_base != NULL) {
627 ret = munmap(channel->mmap_base, channel->mmap_len);
628 if (ret != 0) {
629 perror("munmap");
630 }
631 }
b5c5fc29 632 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
4c462e79
MD
633 ret = close(channel->wait_fd);
634 if (ret) {
635 PERROR("close");
636 }
3bd1e081 637 }
2c1dd183 638 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
4c462e79
MD
639 ret = close(channel->shm_fd);
640 if (ret) {
641 PERROR("close");
642 }
3bd1e081 643 }
702b1ea4
MD
644
645 call_rcu(&channel->node.head, consumer_free_channel);
3bd1e081
MD
646end:
647 pthread_mutex_unlock(&consumer_data.lock);
648}
649
650struct lttng_consumer_channel *consumer_allocate_channel(
651 int channel_key,
652 int shm_fd, int wait_fd,
653 uint64_t mmap_len,
654 uint64_t max_sb_size)
655{
656 struct lttng_consumer_channel *channel;
657 int ret;
658
276b26d1 659 channel = zmalloc(sizeof(*channel));
3bd1e081
MD
660 if (channel == NULL) {
661 perror("malloc struct lttng_consumer_channel");
662 goto end;
663 }
664 channel->key = channel_key;
665 channel->shm_fd = shm_fd;
666 channel->wait_fd = wait_fd;
667 channel->mmap_len = mmap_len;
668 channel->max_sb_size = max_sb_size;
669 channel->refcount = 0;
670 channel->nr_streams = 0;
e4421fec 671 lttng_ht_node_init_ulong(&channel->node, channel->key);
3bd1e081
MD
672
673 switch (consumer_data.type) {
674 case LTTNG_CONSUMER_KERNEL:
675 channel->mmap_base = NULL;
676 channel->mmap_len = 0;
677 break;
7753dea8
MD
678 case LTTNG_CONSUMER32_UST:
679 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
680 ret = lttng_ustconsumer_allocate_channel(channel);
681 if (ret) {
682 free(channel);
683 return NULL;
684 }
685 break;
686 default:
687 ERR("Unknown consumer_data type");
688 assert(0);
689 goto end;
690 }
691 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
00e2e675 692 channel->key, channel->shm_fd, channel->wait_fd,
3bd1e081
MD
693 (unsigned long long) channel->mmap_len,
694 (unsigned long long) channel->max_sb_size);
695end:
696 return channel;
697}
698
699/*
700 * Add a channel to the global list protected by a mutex.
701 */
702int consumer_add_channel(struct lttng_consumer_channel *channel)
703{
c77fc10a
DG
704 struct lttng_ht_node_ulong *node;
705 struct lttng_ht_iter iter;
706
3bd1e081 707 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
708 /* Steal channel identifier, for UST */
709 consumer_steal_channel_key(channel->key);
6065ceec 710 rcu_read_lock();
c77fc10a
DG
711
712 lttng_ht_lookup(consumer_data.channel_ht,
713 (void *)((unsigned long) channel->key), &iter);
714 node = lttng_ht_iter_get_node_ulong(&iter);
715 if (node != NULL) {
716 /* Channel already exist. Ignore the insertion */
717 goto end;
718 }
719
04253271 720 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
c77fc10a
DG
721
722end:
6065ceec 723 rcu_read_unlock();
3bd1e081 724 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 725
7ad0a0cb 726 return 0;
3bd1e081
MD
727}
728
729/*
730 * Allocate the pollfd structure and the local view of the out fds to avoid
731 * doing a lookup in the linked list and concurrency issues when writing is
732 * needed. Called with consumer_data.lock held.
733 *
734 * Returns the number of fds in the structures.
735 */
736int consumer_update_poll_array(
737 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
00e2e675
DG
738 struct lttng_consumer_stream **local_stream,
739 struct lttng_ht *metadata_ht)
3bd1e081 740{
3bd1e081 741 int i = 0;
e4421fec
DG
742 struct lttng_ht_iter iter;
743 struct lttng_consumer_stream *stream;
3bd1e081
MD
744
745 DBG("Updating poll fd array");
481d6c57 746 rcu_read_lock();
e4421fec
DG
747 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
748 node.node) {
749 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
3bd1e081
MD
750 continue;
751 }
e4421fec
DG
752 DBG("Active FD %d", stream->wait_fd);
753 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 754 (*pollfd)[i].events = POLLIN | POLLPRI;
00e2e675
DG
755 if (stream->metadata_flag && metadata_ht) {
756 lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node);
757 DBG("Active FD added to metadata hash table");
758 }
e4421fec 759 local_stream[i] = stream;
3bd1e081
MD
760 i++;
761 }
481d6c57 762 rcu_read_unlock();
3bd1e081
MD
763
764 /*
765 * Insert the consumer_poll_pipe at the end of the array and don't
766 * increment i so nb_fd is the number of real FD.
767 */
768 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
509bb1cf 769 (*pollfd)[i].events = POLLIN | POLLPRI;
3bd1e081
MD
770 return i;
771}
772
/*
 * Block in poll() on the two descriptors of consumer_sockpoll (the
 * should_quit pipe in entry 0 and the command socket), restarting the call
 * when it is interrupted by a signal.
 *
 * Returns -1 when poll() fails or when entry 0 reports POLLIN or POLLPRI
 * (the consumer should exit), 0 otherwise.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int ready;

	/* Restart interrupted system call. */
	do {
		ready = poll(consumer_sockpoll, 2, -1);
	} while (ready == -1 && errno == EINTR);

	if (ready == -1) {
		perror("Poll error");
		return -1;
	}

	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return -1;
	}

	return 0;
}
802
803/*
804 * Set the error socket.
805 */
806void lttng_consumer_set_error_sock(
807 struct lttng_consumer_local_data *ctx, int sock)
808{
809 ctx->consumer_error_socket = sock;
810}
811
812/*
813 * Set the command socket path.
814 */
3bd1e081
MD
815void lttng_consumer_set_command_sock_path(
816 struct lttng_consumer_local_data *ctx, char *sock)
817{
818 ctx->consumer_command_sock_path = sock;
819}
820
821/*
822 * Send return code to the session daemon.
823 * If the socket is not defined, we return 0, it is not a fatal error
824 */
825int lttng_consumer_send_error(
826 struct lttng_consumer_local_data *ctx, int cmd)
827{
828 if (ctx->consumer_error_socket > 0) {
829 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
830 sizeof(enum lttcomm_sessiond_command));
831 }
832
833 return 0;
834}
835
836/*
837 * Close all the tracefiles and stream fds, should be called when all instances
838 * are destroyed.
839 */
840void lttng_consumer_cleanup(void)
841{
e4421fec 842 struct lttng_ht_iter iter;
6065ceec
DG
843 struct lttng_ht_node_ulong *node;
844
845 rcu_read_lock();
3bd1e081
MD
846
847 /*
6065ceec
DG
848 * close all outfd. Called when there are no more threads running (after
849 * joining on the threads), no need to protect list iteration with mutex.
3bd1e081 850 */
6065ceec
DG
851 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
852 node) {
702b1ea4
MD
853 struct lttng_consumer_stream *stream =
854 caa_container_of(node, struct lttng_consumer_stream, node);
855 consumer_del_stream(stream);
3bd1e081 856 }
e4421fec 857
6065ceec
DG
858 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
859 node) {
702b1ea4
MD
860 struct lttng_consumer_channel *channel =
861 caa_container_of(node, struct lttng_consumer_channel, node);
862 consumer_del_channel(channel);
3bd1e081 863 }
6065ceec
DG
864
865 rcu_read_unlock();
d6ce1df2
MD
866
867 lttng_ht_destroy(consumer_data.stream_ht);
868 lttng_ht_destroy(consumer_data.channel_ht);
3bd1e081
MD
869}
870
871/*
872 * Called from signal handler.
873 */
874void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
875{
876 int ret;
877 consumer_quit = 1;
6f94560a
MD
878 do {
879 ret = write(ctx->consumer_should_quit[1], "4", 1);
880 } while (ret < 0 && errno == EINTR);
3bd1e081
MD
881 if (ret < 0) {
882 perror("write consumer quit");
883 }
884}
885
00e2e675
DG
886void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
887 off_t orig_offset)
3bd1e081
MD
888{
889 int outfd = stream->out_fd;
890
891 /*
892 * This does a blocking write-and-wait on any page that belongs to the
893 * subbuffer prior to the one we just wrote.
894 * Don't care about error values, as these are just hints and ways to
895 * limit the amount of page cache used.
896 */
897 if (orig_offset < stream->chan->max_sb_size) {
898 return;
899 }
b9182dd9 900 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
3bd1e081
MD
901 stream->chan->max_sb_size,
902 SYNC_FILE_RANGE_WAIT_BEFORE
903 | SYNC_FILE_RANGE_WRITE
904 | SYNC_FILE_RANGE_WAIT_AFTER);
905 /*
906 * Give hints to the kernel about how we access the file:
907 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
908 * we write it.
909 *
910 * We need to call fadvise again after the file grows because the
911 * kernel does not seem to apply fadvise to non-existing parts of the
912 * file.
913 *
914 * Call fadvise _after_ having waited for the page writeback to
915 * complete because the dirty page writeback semantic is not well
916 * defined. So it can be expected to lead to lower throughput in
917 * streaming.
918 */
919 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
920 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
921}
922
923/*
924 * Initialise the necessary environnement :
925 * - create a new context
926 * - create the poll_pipe
927 * - create the should_quit pipe (for signal handler)
928 * - create the thread pipe (for splice)
929 *
930 * Takes a function pointer as argument, this function is called when data is
931 * available on a buffer. This function is responsible to do the
932 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
933 * buffer configuration and then kernctl_put_next_subbuf at the end.
934 *
935 * Returns a pointer to the new context or NULL on error.
936 */
937struct lttng_consumer_local_data *lttng_consumer_create(
938 enum lttng_consumer_type type,
4078b776 939 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
d41f73b7 940 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
941 int (*recv_channel)(struct lttng_consumer_channel *channel),
942 int (*recv_stream)(struct lttng_consumer_stream *stream),
943 int (*update_stream)(int stream_key, uint32_t state))
944{
945 int ret, i;
946 struct lttng_consumer_local_data *ctx;
947
948 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
949 consumer_data.type == type);
950 consumer_data.type = type;
951
effcf122 952 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081
MD
953 if (ctx == NULL) {
954 perror("allocating context");
955 goto error;
956 }
957
958 ctx->consumer_error_socket = -1;
959 /* assign the callbacks */
960 ctx->on_buffer_ready = buffer_ready;
961 ctx->on_recv_channel = recv_channel;
962 ctx->on_recv_stream = recv_stream;
963 ctx->on_update_stream = update_stream;
964
965 ret = pipe(ctx->consumer_poll_pipe);
966 if (ret < 0) {
967 perror("Error creating poll pipe");
968 goto error_poll_pipe;
969 }
970
04fdd819
MD
971 /* set read end of the pipe to non-blocking */
972 ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
973 if (ret < 0) {
974 perror("fcntl O_NONBLOCK");
975 goto error_poll_fcntl;
976 }
977
978 /* set write end of the pipe to non-blocking */
979 ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
980 if (ret < 0) {
981 perror("fcntl O_NONBLOCK");
982 goto error_poll_fcntl;
983 }
984
3bd1e081
MD
985 ret = pipe(ctx->consumer_should_quit);
986 if (ret < 0) {
987 perror("Error creating recv pipe");
988 goto error_quit_pipe;
989 }
990
991 ret = pipe(ctx->consumer_thread_pipe);
992 if (ret < 0) {
993 perror("Error creating thread pipe");
994 goto error_thread_pipe;
995 }
996
997 return ctx;
998
999
1000error_thread_pipe:
1001 for (i = 0; i < 2; i++) {
1002 int err;
1003
1004 err = close(ctx->consumer_should_quit[i]);
4c462e79
MD
1005 if (err) {
1006 PERROR("close");
1007 }
3bd1e081 1008 }
04fdd819 1009error_poll_fcntl:
3bd1e081
MD
1010error_quit_pipe:
1011 for (i = 0; i < 2; i++) {
1012 int err;
1013
1014 err = close(ctx->consumer_poll_pipe[i]);
4c462e79
MD
1015 if (err) {
1016 PERROR("close");
1017 }
3bd1e081
MD
1018 }
1019error_poll_pipe:
1020 free(ctx);
1021error:
1022 return NULL;
1023}
1024
1025/*
1026 * Close all fds associated with the instance and free the context.
1027 */
1028void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1029{
4c462e79
MD
1030 int ret;
1031
1032 ret = close(ctx->consumer_error_socket);
1033 if (ret) {
1034 PERROR("close");
1035 }
1036 ret = close(ctx->consumer_thread_pipe[0]);
1037 if (ret) {
1038 PERROR("close");
1039 }
1040 ret = close(ctx->consumer_thread_pipe[1]);
1041 if (ret) {
1042 PERROR("close");
1043 }
1044 ret = close(ctx->consumer_poll_pipe[0]);
1045 if (ret) {
1046 PERROR("close");
1047 }
1048 ret = close(ctx->consumer_poll_pipe[1]);
1049 if (ret) {
1050 PERROR("close");
1051 }
1052 ret = close(ctx->consumer_should_quit[0]);
1053 if (ret) {
1054 PERROR("close");
1055 }
1056 ret = close(ctx->consumer_should_quit[1]);
1057 if (ret) {
1058 PERROR("close");
1059 }
3bd1e081
MD
1060 unlink(ctx->consumer_command_sock_path);
1061 free(ctx);
1062}
1063
1064/*
1065 * Mmap the ring buffer, read it and write the data to the tracefile.
1066 *
1067 * Returns the number of bytes written
1068 */
4078b776 1069ssize_t lttng_consumer_on_read_subbuffer_mmap(
3bd1e081
MD
1070 struct lttng_consumer_local_data *ctx,
1071 struct lttng_consumer_stream *stream, unsigned long len)
1072{
1073 switch (consumer_data.type) {
1074 case LTTNG_CONSUMER_KERNEL:
1075 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
7753dea8
MD
1076 case LTTNG_CONSUMER32_UST:
1077 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1078 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
1079 default:
1080 ERR("Unknown consumer_data type");
1081 assert(0);
1082 }
b9182dd9
DG
1083
1084 return 0;
3bd1e081
MD
1085}
1086
1087/*
1088 * Splice the data from the ring buffer to the tracefile.
1089 *
1090 * Returns the number of bytes spliced.
1091 */
4078b776 1092ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081
MD
1093 struct lttng_consumer_local_data *ctx,
1094 struct lttng_consumer_stream *stream, unsigned long len)
1095{
1096 switch (consumer_data.type) {
1097 case LTTNG_CONSUMER_KERNEL:
1098 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
7753dea8
MD
1099 case LTTNG_CONSUMER32_UST:
1100 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1101 return -ENOSYS;
1102 default:
1103 ERR("Unknown consumer_data type");
1104 assert(0);
1105 return -ENOSYS;
1106 }
1107
1108}
1109
1110/*
1111 * Take a snapshot for a specific fd
1112 *
1113 * Returns 0 on success, < 0 on error
1114 */
1115int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1116 struct lttng_consumer_stream *stream)
1117{
1118 switch (consumer_data.type) {
1119 case LTTNG_CONSUMER_KERNEL:
1120 return lttng_kconsumer_take_snapshot(ctx, stream);
7753dea8
MD
1121 case LTTNG_CONSUMER32_UST:
1122 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1123 return lttng_ustconsumer_take_snapshot(ctx, stream);
1124 default:
1125 ERR("Unknown consumer_data type");
1126 assert(0);
1127 return -ENOSYS;
1128 }
1129
1130}
1131
1132/*
1133 * Get the produced position
1134 *
1135 * Returns 0 on success, < 0 on error
1136 */
1137int lttng_consumer_get_produced_snapshot(
1138 struct lttng_consumer_local_data *ctx,
1139 struct lttng_consumer_stream *stream,
1140 unsigned long *pos)
1141{
1142 switch (consumer_data.type) {
1143 case LTTNG_CONSUMER_KERNEL:
1144 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
7753dea8
MD
1145 case LTTNG_CONSUMER32_UST:
1146 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1147 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1148 default:
1149 ERR("Unknown consumer_data type");
1150 assert(0);
1151 return -ENOSYS;
1152 }
1153}
1154
1155int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1156 int sock, struct pollfd *consumer_sockpoll)
1157{
1158 switch (consumer_data.type) {
1159 case LTTNG_CONSUMER_KERNEL:
1160 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
1161 case LTTNG_CONSUMER32_UST:
1162 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1163 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1164 default:
1165 ERR("Unknown consumer_data type");
1166 assert(0);
1167 return -ENOSYS;
1168 }
1169}
1170
1171/*
e4421fec 1172 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
1173 * it to tracefile if necessary.
1174 */
1175void *lttng_consumer_thread_poll_fds(void *data)
1176{
1177 int num_rdy, num_hup, high_prio, ret, i;
1178 struct pollfd *pollfd = NULL;
1179 /* local view of the streams */
1180 struct lttng_consumer_stream **local_stream = NULL;
1181 /* local view of consumer_data.fds_count */
1182 int nb_fd = 0;
3bd1e081 1183 struct lttng_consumer_local_data *ctx = data;
00e2e675
DG
1184 struct lttng_ht *metadata_ht;
1185 struct lttng_ht_iter iter;
1186 struct lttng_ht_node_ulong *node;
1187 struct lttng_consumer_stream *metadata_stream;
1188 ssize_t len;
1189
1190 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
3bd1e081 1191
e7b994a3
DG
1192 rcu_register_thread();
1193
effcf122 1194 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
1195
1196 while (1) {
1197 high_prio = 0;
1198 num_hup = 0;
1199
1200 /*
e4421fec 1201 * the fds set has been updated, we need to update our
3bd1e081
MD
1202 * local array as well
1203 */
1204 pthread_mutex_lock(&consumer_data.lock);
1205 if (consumer_data.need_update) {
1206 if (pollfd != NULL) {
1207 free(pollfd);
1208 pollfd = NULL;
1209 }
1210 if (local_stream != NULL) {
1211 free(local_stream);
1212 local_stream = NULL;
1213 }
1214
1215 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1216 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081
MD
1217 if (pollfd == NULL) {
1218 perror("pollfd malloc");
1219 pthread_mutex_unlock(&consumer_data.lock);
1220 goto end;
1221 }
1222
1223 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1224 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
1225 sizeof(struct lttng_consumer_stream));
1226 if (local_stream == NULL) {
1227 perror("local_stream malloc");
1228 pthread_mutex_unlock(&consumer_data.lock);
1229 goto end;
1230 }
00e2e675
DG
1231 ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
1232 metadata_ht);
3bd1e081
MD
1233 if (ret < 0) {
1234 ERR("Error in allocating pollfd or local_outfds");
1235 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1236 pthread_mutex_unlock(&consumer_data.lock);
1237 goto end;
1238 }
1239 nb_fd = ret;
1240 consumer_data.need_update = 0;
1241 }
1242 pthread_mutex_unlock(&consumer_data.lock);
1243
4078b776
MD
1244 /* No FDs and consumer_quit, consumer_cleanup the thread */
1245 if (nb_fd == 0 && consumer_quit == 1) {
1246 goto end;
1247 }
3bd1e081 1248 /* poll on the array of fds */
88f2b785 1249 restart:
3bd1e081
MD
1250 DBG("polling on %d fd", nb_fd + 1);
1251 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
1252 DBG("poll num_rdy : %d", num_rdy);
1253 if (num_rdy == -1) {
88f2b785
MD
1254 /*
1255 * Restart interrupted system call.
1256 */
1257 if (errno == EINTR) {
1258 goto restart;
1259 }
3bd1e081
MD
1260 perror("Poll error");
1261 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1262 goto end;
1263 } else if (num_rdy == 0) {
1264 DBG("Polling thread timed out");
1265 goto end;
1266 }
1267
3bd1e081 1268 /*
00e2e675
DG
1269 * If the consumer_poll_pipe triggered poll go directly to the
1270 * beginning of the loop to update the array. We want to prioritize
1271 * array update over low-priority reads.
3bd1e081 1272 */
509bb1cf 1273 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
04fdd819
MD
1274 size_t pipe_readlen;
1275 char tmp;
1276
3bd1e081 1277 DBG("consumer_poll_pipe wake up");
04fdd819
MD
1278 /* Consume 1 byte of pipe data */
1279 do {
1280 pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
1281 } while (pipe_readlen == -1 && errno == EINTR);
3bd1e081
MD
1282 continue;
1283 }
1284
1285 /* Take care of high priority channels first. */
1286 for (i = 0; i < nb_fd; i++) {
00e2e675
DG
1287 /* Lookup for metadata which is the highest priority */
1288 lttng_ht_lookup(metadata_ht,
1289 (void *)((unsigned long) pollfd[i].fd), &iter);
1290 node = lttng_ht_iter_get_node_ulong(&iter);
1291 if (node != NULL &&
1292 (pollfd[i].revents & (POLLIN | POLLPRI))) {
1293 DBG("Urgent metadata read on fd %d", pollfd[i].fd);
1294 metadata_stream = caa_container_of(node,
1295 struct lttng_consumer_stream, waitfd_node);
1296 high_prio = 1;
1297 len = ctx->on_buffer_ready(metadata_stream, ctx);
1298 /* it's ok to have an unavailable sub-buffer */
1299 if (len < 0 && len != -EAGAIN) {
1300 goto end;
1301 } else if (len > 0) {
1302 metadata_stream->data_read = 1;
1303 }
1304 } else if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
1305 DBG("Urgent read on fd %d", pollfd[i].fd);
1306 high_prio = 1;
4078b776 1307 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 1308 /* it's ok to have an unavailable sub-buffer */
4078b776
MD
1309 if (len < 0 && len != -EAGAIN) {
1310 goto end;
1311 } else if (len > 0) {
1312 local_stream[i]->data_read = 1;
d41f73b7 1313 }
3bd1e081
MD
1314 }
1315 }
1316
4078b776
MD
1317 /*
1318 * If we read high prio channel in this loop, try again
1319 * for more high prio data.
1320 */
1321 if (high_prio) {
3bd1e081
MD
1322 continue;
1323 }
1324
1325 /* Take care of low priority channels. */
4078b776
MD
1326 for (i = 0; i < nb_fd; i++) {
1327 if ((pollfd[i].revents & POLLIN) ||
1328 local_stream[i]->hangup_flush_done) {
4078b776
MD
1329 DBG("Normal read on fd %d", pollfd[i].fd);
1330 len = ctx->on_buffer_ready(local_stream[i], ctx);
1331 /* it's ok to have an unavailable sub-buffer */
1332 if (len < 0 && len != -EAGAIN) {
1333 goto end;
1334 } else if (len > 0) {
1335 local_stream[i]->data_read = 1;
1336 }
1337 }
1338 }
1339
1340 /* Handle hangup and errors */
1341 for (i = 0; i < nb_fd; i++) {
1342 if (!local_stream[i]->hangup_flush_done
1343 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1344 && (consumer_data.type == LTTNG_CONSUMER32_UST
1345 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1346 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1347 pollfd[i].fd);
1348 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1349 /* Attempt read again, for the data we just flushed. */
1350 local_stream[i]->data_read = 1;
1351 }
1352 /*
1353 * If the poll flag is HUP/ERR/NVAL and we have
1354 * read no data in this pass, we can remove the
1355 * stream from its hash table.
1356 */
1357 if ((pollfd[i].revents & POLLHUP)) {
1358 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1359 if (!local_stream[i]->data_read) {
00e2e675
DG
1360 if (local_stream[i]->metadata_flag) {
1361 iter.iter.node = &local_stream[i]->waitfd_node.node;
1362 ret = lttng_ht_del(metadata_ht, &iter);
1363 assert(!ret);
1364 }
702b1ea4 1365 consumer_del_stream(local_stream[i]);
4078b776
MD
1366 num_hup++;
1367 }
1368 } else if (pollfd[i].revents & POLLERR) {
1369 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1370 if (!local_stream[i]->data_read) {
00e2e675
DG
1371 if (local_stream[i]->metadata_flag) {
1372 iter.iter.node = &local_stream[i]->waitfd_node.node;
1373 ret = lttng_ht_del(metadata_ht, &iter);
1374 assert(!ret);
1375 }
702b1ea4 1376 consumer_del_stream(local_stream[i]);
4078b776
MD
1377 num_hup++;
1378 }
1379 } else if (pollfd[i].revents & POLLNVAL) {
1380 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1381 if (!local_stream[i]->data_read) {
00e2e675
DG
1382 if (local_stream[i]->metadata_flag) {
1383 iter.iter.node = &local_stream[i]->waitfd_node.node;
1384 ret = lttng_ht_del(metadata_ht, &iter);
1385 assert(!ret);
1386 }
702b1ea4 1387 consumer_del_stream(local_stream[i]);
4078b776 1388 num_hup++;
3bd1e081
MD
1389 }
1390 }
4078b776 1391 local_stream[i]->data_read = 0;
3bd1e081
MD
1392 }
1393 }
1394end:
1395 DBG("polling thread exiting");
1396 if (pollfd != NULL) {
1397 free(pollfd);
1398 pollfd = NULL;
1399 }
1400 if (local_stream != NULL) {
1401 free(local_stream);
1402 local_stream = NULL;
1403 }
e7b994a3 1404 rcu_unregister_thread();
3bd1e081
MD
1405 return NULL;
1406}
1407
1408/*
1409 * This thread listens on the consumerd socket and receives the file
1410 * descriptors from the session daemon.
1411 */
1412void *lttng_consumer_thread_receive_fds(void *data)
1413{
1414 int sock, client_socket, ret;
1415 /*
1416 * structure to poll for incoming data on communication socket avoids
1417 * making blocking sockets.
1418 */
1419 struct pollfd consumer_sockpoll[2];
1420 struct lttng_consumer_local_data *ctx = data;
1421
e7b994a3
DG
1422 rcu_register_thread();
1423
3bd1e081
MD
1424 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
1425 unlink(ctx->consumer_command_sock_path);
1426 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
1427 if (client_socket < 0) {
1428 ERR("Cannot create command socket");
1429 goto end;
1430 }
1431
1432 ret = lttcomm_listen_unix_sock(client_socket);
1433 if (ret < 0) {
1434 goto end;
1435 }
1436
32258573 1437 DBG("Sending ready command to lttng-sessiond");
3bd1e081
MD
1438 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
1439 /* return < 0 on error, but == 0 is not fatal */
1440 if (ret < 0) {
32258573 1441 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
1442 goto end;
1443 }
1444
1445 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
1446 if (ret < 0) {
1447 perror("fcntl O_NONBLOCK");
1448 goto end;
1449 }
1450
1451 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1452 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1453 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1454 consumer_sockpoll[1].fd = client_socket;
1455 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1456
1457 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1458 goto end;
1459 }
1460 DBG("Connection on client_socket");
1461
1462 /* Blocking call, waiting for transmission */
1463 sock = lttcomm_accept_unix_sock(client_socket);
1464 if (sock <= 0) {
1465 WARN("On accept");
1466 goto end;
1467 }
1468 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1469 if (ret < 0) {
1470 perror("fcntl O_NONBLOCK");
1471 goto end;
1472 }
1473
1474 /* update the polling structure to poll on the established socket */
1475 consumer_sockpoll[1].fd = sock;
1476 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1477
1478 while (1) {
1479 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1480 goto end;
1481 }
1482 DBG("Incoming command on sock");
1483 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1484 if (ret == -ENOENT) {
1485 DBG("Received STOP command");
1486 goto end;
1487 }
1488 if (ret < 0) {
1489 ERR("Communication interrupted on command socket");
1490 goto end;
1491 }
1492 if (consumer_quit) {
1493 DBG("consumer_thread_receive_fds received quit from signal");
1494 goto end;
1495 }
1496 DBG("received fds on sock");
1497 }
1498end:
1499 DBG("consumer_thread_receive_fds exiting");
1500
1501 /*
1502 * when all fds have hung up, the polling thread
1503 * can exit cleanly
1504 */
1505 consumer_quit = 1;
1506
1507 /*
1508 * 2s of grace period, if no polling events occur during
1509 * this period, the polling thread will exit even if there
1510 * are still open FDs (should not happen, but safety mechanism).
1511 */
1512 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1513
04fdd819
MD
1514 /*
1515 * Wake-up the other end by writing a null byte in the pipe
1516 * (non-blocking). Important note: Because writing into the
1517 * pipe is non-blocking (and therefore we allow dropping wakeup
1518 * data, as long as there is wakeup data present in the pipe
1519 * buffer to wake up the other end), the other end should
1520 * perform the following sequence for waiting:
1521 * 1) empty the pipe (reads).
1522 * 2) perform update operation.
1523 * 3) wait on the pipe (poll).
1524 */
1525 do {
1526 ret = write(ctx->consumer_poll_pipe[1], "", 1);
6f94560a 1527 } while (ret < 0 && errno == EINTR);
e7b994a3 1528 rcu_unregister_thread();
3bd1e081
MD
1529 return NULL;
1530}
d41f73b7 1531
4078b776 1532ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
1533 struct lttng_consumer_local_data *ctx)
1534{
1535 switch (consumer_data.type) {
1536 case LTTNG_CONSUMER_KERNEL:
1537 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
1538 case LTTNG_CONSUMER32_UST:
1539 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1540 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1541 default:
1542 ERR("Unknown consumer_data type");
1543 assert(0);
1544 return -ENOSYS;
1545 }
1546}
1547
1548int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1549{
1550 switch (consumer_data.type) {
1551 case LTTNG_CONSUMER_KERNEL:
1552 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
1553 case LTTNG_CONSUMER32_UST:
1554 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1555 return lttng_ustconsumer_on_recv_stream(stream);
1556 default:
1557 ERR("Unknown consumer_data type");
1558 assert(0);
1559 return -ENOSYS;
1560 }
1561}
e4421fec
DG
1562
1563/*
1564 * Allocate and set consumer data hash tables.
1565 */
1566void lttng_consumer_init(void)
1567{
1568 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1569 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675 1570 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
e4421fec 1571}
This page took 0.09771 seconds and 4 git commands to generate.