Fix: relayd metadata size
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *               2012 - David Goulet <dgoulet@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30
990570ed 31#include <common/common.h>
10a8a223 32#include <common/kernel-ctl/kernel-ctl.h>
00e2e675 33#include <common/sessiond-comm/relayd.h>
10a8a223
DG
34#include <common/sessiond-comm/sessiond-comm.h>
35#include <common/kernel-consumer/kernel-consumer.h>
00e2e675 36#include <common/relayd/relayd.h>
10a8a223
DG
37#include <common/ust-consumer/ust-consumer.h>
38
39#include "consumer.h"
3bd1e081
MD
40
41struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
42 .stream_count = 0,
43 .need_update = 1,
44 .type = LTTNG_CONSUMER_UNKNOWN,
45};
46
47/* timeout parameter, to control the polling thread grace period. */
48int consumer_poll_timeout = -1;
49
50/*
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
56volatile int consumer_quit = 0;
57
58/*
59 * Find a stream. The consumer_data.lock must be locked during this
60 * call.
61 */
62static struct lttng_consumer_stream *consumer_find_stream(int key)
63{
e4421fec
DG
64 struct lttng_ht_iter iter;
65 struct lttng_ht_node_ulong *node;
66 struct lttng_consumer_stream *stream = NULL;
3bd1e081 67
7ad0a0cb
MD
68 /* Negative keys are lookup failures */
69 if (key < 0)
70 return NULL;
e4421fec 71
6065ceec
DG
72 rcu_read_lock();
73
e4421fec
DG
74 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
75 &iter);
76 node = lttng_ht_iter_get_node_ulong(&iter);
77 if (node != NULL) {
78 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 79 }
e4421fec 80
6065ceec
DG
81 rcu_read_unlock();
82
e4421fec 83 return stream;
3bd1e081
MD
84}
85
7ad0a0cb
MD
86static void consumer_steal_stream_key(int key)
87{
88 struct lttng_consumer_stream *stream;
89
04253271 90 rcu_read_lock();
7ad0a0cb 91 stream = consumer_find_stream(key);
04253271 92 if (stream) {
7ad0a0cb 93 stream->key = -1;
04253271
MD
94 /*
95 * We don't want the lookup to match, but we still need
96 * to iterate on this stream when iterating over the hash table. Just
97 * change the node key.
98 */
99 stream->node.key = -1;
100 }
101 rcu_read_unlock();
7ad0a0cb
MD
102}
103
3bd1e081
MD
104static struct lttng_consumer_channel *consumer_find_channel(int key)
105{
e4421fec
DG
106 struct lttng_ht_iter iter;
107 struct lttng_ht_node_ulong *node;
108 struct lttng_consumer_channel *channel = NULL;
3bd1e081 109
7ad0a0cb
MD
110 /* Negative keys are lookup failures */
111 if (key < 0)
112 return NULL;
e4421fec 113
6065ceec
DG
114 rcu_read_lock();
115
e4421fec
DG
116 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
117 &iter);
118 node = lttng_ht_iter_get_node_ulong(&iter);
119 if (node != NULL) {
120 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 121 }
e4421fec 122
6065ceec
DG
123 rcu_read_unlock();
124
e4421fec 125 return channel;
3bd1e081
MD
126}
127
7ad0a0cb
MD
128static void consumer_steal_channel_key(int key)
129{
130 struct lttng_consumer_channel *channel;
131
04253271 132 rcu_read_lock();
7ad0a0cb 133 channel = consumer_find_channel(key);
04253271 134 if (channel) {
7ad0a0cb 135 channel->key = -1;
04253271
MD
136 /*
137 * We don't want the lookup to match, but we still need
138 * to iterate on this channel when iterating over the hash table. Just
139 * change the node key.
140 */
141 channel->node.key = -1;
142 }
143 rcu_read_unlock();
7ad0a0cb
MD
144}
145
702b1ea4
MD
146static
147void consumer_free_stream(struct rcu_head *head)
148{
149 struct lttng_ht_node_ulong *node =
150 caa_container_of(head, struct lttng_ht_node_ulong, head);
151 struct lttng_consumer_stream *stream =
152 caa_container_of(node, struct lttng_consumer_stream, node);
153
154 free(stream);
155}
156
00e2e675
DG
157/*
158 * RCU protected relayd socket pair free.
159 */
160static void consumer_rcu_free_relayd(struct rcu_head *head)
161{
162 struct lttng_ht_node_ulong *node =
163 caa_container_of(head, struct lttng_ht_node_ulong, head);
164 struct consumer_relayd_sock_pair *relayd =
165 caa_container_of(node, struct consumer_relayd_sock_pair, node);
166
167 free(relayd);
168}
169
170/*
171 * Destroy and free relayd socket pair object.
172 *
173 * This function MUST be called with the consumer_data lock acquired.
174 */
175void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
176{
177 int ret;
178 struct lttng_ht_iter iter;
179
180 DBG("Consumer destroy and close relayd socket pair");
181
182 iter.iter.node = &relayd->node.node;
183 ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
184 assert(!ret);
185
186 /* Close all sockets */
187 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
188 (void) relayd_close(&relayd->control_sock);
189 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
190 (void) relayd_close(&relayd->data_sock);
191
192 /* RCU free() call */
193 call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
194}
195
3bd1e081
MD
196/*
197 * Remove a stream from the global list protected by a mutex. This
198 * function is also responsible for freeing its data structures.
199 */
200void consumer_del_stream(struct lttng_consumer_stream *stream)
201{
202 int ret;
e4421fec 203 struct lttng_ht_iter iter;
3bd1e081 204 struct lttng_consumer_channel *free_chan = NULL;
00e2e675
DG
205 struct consumer_relayd_sock_pair *relayd;
206
207 assert(stream);
3bd1e081
MD
208
209 pthread_mutex_lock(&consumer_data.lock);
210
211 switch (consumer_data.type) {
212 case LTTNG_CONSUMER_KERNEL:
213 if (stream->mmap_base != NULL) {
214 ret = munmap(stream->mmap_base, stream->mmap_len);
215 if (ret != 0) {
216 perror("munmap");
217 }
218 }
219 break;
7753dea8
MD
220 case LTTNG_CONSUMER32_UST:
221 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
222 lttng_ustconsumer_del_stream(stream);
223 break;
224 default:
225 ERR("Unknown consumer_data type");
226 assert(0);
227 goto end;
228 }
229
6065ceec 230 rcu_read_lock();
04253271
MD
231 iter.iter.node = &stream->node.node;
232 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
233 assert(!ret);
e4421fec 234
6065ceec
DG
235 rcu_read_unlock();
236
3bd1e081
MD
237 if (consumer_data.stream_count <= 0) {
238 goto end;
239 }
240 consumer_data.stream_count--;
241 if (!stream) {
242 goto end;
243 }
244 if (stream->out_fd >= 0) {
4c462e79
MD
245 ret = close(stream->out_fd);
246 if (ret) {
247 PERROR("close");
248 }
3bd1e081 249 }
b5c5fc29 250 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
4c462e79
MD
251 ret = close(stream->wait_fd);
252 if (ret) {
253 PERROR("close");
254 }
3bd1e081 255 }
2c1dd183 256 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
4c462e79
MD
257 ret = close(stream->shm_fd);
258 if (ret) {
259 PERROR("close");
260 }
3bd1e081 261 }
00e2e675
DG
262
263 /* Check and cleanup relayd */
b0b335c8 264 rcu_read_lock();
00e2e675
DG
265 relayd = consumer_find_relayd(stream->net_seq_idx);
266 if (relayd != NULL) {
b0b335c8
MD
267 uatomic_dec(&relayd->refcount);
268 assert(uatomic_read(&relayd->refcount) >= 0);
269 if (uatomic_read(&relayd->refcount) == 0) {
00e2e675
DG
270 /* Refcount of the relayd struct is 0, destroy it */
271 consumer_destroy_relayd(relayd);
272 }
00e2e675 273 }
b0b335c8 274 rcu_read_unlock();
00e2e675
DG
275
276 if (!--stream->chan->refcount) {
3bd1e081 277 free_chan = stream->chan;
00e2e675
DG
278 }
279
702b1ea4
MD
280
281 call_rcu(&stream->node.head, consumer_free_stream);
3bd1e081
MD
282end:
283 consumer_data.need_update = 1;
284 pthread_mutex_unlock(&consumer_data.lock);
285
286 if (free_chan)
287 consumer_del_channel(free_chan);
288}
289
290struct lttng_consumer_stream *consumer_allocate_stream(
291 int channel_key, int stream_key,
292 int shm_fd, int wait_fd,
293 enum lttng_consumer_stream_state state,
294 uint64_t mmap_len,
295 enum lttng_event_output output,
6df2e2c9
MD
296 const char *path_name,
297 uid_t uid,
00e2e675
DG
298 gid_t gid,
299 int net_index,
300 int metadata_flag)
3bd1e081
MD
301{
302 struct lttng_consumer_stream *stream;
303 int ret;
304
effcf122 305 stream = zmalloc(sizeof(*stream));
3bd1e081
MD
306 if (stream == NULL) {
307 perror("malloc struct lttng_consumer_stream");
308 goto end;
309 }
310 stream->chan = consumer_find_channel(channel_key);
311 if (!stream->chan) {
312 perror("Unable to find channel key");
313 goto end;
314 }
315 stream->chan->refcount++;
316 stream->key = stream_key;
317 stream->shm_fd = shm_fd;
318 stream->wait_fd = wait_fd;
319 stream->out_fd = -1;
320 stream->out_fd_offset = 0;
321 stream->state = state;
322 stream->mmap_len = mmap_len;
323 stream->mmap_base = NULL;
324 stream->output = output;
6df2e2c9
MD
325 stream->uid = uid;
326 stream->gid = gid;
00e2e675
DG
327 stream->net_seq_idx = net_index;
328 stream->metadata_flag = metadata_flag;
329 strncpy(stream->path_name, path_name, sizeof(stream->path_name));
330 stream->path_name[sizeof(stream->path_name) - 1] = '\0';
e4421fec 331 lttng_ht_node_init_ulong(&stream->node, stream->key);
00e2e675 332 lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
3bd1e081
MD
333
334 switch (consumer_data.type) {
335 case LTTNG_CONSUMER_KERNEL:
336 break;
7753dea8
MD
337 case LTTNG_CONSUMER32_UST:
338 case LTTNG_CONSUMER64_UST:
5af2f756 339 stream->cpu = stream->chan->cpucount++;
3bd1e081
MD
340 ret = lttng_ustconsumer_allocate_stream(stream);
341 if (ret) {
342 free(stream);
343 return NULL;
344 }
345 break;
346 default:
347 ERR("Unknown consumer_data type");
348 assert(0);
349 goto end;
350 }
00e2e675 351 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
3bd1e081
MD
352 stream->path_name, stream->key,
353 stream->shm_fd,
354 stream->wait_fd,
355 (unsigned long long) stream->mmap_len,
00e2e675
DG
356 stream->out_fd,
357 stream->net_seq_idx);
3bd1e081
MD
358end:
359 return stream;
360}
361
362/*
363 * Add a stream to the global list protected by a mutex.
364 */
365int consumer_add_stream(struct lttng_consumer_stream *stream)
366{
367 int ret = 0;
c77fc10a
DG
368 struct lttng_ht_node_ulong *node;
369 struct lttng_ht_iter iter;
00e2e675 370 struct consumer_relayd_sock_pair *relayd;
3bd1e081
MD
371
372 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
373 /* Steal stream identifier, for UST */
374 consumer_steal_stream_key(stream->key);
c77fc10a 375
b0b335c8 376 rcu_read_lock();
c77fc10a
DG
377 lttng_ht_lookup(consumer_data.stream_ht,
378 (void *)((unsigned long) stream->key), &iter);
379 node = lttng_ht_iter_get_node_ulong(&iter);
380 if (node != NULL) {
381 rcu_read_unlock();
382 /* Stream already exist. Ignore the insertion */
383 goto end;
384 }
385
04253271 386 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
00e2e675
DG
387
388 /* Check and cleanup relayd */
389 relayd = consumer_find_relayd(stream->net_seq_idx);
390 if (relayd != NULL) {
b0b335c8 391 uatomic_inc(&relayd->refcount);
00e2e675 392 }
b0b335c8 393 rcu_read_unlock();
00e2e675
DG
394
395 /* Update consumer data */
3bd1e081
MD
396 consumer_data.stream_count++;
397 consumer_data.need_update = 1;
398
399 switch (consumer_data.type) {
400 case LTTNG_CONSUMER_KERNEL:
401 break;
7753dea8
MD
402 case LTTNG_CONSUMER32_UST:
403 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
404 /* Streams are in CPU number order (we rely on this) */
405 stream->cpu = stream->chan->nr_streams++;
406 break;
407 default:
408 ERR("Unknown consumer_data type");
409 assert(0);
410 goto end;
411 }
412
413end:
414 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 415
3bd1e081
MD
416 return ret;
417}
418
00e2e675
DG
419/*
420 * Add relayd socket to global consumer data hashtable.
421 */
422int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
423{
424 int ret = 0;
425 struct lttng_ht_node_ulong *node;
426 struct lttng_ht_iter iter;
427
428 if (relayd == NULL) {
429 ret = -1;
430 goto end;
431 }
432
433 rcu_read_lock();
434
435 lttng_ht_lookup(consumer_data.relayd_ht,
436 (void *)((unsigned long) relayd->net_seq_idx), &iter);
437 node = lttng_ht_iter_get_node_ulong(&iter);
438 if (node != NULL) {
439 rcu_read_unlock();
440 /* Relayd already exist. Ignore the insertion */
441 goto end;
442 }
443 lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
444
445 rcu_read_unlock();
446
447end:
448 return ret;
449}
450
451/*
452 * Allocate and return a consumer relayd socket.
453 */
454struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
455 int net_seq_idx)
456{
457 struct consumer_relayd_sock_pair *obj = NULL;
458
459 /* Negative net sequence index is a failure */
460 if (net_seq_idx < 0) {
461 goto error;
462 }
463
464 obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
465 if (obj == NULL) {
466 PERROR("zmalloc relayd sock");
467 goto error;
468 }
469
470 obj->net_seq_idx = net_seq_idx;
471 obj->refcount = 0;
472 lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
473 pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
474
475error:
476 return obj;
477}
478
479/*
480 * Find a relayd socket pair in the global consumer data.
481 *
482 * Return the object if found else NULL.
b0b335c8
MD
483 * RCU read-side lock must be held across this call and while using the
484 * returned object.
00e2e675
DG
485 */
486struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
487{
488 struct lttng_ht_iter iter;
489 struct lttng_ht_node_ulong *node;
490 struct consumer_relayd_sock_pair *relayd = NULL;
491
492 /* Negative keys are lookup failures */
493 if (key < 0) {
494 goto error;
495 }
496
00e2e675
DG
497 lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
498 &iter);
499 node = lttng_ht_iter_get_node_ulong(&iter);
500 if (node != NULL) {
501 relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
502 }
503
00e2e675
DG
504error:
505 return relayd;
506}
507
508/*
509 * Handle stream for relayd transmission if the stream applies for network
510 * streaming where the net sequence index is set.
511 *
512 * Return destination file descriptor or negative value on error.
513 */
514int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
515 size_t data_size)
516{
517 int outfd = -1, ret;
518 struct consumer_relayd_sock_pair *relayd;
519 struct lttcomm_relayd_data_hdr data_hdr;
520
521 /* Safety net */
522 assert(stream);
523
524 /* Reset data header */
525 memset(&data_hdr, 0, sizeof(data_hdr));
526
b0b335c8 527 rcu_read_lock();
00e2e675
DG
528 /* Get relayd reference of the stream. */
529 relayd = consumer_find_relayd(stream->net_seq_idx);
530 if (relayd == NULL) {
531 /* Stream is either local or corrupted */
532 goto error;
533 }
534
535 DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
536 if (stream->metadata_flag) {
537 /* Caller MUST acquire the relayd control socket lock */
538 ret = relayd_send_metadata(&relayd->control_sock, data_size);
539 if (ret < 0) {
540 goto error;
541 }
542
543 /* Metadata are always sent on the control socket. */
544 outfd = relayd->control_sock.fd;
545 } else {
546 /* Set header with stream information */
547 data_hdr.stream_id = htobe64(stream->relayd_stream_id);
548 data_hdr.data_size = htobe32(data_size);
549 /* Other fields are zeroed previously */
550
551 ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
552 sizeof(data_hdr));
553 if (ret < 0) {
554 goto error;
555 }
556
557 /* Set to go on data socket */
558 outfd = relayd->data_sock.fd;
559 }
560
561error:
b0b335c8 562 rcu_read_unlock();
00e2e675
DG
563 return outfd;
564}
565
3bd1e081
MD
566/*
567 * Update a stream according to what we just received.
568 */
569void consumer_change_stream_state(int stream_key,
570 enum lttng_consumer_stream_state state)
571{
572 struct lttng_consumer_stream *stream;
573
574 pthread_mutex_lock(&consumer_data.lock);
575 stream = consumer_find_stream(stream_key);
576 if (stream) {
577 stream->state = state;
578 }
579 consumer_data.need_update = 1;
580 pthread_mutex_unlock(&consumer_data.lock);
581}
582
702b1ea4
MD
583static
584void consumer_free_channel(struct rcu_head *head)
585{
586 struct lttng_ht_node_ulong *node =
587 caa_container_of(head, struct lttng_ht_node_ulong, head);
588 struct lttng_consumer_channel *channel =
589 caa_container_of(node, struct lttng_consumer_channel, node);
590
591 free(channel);
592}
593
3bd1e081
MD
594/*
595 * Remove a channel from the global list protected by a mutex. This
596 * function is also responsible for freeing its data structures.
597 */
598void consumer_del_channel(struct lttng_consumer_channel *channel)
599{
600 int ret;
e4421fec 601 struct lttng_ht_iter iter;
3bd1e081
MD
602
603 pthread_mutex_lock(&consumer_data.lock);
604
605 switch (consumer_data.type) {
606 case LTTNG_CONSUMER_KERNEL:
607 break;
7753dea8
MD
608 case LTTNG_CONSUMER32_UST:
609 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
610 lttng_ustconsumer_del_channel(channel);
611 break;
612 default:
613 ERR("Unknown consumer_data type");
614 assert(0);
615 goto end;
616 }
617
6065ceec 618 rcu_read_lock();
04253271
MD
619 iter.iter.node = &channel->node.node;
620 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
621 assert(!ret);
6065ceec
DG
622 rcu_read_unlock();
623
3bd1e081
MD
624 if (channel->mmap_base != NULL) {
625 ret = munmap(channel->mmap_base, channel->mmap_len);
626 if (ret != 0) {
627 perror("munmap");
628 }
629 }
b5c5fc29 630 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
4c462e79
MD
631 ret = close(channel->wait_fd);
632 if (ret) {
633 PERROR("close");
634 }
3bd1e081 635 }
2c1dd183 636 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
4c462e79
MD
637 ret = close(channel->shm_fd);
638 if (ret) {
639 PERROR("close");
640 }
3bd1e081 641 }
702b1ea4
MD
642
643 call_rcu(&channel->node.head, consumer_free_channel);
3bd1e081
MD
644end:
645 pthread_mutex_unlock(&consumer_data.lock);
646}
647
648struct lttng_consumer_channel *consumer_allocate_channel(
649 int channel_key,
650 int shm_fd, int wait_fd,
651 uint64_t mmap_len,
652 uint64_t max_sb_size)
653{
654 struct lttng_consumer_channel *channel;
655 int ret;
656
276b26d1 657 channel = zmalloc(sizeof(*channel));
3bd1e081
MD
658 if (channel == NULL) {
659 perror("malloc struct lttng_consumer_channel");
660 goto end;
661 }
662 channel->key = channel_key;
663 channel->shm_fd = shm_fd;
664 channel->wait_fd = wait_fd;
665 channel->mmap_len = mmap_len;
666 channel->max_sb_size = max_sb_size;
667 channel->refcount = 0;
668 channel->nr_streams = 0;
e4421fec 669 lttng_ht_node_init_ulong(&channel->node, channel->key);
3bd1e081
MD
670
671 switch (consumer_data.type) {
672 case LTTNG_CONSUMER_KERNEL:
673 channel->mmap_base = NULL;
674 channel->mmap_len = 0;
675 break;
7753dea8
MD
676 case LTTNG_CONSUMER32_UST:
677 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
678 ret = lttng_ustconsumer_allocate_channel(channel);
679 if (ret) {
680 free(channel);
681 return NULL;
682 }
683 break;
684 default:
685 ERR("Unknown consumer_data type");
686 assert(0);
687 goto end;
688 }
689 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
00e2e675 690 channel->key, channel->shm_fd, channel->wait_fd,
3bd1e081
MD
691 (unsigned long long) channel->mmap_len,
692 (unsigned long long) channel->max_sb_size);
693end:
694 return channel;
695}
696
697/*
698 * Add a channel to the global list protected by a mutex.
699 */
700int consumer_add_channel(struct lttng_consumer_channel *channel)
701{
c77fc10a
DG
702 struct lttng_ht_node_ulong *node;
703 struct lttng_ht_iter iter;
704
3bd1e081 705 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
706 /* Steal channel identifier, for UST */
707 consumer_steal_channel_key(channel->key);
6065ceec 708 rcu_read_lock();
c77fc10a
DG
709
710 lttng_ht_lookup(consumer_data.channel_ht,
711 (void *)((unsigned long) channel->key), &iter);
712 node = lttng_ht_iter_get_node_ulong(&iter);
713 if (node != NULL) {
714 /* Channel already exist. Ignore the insertion */
715 goto end;
716 }
717
04253271 718 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
c77fc10a
DG
719
720end:
6065ceec 721 rcu_read_unlock();
3bd1e081 722 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 723
7ad0a0cb 724 return 0;
3bd1e081
MD
725}
726
727/*
728 * Allocate the pollfd structure and the local view of the out fds to avoid
729 * doing a lookup in the linked list and concurrency issues when writing is
730 * needed. Called with consumer_data.lock held.
731 *
732 * Returns the number of fds in the structures.
733 */
734int consumer_update_poll_array(
735 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
00e2e675
DG
736 struct lttng_consumer_stream **local_stream,
737 struct lttng_ht *metadata_ht)
3bd1e081 738{
3bd1e081 739 int i = 0;
e4421fec
DG
740 struct lttng_ht_iter iter;
741 struct lttng_consumer_stream *stream;
3bd1e081
MD
742
743 DBG("Updating poll fd array");
481d6c57 744 rcu_read_lock();
e4421fec
DG
745 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
746 node.node) {
747 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
3bd1e081
MD
748 continue;
749 }
e4421fec
DG
750 DBG("Active FD %d", stream->wait_fd);
751 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 752 (*pollfd)[i].events = POLLIN | POLLPRI;
00e2e675
DG
753 if (stream->metadata_flag && metadata_ht) {
754 lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node);
755 DBG("Active FD added to metadata hash table");
756 }
e4421fec 757 local_stream[i] = stream;
3bd1e081
MD
758 i++;
759 }
481d6c57 760 rcu_read_unlock();
3bd1e081
MD
761
762 /*
763 * Insert the consumer_poll_pipe at the end of the array and don't
764 * increment i so nb_fd is the number of real FD.
765 */
766 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
509bb1cf 767 (*pollfd)[i].events = POLLIN | POLLPRI;
3bd1e081
MD
768 return i;
769}
770
/*
 * Block in poll() on the two entries of consumer_sockpoll: the should_quit
 * pipe (entry 0) and the command socket.
 *
 * Returns -1 on poll error or when the should_quit pipe is readable, in
 * which case the caller should exit. Returns 0 otherwise, i.e. data is
 * available on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	/* Restart the system call whenever it is interrupted. */
	do {
		num_rdy = poll(consumer_sockpoll, 2, -1);
	} while (num_rdy == -1 && errno == EINTR);

	if (num_rdy == -1) {
		perror("Poll error");
		return -1;
	}

	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return -1;
	}

	return 0;
}
800
801/*
802 * Set the error socket.
803 */
804void lttng_consumer_set_error_sock(
805 struct lttng_consumer_local_data *ctx, int sock)
806{
807 ctx->consumer_error_socket = sock;
808}
809
810/*
811 * Set the command socket path.
812 */
3bd1e081
MD
813void lttng_consumer_set_command_sock_path(
814 struct lttng_consumer_local_data *ctx, char *sock)
815{
816 ctx->consumer_command_sock_path = sock;
817}
818
819/*
820 * Send return code to the session daemon.
821 * If the socket is not defined, we return 0, it is not a fatal error
822 */
823int lttng_consumer_send_error(
824 struct lttng_consumer_local_data *ctx, int cmd)
825{
826 if (ctx->consumer_error_socket > 0) {
827 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
828 sizeof(enum lttcomm_sessiond_command));
829 }
830
831 return 0;
832}
833
834/*
835 * Close all the tracefiles and stream fds, should be called when all instances
836 * are destroyed.
837 */
838void lttng_consumer_cleanup(void)
839{
e4421fec 840 struct lttng_ht_iter iter;
6065ceec
DG
841 struct lttng_ht_node_ulong *node;
842
843 rcu_read_lock();
3bd1e081
MD
844
845 /*
6065ceec
DG
846 * close all outfd. Called when there are no more threads running (after
847 * joining on the threads), no need to protect list iteration with mutex.
3bd1e081 848 */
6065ceec
DG
849 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
850 node) {
702b1ea4
MD
851 struct lttng_consumer_stream *stream =
852 caa_container_of(node, struct lttng_consumer_stream, node);
853 consumer_del_stream(stream);
3bd1e081 854 }
e4421fec 855
6065ceec
DG
856 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
857 node) {
702b1ea4
MD
858 struct lttng_consumer_channel *channel =
859 caa_container_of(node, struct lttng_consumer_channel, node);
860 consumer_del_channel(channel);
3bd1e081 861 }
6065ceec
DG
862
863 rcu_read_unlock();
d6ce1df2
MD
864
865 lttng_ht_destroy(consumer_data.stream_ht);
866 lttng_ht_destroy(consumer_data.channel_ht);
3bd1e081
MD
867}
868
869/*
870 * Called from signal handler.
871 */
872void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
873{
874 int ret;
875 consumer_quit = 1;
6f94560a
MD
876 do {
877 ret = write(ctx->consumer_should_quit[1], "4", 1);
878 } while (ret < 0 && errno == EINTR);
3bd1e081
MD
879 if (ret < 0) {
880 perror("write consumer quit");
881 }
882}
883
00e2e675
DG
884void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
885 off_t orig_offset)
3bd1e081
MD
886{
887 int outfd = stream->out_fd;
888
889 /*
890 * This does a blocking write-and-wait on any page that belongs to the
891 * subbuffer prior to the one we just wrote.
892 * Don't care about error values, as these are just hints and ways to
893 * limit the amount of page cache used.
894 */
895 if (orig_offset < stream->chan->max_sb_size) {
896 return;
897 }
b9182dd9 898 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
3bd1e081
MD
899 stream->chan->max_sb_size,
900 SYNC_FILE_RANGE_WAIT_BEFORE
901 | SYNC_FILE_RANGE_WRITE
902 | SYNC_FILE_RANGE_WAIT_AFTER);
903 /*
904 * Give hints to the kernel about how we access the file:
905 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
906 * we write it.
907 *
908 * We need to call fadvise again after the file grows because the
909 * kernel does not seem to apply fadvise to non-existing parts of the
910 * file.
911 *
912 * Call fadvise _after_ having waited for the page writeback to
913 * complete because the dirty page writeback semantic is not well
914 * defined. So it can be expected to lead to lower throughput in
915 * streaming.
916 */
917 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
918 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
919}
920
921/*
922 * Initialise the necessary environnement :
923 * - create a new context
924 * - create the poll_pipe
925 * - create the should_quit pipe (for signal handler)
926 * - create the thread pipe (for splice)
927 *
928 * Takes a function pointer as argument, this function is called when data is
929 * available on a buffer. This function is responsible to do the
930 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
931 * buffer configuration and then kernctl_put_next_subbuf at the end.
932 *
933 * Returns a pointer to the new context or NULL on error.
934 */
935struct lttng_consumer_local_data *lttng_consumer_create(
936 enum lttng_consumer_type type,
4078b776 937 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
d41f73b7 938 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
939 int (*recv_channel)(struct lttng_consumer_channel *channel),
940 int (*recv_stream)(struct lttng_consumer_stream *stream),
941 int (*update_stream)(int stream_key, uint32_t state))
942{
943 int ret, i;
944 struct lttng_consumer_local_data *ctx;
945
946 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
947 consumer_data.type == type);
948 consumer_data.type = type;
949
effcf122 950 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081
MD
951 if (ctx == NULL) {
952 perror("allocating context");
953 goto error;
954 }
955
956 ctx->consumer_error_socket = -1;
957 /* assign the callbacks */
958 ctx->on_buffer_ready = buffer_ready;
959 ctx->on_recv_channel = recv_channel;
960 ctx->on_recv_stream = recv_stream;
961 ctx->on_update_stream = update_stream;
962
963 ret = pipe(ctx->consumer_poll_pipe);
964 if (ret < 0) {
965 perror("Error creating poll pipe");
966 goto error_poll_pipe;
967 }
968
04fdd819
MD
969 /* set read end of the pipe to non-blocking */
970 ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
971 if (ret < 0) {
972 perror("fcntl O_NONBLOCK");
973 goto error_poll_fcntl;
974 }
975
976 /* set write end of the pipe to non-blocking */
977 ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
978 if (ret < 0) {
979 perror("fcntl O_NONBLOCK");
980 goto error_poll_fcntl;
981 }
982
3bd1e081
MD
983 ret = pipe(ctx->consumer_should_quit);
984 if (ret < 0) {
985 perror("Error creating recv pipe");
986 goto error_quit_pipe;
987 }
988
989 ret = pipe(ctx->consumer_thread_pipe);
990 if (ret < 0) {
991 perror("Error creating thread pipe");
992 goto error_thread_pipe;
993 }
994
995 return ctx;
996
997
998error_thread_pipe:
999 for (i = 0; i < 2; i++) {
1000 int err;
1001
1002 err = close(ctx->consumer_should_quit[i]);
4c462e79
MD
1003 if (err) {
1004 PERROR("close");
1005 }
3bd1e081 1006 }
04fdd819 1007error_poll_fcntl:
3bd1e081
MD
1008error_quit_pipe:
1009 for (i = 0; i < 2; i++) {
1010 int err;
1011
1012 err = close(ctx->consumer_poll_pipe[i]);
4c462e79
MD
1013 if (err) {
1014 PERROR("close");
1015 }
3bd1e081
MD
1016 }
1017error_poll_pipe:
1018 free(ctx);
1019error:
1020 return NULL;
1021}
1022
1023/*
1024 * Close all fds associated with the instance and free the context.
1025 */
1026void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
1027{
4c462e79
MD
1028 int ret;
1029
1030 ret = close(ctx->consumer_error_socket);
1031 if (ret) {
1032 PERROR("close");
1033 }
1034 ret = close(ctx->consumer_thread_pipe[0]);
1035 if (ret) {
1036 PERROR("close");
1037 }
1038 ret = close(ctx->consumer_thread_pipe[1]);
1039 if (ret) {
1040 PERROR("close");
1041 }
1042 ret = close(ctx->consumer_poll_pipe[0]);
1043 if (ret) {
1044 PERROR("close");
1045 }
1046 ret = close(ctx->consumer_poll_pipe[1]);
1047 if (ret) {
1048 PERROR("close");
1049 }
1050 ret = close(ctx->consumer_should_quit[0]);
1051 if (ret) {
1052 PERROR("close");
1053 }
1054 ret = close(ctx->consumer_should_quit[1]);
1055 if (ret) {
1056 PERROR("close");
1057 }
3bd1e081
MD
1058 unlink(ctx->consumer_command_sock_path);
1059 free(ctx);
1060}
1061
1062/*
1063 * Mmap the ring buffer, read it and write the data to the tracefile.
1064 *
1065 * Returns the number of bytes written
1066 */
4078b776 1067ssize_t lttng_consumer_on_read_subbuffer_mmap(
3bd1e081
MD
1068 struct lttng_consumer_local_data *ctx,
1069 struct lttng_consumer_stream *stream, unsigned long len)
1070{
1071 switch (consumer_data.type) {
1072 case LTTNG_CONSUMER_KERNEL:
1073 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
7753dea8
MD
1074 case LTTNG_CONSUMER32_UST:
1075 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1076 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
1077 default:
1078 ERR("Unknown consumer_data type");
1079 assert(0);
1080 }
b9182dd9
DG
1081
1082 return 0;
3bd1e081
MD
1083}
1084
1085/*
1086 * Splice the data from the ring buffer to the tracefile.
1087 *
1088 * Returns the number of bytes spliced.
1089 */
4078b776 1090ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081
MD
1091 struct lttng_consumer_local_data *ctx,
1092 struct lttng_consumer_stream *stream, unsigned long len)
1093{
1094 switch (consumer_data.type) {
1095 case LTTNG_CONSUMER_KERNEL:
1096 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
7753dea8
MD
1097 case LTTNG_CONSUMER32_UST:
1098 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1099 return -ENOSYS;
1100 default:
1101 ERR("Unknown consumer_data type");
1102 assert(0);
1103 return -ENOSYS;
1104 }
1105
1106}
1107
1108/*
1109 * Take a snapshot for a specific fd
1110 *
1111 * Returns 0 on success, < 0 on error
1112 */
1113int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
1114 struct lttng_consumer_stream *stream)
1115{
1116 switch (consumer_data.type) {
1117 case LTTNG_CONSUMER_KERNEL:
1118 return lttng_kconsumer_take_snapshot(ctx, stream);
7753dea8
MD
1119 case LTTNG_CONSUMER32_UST:
1120 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1121 return lttng_ustconsumer_take_snapshot(ctx, stream);
1122 default:
1123 ERR("Unknown consumer_data type");
1124 assert(0);
1125 return -ENOSYS;
1126 }
1127
1128}
1129
1130/*
1131 * Get the produced position
1132 *
1133 * Returns 0 on success, < 0 on error
1134 */
1135int lttng_consumer_get_produced_snapshot(
1136 struct lttng_consumer_local_data *ctx,
1137 struct lttng_consumer_stream *stream,
1138 unsigned long *pos)
1139{
1140 switch (consumer_data.type) {
1141 case LTTNG_CONSUMER_KERNEL:
1142 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
7753dea8
MD
1143 case LTTNG_CONSUMER32_UST:
1144 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1145 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
1146 default:
1147 ERR("Unknown consumer_data type");
1148 assert(0);
1149 return -ENOSYS;
1150 }
1151}
1152
1153int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1154 int sock, struct pollfd *consumer_sockpoll)
1155{
1156 switch (consumer_data.type) {
1157 case LTTNG_CONSUMER_KERNEL:
1158 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
1159 case LTTNG_CONSUMER32_UST:
1160 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
1161 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
1162 default:
1163 ERR("Unknown consumer_data type");
1164 assert(0);
1165 return -ENOSYS;
1166 }
1167}
1168
1169/*
e4421fec 1170 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
1171 * it to tracefile if necessary.
1172 */
1173void *lttng_consumer_thread_poll_fds(void *data)
1174{
1175 int num_rdy, num_hup, high_prio, ret, i;
1176 struct pollfd *pollfd = NULL;
1177 /* local view of the streams */
1178 struct lttng_consumer_stream **local_stream = NULL;
1179 /* local view of consumer_data.fds_count */
1180 int nb_fd = 0;
3bd1e081 1181 struct lttng_consumer_local_data *ctx = data;
00e2e675
DG
1182 struct lttng_ht *metadata_ht;
1183 struct lttng_ht_iter iter;
1184 struct lttng_ht_node_ulong *node;
1185 struct lttng_consumer_stream *metadata_stream;
1186 ssize_t len;
1187
1188 metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
3bd1e081 1189
e7b994a3
DG
1190 rcu_register_thread();
1191
effcf122 1192 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
1193
1194 while (1) {
1195 high_prio = 0;
1196 num_hup = 0;
1197
1198 /*
e4421fec 1199 * the fds set has been updated, we need to update our
3bd1e081
MD
1200 * local array as well
1201 */
1202 pthread_mutex_lock(&consumer_data.lock);
1203 if (consumer_data.need_update) {
1204 if (pollfd != NULL) {
1205 free(pollfd);
1206 pollfd = NULL;
1207 }
1208 if (local_stream != NULL) {
1209 free(local_stream);
1210 local_stream = NULL;
1211 }
1212
1213 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1214 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081
MD
1215 if (pollfd == NULL) {
1216 perror("pollfd malloc");
1217 pthread_mutex_unlock(&consumer_data.lock);
1218 goto end;
1219 }
1220
1221 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1222 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
1223 sizeof(struct lttng_consumer_stream));
1224 if (local_stream == NULL) {
1225 perror("local_stream malloc");
1226 pthread_mutex_unlock(&consumer_data.lock);
1227 goto end;
1228 }
00e2e675
DG
1229 ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
1230 metadata_ht);
3bd1e081
MD
1231 if (ret < 0) {
1232 ERR("Error in allocating pollfd or local_outfds");
1233 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1234 pthread_mutex_unlock(&consumer_data.lock);
1235 goto end;
1236 }
1237 nb_fd = ret;
1238 consumer_data.need_update = 0;
1239 }
1240 pthread_mutex_unlock(&consumer_data.lock);
1241
4078b776
MD
1242 /* No FDs and consumer_quit, consumer_cleanup the thread */
1243 if (nb_fd == 0 && consumer_quit == 1) {
1244 goto end;
1245 }
3bd1e081 1246 /* poll on the array of fds */
88f2b785 1247 restart:
3bd1e081
MD
1248 DBG("polling on %d fd", nb_fd + 1);
1249 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
1250 DBG("poll num_rdy : %d", num_rdy);
1251 if (num_rdy == -1) {
88f2b785
MD
1252 /*
1253 * Restart interrupted system call.
1254 */
1255 if (errno == EINTR) {
1256 goto restart;
1257 }
3bd1e081
MD
1258 perror("Poll error");
1259 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
1260 goto end;
1261 } else if (num_rdy == 0) {
1262 DBG("Polling thread timed out");
1263 goto end;
1264 }
1265
3bd1e081 1266 /*
00e2e675
DG
1267 * If the consumer_poll_pipe triggered poll go directly to the
1268 * beginning of the loop to update the array. We want to prioritize
1269 * array update over low-priority reads.
3bd1e081 1270 */
509bb1cf 1271 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
04fdd819
MD
1272 size_t pipe_readlen;
1273 char tmp;
1274
3bd1e081 1275 DBG("consumer_poll_pipe wake up");
04fdd819
MD
1276 /* Consume 1 byte of pipe data */
1277 do {
1278 pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
1279 } while (pipe_readlen == -1 && errno == EINTR);
3bd1e081
MD
1280 continue;
1281 }
1282
1283 /* Take care of high priority channels first. */
1284 for (i = 0; i < nb_fd; i++) {
00e2e675
DG
1285 /* Lookup for metadata which is the highest priority */
1286 lttng_ht_lookup(metadata_ht,
1287 (void *)((unsigned long) pollfd[i].fd), &iter);
1288 node = lttng_ht_iter_get_node_ulong(&iter);
1289 if (node != NULL &&
1290 (pollfd[i].revents & (POLLIN | POLLPRI))) {
1291 DBG("Urgent metadata read on fd %d", pollfd[i].fd);
1292 metadata_stream = caa_container_of(node,
1293 struct lttng_consumer_stream, waitfd_node);
1294 high_prio = 1;
1295 len = ctx->on_buffer_ready(metadata_stream, ctx);
1296 /* it's ok to have an unavailable sub-buffer */
1297 if (len < 0 && len != -EAGAIN) {
1298 goto end;
1299 } else if (len > 0) {
1300 metadata_stream->data_read = 1;
1301 }
1302 } else if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
1303 DBG("Urgent read on fd %d", pollfd[i].fd);
1304 high_prio = 1;
4078b776 1305 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 1306 /* it's ok to have an unavailable sub-buffer */
4078b776
MD
1307 if (len < 0 && len != -EAGAIN) {
1308 goto end;
1309 } else if (len > 0) {
1310 local_stream[i]->data_read = 1;
d41f73b7 1311 }
3bd1e081
MD
1312 }
1313 }
1314
4078b776
MD
1315 /*
1316 * If we read high prio channel in this loop, try again
1317 * for more high prio data.
1318 */
1319 if (high_prio) {
3bd1e081
MD
1320 continue;
1321 }
1322
1323 /* Take care of low priority channels. */
4078b776
MD
1324 for (i = 0; i < nb_fd; i++) {
1325 if ((pollfd[i].revents & POLLIN) ||
1326 local_stream[i]->hangup_flush_done) {
4078b776
MD
1327 DBG("Normal read on fd %d", pollfd[i].fd);
1328 len = ctx->on_buffer_ready(local_stream[i], ctx);
1329 /* it's ok to have an unavailable sub-buffer */
1330 if (len < 0 && len != -EAGAIN) {
1331 goto end;
1332 } else if (len > 0) {
1333 local_stream[i]->data_read = 1;
1334 }
1335 }
1336 }
1337
1338 /* Handle hangup and errors */
1339 for (i = 0; i < nb_fd; i++) {
1340 if (!local_stream[i]->hangup_flush_done
1341 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1342 && (consumer_data.type == LTTNG_CONSUMER32_UST
1343 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1344 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1345 pollfd[i].fd);
1346 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1347 /* Attempt read again, for the data we just flushed. */
1348 local_stream[i]->data_read = 1;
1349 }
1350 /*
1351 * If the poll flag is HUP/ERR/NVAL and we have
1352 * read no data in this pass, we can remove the
1353 * stream from its hash table.
1354 */
1355 if ((pollfd[i].revents & POLLHUP)) {
1356 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1357 if (!local_stream[i]->data_read) {
00e2e675
DG
1358 if (local_stream[i]->metadata_flag) {
1359 iter.iter.node = &local_stream[i]->waitfd_node.node;
1360 ret = lttng_ht_del(metadata_ht, &iter);
1361 assert(!ret);
1362 }
702b1ea4 1363 consumer_del_stream(local_stream[i]);
4078b776
MD
1364 num_hup++;
1365 }
1366 } else if (pollfd[i].revents & POLLERR) {
1367 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1368 if (!local_stream[i]->data_read) {
00e2e675
DG
1369 if (local_stream[i]->metadata_flag) {
1370 iter.iter.node = &local_stream[i]->waitfd_node.node;
1371 ret = lttng_ht_del(metadata_ht, &iter);
1372 assert(!ret);
1373 }
702b1ea4 1374 consumer_del_stream(local_stream[i]);
4078b776
MD
1375 num_hup++;
1376 }
1377 } else if (pollfd[i].revents & POLLNVAL) {
1378 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1379 if (!local_stream[i]->data_read) {
00e2e675
DG
1380 if (local_stream[i]->metadata_flag) {
1381 iter.iter.node = &local_stream[i]->waitfd_node.node;
1382 ret = lttng_ht_del(metadata_ht, &iter);
1383 assert(!ret);
1384 }
702b1ea4 1385 consumer_del_stream(local_stream[i]);
4078b776 1386 num_hup++;
3bd1e081
MD
1387 }
1388 }
4078b776 1389 local_stream[i]->data_read = 0;
3bd1e081
MD
1390 }
1391 }
1392end:
1393 DBG("polling thread exiting");
1394 if (pollfd != NULL) {
1395 free(pollfd);
1396 pollfd = NULL;
1397 }
1398 if (local_stream != NULL) {
1399 free(local_stream);
1400 local_stream = NULL;
1401 }
e7b994a3 1402 rcu_unregister_thread();
3bd1e081
MD
1403 return NULL;
1404}
1405
1406/*
1407 * This thread listens on the consumerd socket and receives the file
1408 * descriptors from the session daemon.
1409 */
1410void *lttng_consumer_thread_receive_fds(void *data)
1411{
1412 int sock, client_socket, ret;
1413 /*
1414 * structure to poll for incoming data on communication socket avoids
1415 * making blocking sockets.
1416 */
1417 struct pollfd consumer_sockpoll[2];
1418 struct lttng_consumer_local_data *ctx = data;
1419
e7b994a3
DG
1420 rcu_register_thread();
1421
3bd1e081
MD
1422 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
1423 unlink(ctx->consumer_command_sock_path);
1424 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
1425 if (client_socket < 0) {
1426 ERR("Cannot create command socket");
1427 goto end;
1428 }
1429
1430 ret = lttcomm_listen_unix_sock(client_socket);
1431 if (ret < 0) {
1432 goto end;
1433 }
1434
32258573 1435 DBG("Sending ready command to lttng-sessiond");
3bd1e081
MD
1436 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
1437 /* return < 0 on error, but == 0 is not fatal */
1438 if (ret < 0) {
32258573 1439 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
1440 goto end;
1441 }
1442
1443 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
1444 if (ret < 0) {
1445 perror("fcntl O_NONBLOCK");
1446 goto end;
1447 }
1448
1449 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1450 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1451 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1452 consumer_sockpoll[1].fd = client_socket;
1453 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1454
1455 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1456 goto end;
1457 }
1458 DBG("Connection on client_socket");
1459
1460 /* Blocking call, waiting for transmission */
1461 sock = lttcomm_accept_unix_sock(client_socket);
1462 if (sock <= 0) {
1463 WARN("On accept");
1464 goto end;
1465 }
1466 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1467 if (ret < 0) {
1468 perror("fcntl O_NONBLOCK");
1469 goto end;
1470 }
1471
1472 /* update the polling structure to poll on the established socket */
1473 consumer_sockpoll[1].fd = sock;
1474 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1475
1476 while (1) {
1477 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1478 goto end;
1479 }
1480 DBG("Incoming command on sock");
1481 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1482 if (ret == -ENOENT) {
1483 DBG("Received STOP command");
1484 goto end;
1485 }
1486 if (ret < 0) {
1487 ERR("Communication interrupted on command socket");
1488 goto end;
1489 }
1490 if (consumer_quit) {
1491 DBG("consumer_thread_receive_fds received quit from signal");
1492 goto end;
1493 }
1494 DBG("received fds on sock");
1495 }
1496end:
1497 DBG("consumer_thread_receive_fds exiting");
1498
1499 /*
1500 * when all fds have hung up, the polling thread
1501 * can exit cleanly
1502 */
1503 consumer_quit = 1;
1504
1505 /*
1506 * 2s of grace period, if no polling events occur during
1507 * this period, the polling thread will exit even if there
1508 * are still open FDs (should not happen, but safety mechanism).
1509 */
1510 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1511
04fdd819
MD
1512 /*
1513 * Wake-up the other end by writing a null byte in the pipe
1514 * (non-blocking). Important note: Because writing into the
1515 * pipe is non-blocking (and therefore we allow dropping wakeup
1516 * data, as long as there is wakeup data present in the pipe
1517 * buffer to wake up the other end), the other end should
1518 * perform the following sequence for waiting:
1519 * 1) empty the pipe (reads).
1520 * 2) perform update operation.
1521 * 3) wait on the pipe (poll).
1522 */
1523 do {
1524 ret = write(ctx->consumer_poll_pipe[1], "", 1);
6f94560a 1525 } while (ret < 0 && errno == EINTR);
e7b994a3 1526 rcu_unregister_thread();
3bd1e081
MD
1527 return NULL;
1528}
d41f73b7 1529
4078b776 1530ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
1531 struct lttng_consumer_local_data *ctx)
1532{
1533 switch (consumer_data.type) {
1534 case LTTNG_CONSUMER_KERNEL:
1535 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
1536 case LTTNG_CONSUMER32_UST:
1537 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1538 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1539 default:
1540 ERR("Unknown consumer_data type");
1541 assert(0);
1542 return -ENOSYS;
1543 }
1544}
1545
1546int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1547{
1548 switch (consumer_data.type) {
1549 case LTTNG_CONSUMER_KERNEL:
1550 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
1551 case LTTNG_CONSUMER32_UST:
1552 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1553 return lttng_ustconsumer_on_recv_stream(stream);
1554 default:
1555 ERR("Unknown consumer_data type");
1556 assert(0);
1557 return -ENOSYS;
1558 }
1559}
e4421fec
DG
1560
1561/*
1562 * Allocate and set consumer data hash tables.
1563 */
1564void lttng_consumer_init(void)
1565{
1566 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1567 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675 1568 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
e4421fec 1569}
This page took 0.164374 seconds and 4 git commands to generate.