Update coding style. Add error handling section
[lttng-tools.git] / src / common / consumer.c
/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *               2012 - David Goulet <dgoulet@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#define _GNU_SOURCE
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <inttypes.h>

#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/kernel-consumer/kernel-consumer.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>

#include "consumer.h"

struct lttng_consumer_global_data consumer_data = {
	.stream_count = 0,
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};

/* timeout parameter, to control the polling thread grace period. */
int consumer_poll_timeout = -1;

/*
 * Flag to inform the polling thread to quit when all fds hang up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds have hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
volatile int consumer_quit = 0;

/*
 * Find a stream. The consumer_data.lock must be locked during this
 * call.
 */
static struct lttng_consumer_stream *consumer_find_stream(int key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_ulong *node;
	struct lttng_consumer_stream *stream = NULL;

	/* Negative keys are lookup failures */
	if (key < 0) {
		return NULL;
	}

	rcu_read_lock();

	lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
			&iter);
	node = lttng_ht_iter_get_node_ulong(&iter);
	if (node != NULL) {
		stream = caa_container_of(node, struct lttng_consumer_stream, node);
	}

	rcu_read_unlock();

	return stream;
}

static void consumer_steal_stream_key(int key)
{
	struct lttng_consumer_stream *stream;

	rcu_read_lock();
	stream = consumer_find_stream(key);
	if (stream) {
		stream->key = -1;
		/*
		 * We don't want the lookup to match, but we still need
		 * to iterate on this stream when iterating over the hash table. Just
		 * change the node key.
		 */
		stream->node.key = -1;
	}
	rcu_read_unlock();
}

static struct lttng_consumer_channel *consumer_find_channel(int key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_ulong *node;
	struct lttng_consumer_channel *channel = NULL;

	/* Negative keys are lookup failures */
	if (key < 0) {
		return NULL;
	}

	rcu_read_lock();

	lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
			&iter);
	node = lttng_ht_iter_get_node_ulong(&iter);
	if (node != NULL) {
		channel = caa_container_of(node, struct lttng_consumer_channel, node);
	}

	rcu_read_unlock();

	return channel;
}

static void consumer_steal_channel_key(int key)
{
	struct lttng_consumer_channel *channel;

	rcu_read_lock();
	channel = consumer_find_channel(key);
	if (channel) {
		channel->key = -1;
		/*
		 * We don't want the lookup to match, but we still need
		 * to iterate on this channel when iterating over the hash table. Just
		 * change the node key.
		 */
		channel->node.key = -1;
	}
	rcu_read_unlock();
}

static
void consumer_free_stream(struct rcu_head *head)
{
	struct lttng_ht_node_ulong *node =
		caa_container_of(head, struct lttng_ht_node_ulong, head);
	struct lttng_consumer_stream *stream =
		caa_container_of(node, struct lttng_consumer_stream, node);

	free(stream);
}

/*
 * RCU protected relayd socket pair free.
 */
static void consumer_rcu_free_relayd(struct rcu_head *head)
{
	struct lttng_ht_node_ulong *node =
		caa_container_of(head, struct lttng_ht_node_ulong, head);
	struct consumer_relayd_sock_pair *relayd =
		caa_container_of(node, struct consumer_relayd_sock_pair, node);

	free(relayd);
}

/*
 * Destroy and free relayd socket pair object.
 *
 * This function MUST be called with the consumer_data lock acquired.
 */
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	struct lttng_ht_iter iter;

	if (relayd == NULL) {
		return;
	}

	DBG("Consumer destroy and close relayd socket pair");

	iter.iter.node = &relayd->node.node;
	ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
	if (ret != 0) {
		/* We assume the relayd was already destroyed */
		return;
	}

	/* Close all sockets */
	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
	(void) relayd_close(&relayd->control_sock);
	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	(void) relayd_close(&relayd->data_sock);

	/* RCU free() call */
	call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
}

/*
 * Flag a relayd socket pair for destruction. Destroy it if the refcount
 * reaches zero.
 *
 * RCU read side lock MUST be acquired before calling this function.
 */
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
	assert(relayd);

	/* Set destroy flag for this object */
	uatomic_set(&relayd->destroy_flag, 1);

	/* Destroy the relayd if refcount is 0 */
	if (uatomic_read(&relayd->refcount) == 0) {
		consumer_destroy_relayd(relayd);
	}
}

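/*
 * Illustrative sketch (not part of the original source): the calling pattern
 * the relayd teardown helpers above expect. consumer_find_relayd() and the
 * destroy flag require the RCU read-side lock, and consumer_destroy_relayd()
 * requires the consumer_data lock, so a hypothetical caller tearing down a
 * relayd by its net sequence index would look roughly like this:
 *
 *	pthread_mutex_lock(&consumer_data.lock);
 *	rcu_read_lock();
 *	relayd = consumer_find_relayd(net_seq_idx);
 *	if (relayd != NULL) {
 *		consumer_flag_relayd_for_destroy(relayd);
 *	}
 *	rcu_read_unlock();
 *	pthread_mutex_unlock(&consumer_data.lock);
 */
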
/*
 * Remove a stream from the global list protected by a mutex. This
 * function is also responsible for freeing its data structures.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_channel *free_chan = NULL;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);

	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (stream->mmap_base != NULL) {
			ret = munmap(stream->mmap_base, stream->mmap_len);
			if (ret != 0) {
				perror("munmap");
			}
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_stream(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	rcu_read_lock();
	iter.iter.node = &stream->node.node;
	ret = lttng_ht_del(consumer_data.stream_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	if (consumer_data.stream_count <= 0) {
		goto end;
	}
	consumer_data.stream_count--;
	if (!stream) {
		goto end;
	}
	if (stream->out_fd >= 0) {
		ret = close(stream->out_fd);
		if (ret) {
			PERROR("close");
		}
	}
	if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
		ret = close(stream->wait_fd);
		if (ret) {
			PERROR("close");
		}
	}
	if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
		ret = close(stream->shm_fd);
		if (ret) {
			PERROR("close");
		}
	}

	/* Check and cleanup relayd */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_dec(&relayd->refcount);
		assert(uatomic_read(&relayd->refcount) >= 0);

		/* Closing streams requires locking the control socket. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_send_close_stream(&relayd->control_sock,
				stream->relayd_stream_id,
				stream->next_net_seq_num - 1);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			DBG("Unable to close stream on the relayd. Continuing");
			/*
			 * Continue here. There is nothing we can do for the relayd.
			 * Chances are that the relayd has closed the socket so we just
			 * continue cleaning up.
			 */
		}

		/* Both conditions are met, we destroy the relayd. */
		if (uatomic_read(&relayd->refcount) == 0 &&
				uatomic_read(&relayd->destroy_flag)) {
			consumer_destroy_relayd(relayd);
		}
	}
	rcu_read_unlock();

	if (!--stream->chan->refcount) {
		free_chan = stream->chan;
	}

	call_rcu(&stream->node.head, consumer_free_stream);
end:
	consumer_data.need_update = 1;
	pthread_mutex_unlock(&consumer_data.lock);

	if (free_chan) {
		consumer_del_channel(free_chan);
	}
}

struct lttng_consumer_stream *consumer_allocate_stream(
		int channel_key, int stream_key,
		int shm_fd, int wait_fd,
		enum lttng_consumer_stream_state state,
		uint64_t mmap_len,
		enum lttng_event_output output,
		const char *path_name,
		uid_t uid,
		gid_t gid,
		int net_index,
		int metadata_flag)
{
	struct lttng_consumer_stream *stream;
	int ret;

	stream = zmalloc(sizeof(*stream));
	if (stream == NULL) {
		perror("malloc struct lttng_consumer_stream");
		goto end;
	}
	stream->chan = consumer_find_channel(channel_key);
	if (!stream->chan) {
		perror("Unable to find channel key");
		goto end;
	}
	stream->chan->refcount++;
	stream->key = stream_key;
	stream->shm_fd = shm_fd;
	stream->wait_fd = wait_fd;
	stream->out_fd = -1;
	stream->out_fd_offset = 0;
	stream->state = state;
	stream->mmap_len = mmap_len;
	stream->mmap_base = NULL;
	stream->output = output;
	stream->uid = uid;
	stream->gid = gid;
	stream->net_seq_idx = net_index;
	stream->metadata_flag = metadata_flag;
	strncpy(stream->path_name, path_name, sizeof(stream->path_name));
	stream->path_name[sizeof(stream->path_name) - 1] = '\0';
	lttng_ht_node_init_ulong(&stream->node, stream->key);
	lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		stream->cpu = stream->chan->cpucount++;
		ret = lttng_ustconsumer_allocate_stream(stream);
		if (ret) {
			free(stream);
			return NULL;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}
	DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
			stream->path_name, stream->key,
			stream->shm_fd,
			stream->wait_fd,
			(unsigned long long) stream->mmap_len,
			stream->out_fd,
			stream->net_seq_idx);
end:
	return stream;
}

/*
 * Add a stream to the global list protected by a mutex.
 */
int consumer_add_stream(struct lttng_consumer_stream *stream)
{
	int ret = 0;
	struct lttng_ht_node_ulong *node;
	struct lttng_ht_iter iter;
	struct consumer_relayd_sock_pair *relayd;

	pthread_mutex_lock(&consumer_data.lock);
	/* Steal stream identifier, for UST */
	consumer_steal_stream_key(stream->key);

	rcu_read_lock();
	lttng_ht_lookup(consumer_data.stream_ht,
			(void *)((unsigned long) stream->key), &iter);
	node = lttng_ht_iter_get_node_ulong(&iter);
	if (node != NULL) {
		rcu_read_unlock();
		/* Stream already exists. Ignore the insertion. */
		goto end;
	}

	lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);

	/* Check for a relayd and, if one is found, take a reference on it. */
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_inc(&relayd->refcount);
	}
	rcu_read_unlock();

	/* Update consumer data */
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

end:
	pthread_mutex_unlock(&consumer_data.lock);

	return ret;
}

/*
 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
 * be acquired before calling this.
 */
int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
{
	int ret = 0;
	struct lttng_ht_node_ulong *node;
	struct lttng_ht_iter iter;

	if (relayd == NULL) {
		ret = -1;
		goto end;
	}

	lttng_ht_lookup(consumer_data.relayd_ht,
			(void *)((unsigned long) relayd->net_seq_idx), &iter);
	node = lttng_ht_iter_get_node_ulong(&iter);
	if (node != NULL) {
		/* Relayd already exists. Ignore the insertion. */
		goto end;
	}
	lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);

end:
	return ret;
}

/*
 * Allocate and return a consumer relayd socket pair.
 */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
		int net_seq_idx)
{
	struct consumer_relayd_sock_pair *obj = NULL;

	/* Negative net sequence index is a failure */
	if (net_seq_idx < 0) {
		goto error;
	}

	obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
	if (obj == NULL) {
		PERROR("zmalloc relayd sock");
		goto error;
	}

	obj->net_seq_idx = net_seq_idx;
	obj->refcount = 0;
	obj->destroy_flag = 0;
	lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
	pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);

error:
	return obj;
}

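/*
 * Illustrative sketch (assumption, not from the original source): how a
 * command handler might create and publish a relayd socket pair. net_seq_idx
 * is a placeholder for the index received from the session daemon, and
 * consumer_add_relayd() is called under the RCU read-side lock as its
 * documentation requires.
 *
 *	struct consumer_relayd_sock_pair *relayd;
 *
 *	relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
 *	if (relayd == NULL) {
 *		return -1;
 *	}
 *	rcu_read_lock();
 *	ret = consumer_add_relayd(relayd);
 *	rcu_read_unlock();
 */
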
/*
 * Find a relayd socket pair in the global consumer data.
 *
 * Return the object if found else NULL.
 * RCU read-side lock must be held across this call and while using the
 * returned object.
 */
struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_ulong *node;
	struct consumer_relayd_sock_pair *relayd = NULL;

	/* Negative keys are lookup failures */
	if (key < 0) {
		goto error;
	}

	lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
			&iter);
	node = lttng_ht_iter_get_node_ulong(&iter);
	if (node != NULL) {
		relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
	}

error:
	return relayd;
}

/*
 * Handle a stream destined for relayd transmission, that is, a stream set up
 * for network streaming (its net sequence index is set).
 *
 * Return the destination file descriptor or a negative value on error.
 */
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
		size_t data_size, struct consumer_relayd_sock_pair *relayd)
{
	int outfd = -1, ret;
	struct lttcomm_relayd_data_hdr data_hdr;

	/* Safety net */
	assert(stream);
	assert(relayd);

	/* Reset data header */
	memset(&data_hdr, 0, sizeof(data_hdr));

	if (stream->metadata_flag) {
		/* Caller MUST acquire the relayd control socket lock */
		ret = relayd_send_metadata(&relayd->control_sock, data_size);
		if (ret < 0) {
			goto error;
		}

		/* Metadata are always sent on the control socket. */
		outfd = relayd->control_sock.fd;
	} else {
		/* Set header with stream information */
		data_hdr.stream_id = htobe64(stream->relayd_stream_id);
		data_hdr.data_size = htobe32(data_size);
		data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
		/* Other fields are zeroed previously */

		ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
				sizeof(data_hdr));
		if (ret < 0) {
			goto error;
		}

		/* Set to go on data socket */
		outfd = relayd->data_sock.fd;
	}

error:
	return outfd;
}

/*
 * Update a stream according to what we just received.
 */
void consumer_change_stream_state(int stream_key,
		enum lttng_consumer_stream_state state)
{
	struct lttng_consumer_stream *stream;

	pthread_mutex_lock(&consumer_data.lock);
	stream = consumer_find_stream(stream_key);
	if (stream) {
		stream->state = state;
	}
	consumer_data.need_update = 1;
	pthread_mutex_unlock(&consumer_data.lock);
}

static
void consumer_free_channel(struct rcu_head *head)
{
	struct lttng_ht_node_ulong *node =
		caa_container_of(head, struct lttng_ht_node_ulong, head);
	struct lttng_consumer_channel *channel =
		caa_container_of(node, struct lttng_consumer_channel, node);

	free(channel);
}

/*
 * Remove a channel from the global list protected by a mutex. This
 * function is also responsible for freeing its data structures.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	int ret;
	struct lttng_ht_iter iter;

	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	rcu_read_lock();
	iter.iter.node = &channel->node.node;
	ret = lttng_ht_del(consumer_data.channel_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	if (channel->mmap_base != NULL) {
		ret = munmap(channel->mmap_base, channel->mmap_len);
		if (ret != 0) {
			perror("munmap");
		}
	}
	if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
		ret = close(channel->wait_fd);
		if (ret) {
			PERROR("close");
		}
	}
	if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
		ret = close(channel->shm_fd);
		if (ret) {
			PERROR("close");
		}
	}

	call_rcu(&channel->node.head, consumer_free_channel);
end:
	pthread_mutex_unlock(&consumer_data.lock);
}

struct lttng_consumer_channel *consumer_allocate_channel(
		int channel_key,
		int shm_fd, int wait_fd,
		uint64_t mmap_len,
		uint64_t max_sb_size)
{
	struct lttng_consumer_channel *channel;
	int ret;

	channel = zmalloc(sizeof(*channel));
	if (channel == NULL) {
		perror("malloc struct lttng_consumer_channel");
		goto end;
	}
	channel->key = channel_key;
	channel->shm_fd = shm_fd;
	channel->wait_fd = wait_fd;
	channel->mmap_len = mmap_len;
	channel->max_sb_size = max_sb_size;
	channel->refcount = 0;
	lttng_ht_node_init_ulong(&channel->node, channel->key);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		channel->mmap_base = NULL;
		channel->mmap_len = 0;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		ret = lttng_ustconsumer_allocate_channel(channel);
		if (ret) {
			free(channel);
			return NULL;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}
	DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
			channel->key, channel->shm_fd, channel->wait_fd,
			(unsigned long long) channel->mmap_len,
			(unsigned long long) channel->max_sb_size);
end:
	return channel;
}

/*
 * Add a channel to the global list protected by a mutex.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
	struct lttng_ht_node_ulong *node;
	struct lttng_ht_iter iter;

	pthread_mutex_lock(&consumer_data.lock);
	/* Steal channel identifier, for UST */
	consumer_steal_channel_key(channel->key);
	rcu_read_lock();

	lttng_ht_lookup(consumer_data.channel_ht,
			(void *)((unsigned long) channel->key), &iter);
	node = lttng_ht_iter_get_node_ulong(&iter);
	if (node != NULL) {
		/* Channel already exists. Ignore the insertion. */
		goto end;
	}

	lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);

end:
	rcu_read_unlock();
	pthread_mutex_unlock(&consumer_data.lock);

	return 0;
}

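/*
 * Illustrative sketch (hypothetical values, not part of the original file):
 * the allocate-then-add sequence a command handler typically follows for a
 * channel and one of its streams. All parameter names are placeholders for
 * values received from the session daemon.
 *
 *	struct lttng_consumer_channel *chan;
 *	struct lttng_consumer_stream *stream;
 *
 *	chan = consumer_allocate_channel(chan_key, chan_shm_fd, chan_wait_fd,
 *			mmap_len, max_sb_size);
 *	if (chan == NULL || consumer_add_channel(chan) < 0) {
 *		return -1;
 *	}
 *
 *	stream = consumer_allocate_stream(chan_key, stream_key, shm_fd, wait_fd,
 *			LTTNG_CONSUMER_ACTIVE_STREAM, mmap_len, output, path_name,
 *			uid, gid, net_index, metadata_flag);
 *	if (stream == NULL || consumer_add_stream(stream) < 0) {
 *		return -1;
 *	}
 */
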
/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and concurrency issues when writing is
 * needed. Called with consumer_data.lock held.
 *
 * Returns the number of fds in the structures.
 */
int consumer_update_poll_array(
		struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
		struct lttng_consumer_stream **local_stream)
{
	int i = 0;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Updating poll fd array");
	rcu_read_lock();
	cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
			node.node) {
		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
			continue;
		}
		DBG("Active FD %d", stream->wait_fd);
		(*pollfd)[i].fd = stream->wait_fd;
		(*pollfd)[i].events = POLLIN | POLLPRI;
		local_stream[i] = stream;
		i++;
	}
	rcu_read_unlock();

	/*
	 * Insert the consumer_poll_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD.
	 */
	(*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
	(*pollfd)[i].events = POLLIN | POLLPRI;
	return i;
}

/*
 * Poll on the should_quit pipe and the command socket.
 *
 * Return -1 on error (the caller should then exit) or 0 if data is
 * available on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

restart:
	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		/*
		 * Restart interrupted system call.
		 */
		if (errno == EINTR) {
			goto restart;
		}
		perror("Poll error");
		goto exit;
	}
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}

/*
 * Set the error socket.
 */
void lttng_consumer_set_error_sock(
		struct lttng_consumer_local_data *ctx, int sock)
{
	ctx->consumer_error_socket = sock;
}

/*
 * Set the command socket path.
 */
void lttng_consumer_set_command_sock_path(
		struct lttng_consumer_local_data *ctx, char *sock)
{
	ctx->consumer_command_sock_path = sock;
}

/*
 * Send a return code to the session daemon.
 *
 * If the socket is not defined, we return 0; it is not a fatal error.
 */
int lttng_consumer_send_error(
		struct lttng_consumer_local_data *ctx, int cmd)
{
	if (ctx->consumer_error_socket > 0) {
		return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}

/*
 * Close all the tracefiles and stream fds. Should be called when all
 * instances are destroyed.
 */
void lttng_consumer_cleanup(void)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_ulong *node;

	rcu_read_lock();

	/*
	 * Close all outfd. Called when there are no more threads running (after
	 * joining on the threads), so there is no need to protect list iteration
	 * with a mutex.
	 */
	cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
			node) {
		struct lttng_consumer_stream *stream =
			caa_container_of(node, struct lttng_consumer_stream, node);
		consumer_del_stream(stream);
	}

	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
			node) {
		struct lttng_consumer_channel *channel =
			caa_container_of(node, struct lttng_consumer_channel, node);
		consumer_del_channel(channel);
	}

	rcu_read_unlock();

	lttng_ht_destroy(consumer_data.stream_ht);
	lttng_ht_destroy(consumer_data.channel_ht);
}

/*
 * Called from the signal handler.
 */
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
	int ret;

	consumer_quit = 1;
	do {
		ret = write(ctx->consumer_should_quit[1], "4", 1);
	} while (ret < 0 && errno == EINTR);
	if (ret < 0) {
		perror("write consumer quit");
	}
}

void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
		off_t orig_offset)
{
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->chan->max_sb_size) {
		return;
	}
	lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
			stream->chan->max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED: we won't re-access data in the near future after
	 * we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
			stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
}

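/*
 * Worked example of the window above (assumed numbers): with a maximum
 * subbuffer size of 1 MiB and orig_offset at 4 MiB, the sync_file_range()
 * call flushes and waits on the byte range [3 MiB, 4 MiB), and the following
 * posix_fadvise(POSIX_FADV_DONTNEED) lets the kernel drop those now-clean
 * pages from the page cache, keeping the consumer's cache footprint bounded
 * to roughly one subbuffer behind the write position.
 */
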
/*
 * Initialise the necessary environment:
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for the signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument; this function is called when data is
 * available on a buffer. This function is responsible for doing the
 * kernctl_get_next_subbuf, reading the data with mmap or splice depending on
 * the buffer configuration, and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(int stream_key, uint32_t state))
{
	int ret, i;
	struct lttng_consumer_local_data *ctx;

	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		perror("allocating context");
		goto error;
	}

	ctx->consumer_error_socket = -1;
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ret = pipe(ctx->consumer_poll_pipe);
	if (ret < 0) {
		perror("Error creating poll pipe");
		goto error_poll_pipe;
	}

	/* set read end of the pipe to non-blocking */
	ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto error_poll_fcntl;
	}

	/* set write end of the pipe to non-blocking */
	ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto error_poll_fcntl;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		perror("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_thread_pipe);
	if (ret < 0) {
		perror("Error creating thread pipe");
		goto error_thread_pipe;
	}

	ret = utils_create_pipe(ctx->consumer_metadata_pipe);
	if (ret < 0) {
		goto error_metadata_pipe;
	}

	ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
	if (ret < 0) {
		goto error_splice_pipe;
	}

	return ctx;

error_splice_pipe:
	utils_close_pipe(ctx->consumer_metadata_pipe);
error_metadata_pipe:
	utils_close_pipe(ctx->consumer_thread_pipe);
error_thread_pipe:
	for (i = 0; i < 2; i++) {
		int err;

		err = close(ctx->consumer_should_quit[i]);
		if (err) {
			PERROR("close");
		}
	}
error_poll_fcntl:
error_quit_pipe:
	for (i = 0; i < 2; i++) {
		int err;

		err = close(ctx->consumer_poll_pipe[i]);
		if (err) {
			PERROR("close");
		}
	}
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}

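/*
 * Illustrative sketch (assumption, not from the original source): how a
 * consumer daemon might set up a context for the kernel domain. The
 * read_subbuffer_cb, recv_channel_cb, recv_stream_cb and update_stream_cb
 * names are placeholders for callbacks supplied by the caller.
 *
 *	struct lttng_consumer_local_data *ctx;
 *
 *	ctx = lttng_consumer_create(LTTNG_CONSUMER_KERNEL, read_subbuffer_cb,
 *			recv_channel_cb, recv_stream_cb, update_stream_cb);
 *	if (ctx == NULL) {
 *		return -1;
 *	}
 *	lttng_consumer_set_error_sock(ctx, error_sock_fd);
 *	lttng_consumer_set_command_sock_path(ctx, command_sock_path);
 */
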
/*
 * Close all fds associated with the instance and free the context.
 */
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
{
	int ret;

	ret = close(ctx->consumer_error_socket);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_thread_pipe[0]);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_thread_pipe[1]);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_poll_pipe[0]);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_poll_pipe[1]);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_should_quit[0]);
	if (ret) {
		PERROR("close");
	}
	ret = close(ctx->consumer_should_quit[1]);
	if (ret) {
		PERROR("close");
	}
	utils_close_pipe(ctx->consumer_splice_metadata_pipe);

	unlink(ctx->consumer_command_sock_path);
	free(ctx);
}

/*
 * Write the metadata stream id on the specified file descriptor.
 */
static int write_relayd_metadata_id(int fd,
		struct lttng_consumer_stream *stream,
		struct consumer_relayd_sock_pair *relayd)
{
	int ret;
	uint64_t metadata_id;

	metadata_id = htobe64(stream->relayd_stream_id);
	do {
		ret = write(fd, (void *) &metadata_id,
				sizeof(stream->relayd_stream_id));
	} while (ret < 0 && errno == EINTR);
	if (ret < 0) {
		PERROR("write metadata stream id");
		goto end;
	}
	DBG("Metadata stream id %" PRIu64 " written before data",
			stream->relayd_stream_id);

end:
	return ret;
}

/*
 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
 * core function for writing trace buffers to either the local filesystem or
 * the network.
 *
 * Careful review MUST be done on any change to this function!
 *
 * Returns the number of bytes written.
 */
ssize_t lttng_consumer_on_read_subbuffer_mmap(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len)
{
	unsigned long mmap_offset;
	ssize_t ret = 0, written = 0;
	off_t orig_offset = stream->out_fd_offset;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;

	/* RCU lock for the relayd pointer */
	rcu_read_lock();

	/* Flag that the current stream is set for network streaming. */
	if (stream->net_seq_idx != -1) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			goto end;
		}
	}

	/* get the offset inside the fd to mmap */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
				stream->buf, &mmap_offset);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}
	if (ret != 0) {
		errno = -ret;
		PERROR("tracer ctl get_mmap_read_offset");
		written = ret;
		goto end;
	}

	/* Handle stream on the relayd if the output is on the network */
	if (relayd) {
		unsigned long netlen = len;

		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		if (stream->metadata_flag) {
			/* Metadata requires the control socket. */
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			netlen += sizeof(stream->relayd_stream_id);
		}

		ret = write_relayd_stream_header(stream, netlen, relayd);
		if (ret >= 0) {
			/* Use the returned socket. */
			outfd = ret;

			/* Write metadata stream id before payload */
			if (stream->metadata_flag) {
				ret = write_relayd_metadata_id(outfd, stream, relayd);
				if (ret < 0) {
					written = ret;
					goto end;
				}
			}
		}
		/* Else, use the default set before which is the filesystem. */
	}

	while (len > 0) {
		do {
			ret = write(outfd, stream->mmap_base + mmap_offset, len);
		} while (ret < 0 && errno == EINTR);
		if (ret < 0) {
			PERROR("Error in file write");
			if (written == 0) {
				written = ret;
			}
			goto end;
		} else if (ret > len) {
			PERROR("Error in file write (ret %zd > len %lu)", ret, len);
			written += ret;
			goto end;
		} else {
			len -= ret;
			mmap_offset += ret;
		}
		DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
					SYNC_FILE_RANGE_WRITE);
			stream->out_fd_offset += ret;
		}
		written += ret;
	}
	lttng_consumer_sync_trace_file(stream, orig_offset);

end:
	/* Unlock only if ctrl socket used */
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return written;
}

/*
 * Splice the data from the ring buffer to the tracefile.
 *
 * Returns the number of bytes spliced.
 */
ssize_t lttng_consumer_on_read_subbuffer_splice(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len)
{
	ssize_t ret = 0, written = 0, ret_splice = 0;
	loff_t offset = 0;
	off_t orig_offset = stream->out_fd_offset;
	int fd = stream->wait_fd;
	/* Default is on the disk */
	int outfd = stream->out_fd;
	struct consumer_relayd_sock_pair *relayd = NULL;
	int *splice_pipe;

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/* Not supported for user space tracing */
		return -ENOSYS;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}

	/* RCU lock for the relayd pointer */
	rcu_read_lock();

	/* Flag that the current stream is set for network streaming. */
	if (stream->net_seq_idx != -1) {
		relayd = consumer_find_relayd(stream->net_seq_idx);
		if (relayd == NULL) {
			goto end;
		}
	}

	/*
	 * Choose the right pipe for splice. Metadata and trace data are handled by
	 * different threads, hence the use of two pipes in order not to race or
	 * corrupt the written data.
	 */
	if (stream->metadata_flag) {
		splice_pipe = ctx->consumer_splice_metadata_pipe;
	} else {
		splice_pipe = ctx->consumer_thread_pipe;
	}

	/* Write metadata stream id before payload */
	if (stream->metadata_flag && relayd) {
		/*
		 * Lock the control socket for the complete duration of the function
		 * since from this point on we will use the socket.
		 */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);

		ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd);
		if (ret < 0) {
			written = ret;
			goto end;
		}
	}

	while (len > 0) {
		DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
				(unsigned long)offset, len, fd);
		ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe, ret %zd", ret_splice);
		if (ret_splice < 0) {
			PERROR("Error in relay splice");
			if (written == 0) {
				written = ret_splice;
			}
			ret = errno;
			goto splice_error;
		}

		/* Handle stream on the relayd if the output is on the network */
		if (relayd) {
			if (stream->metadata_flag) {
				/* Update counter to fit the spliced data */
				ret_splice += sizeof(stream->relayd_stream_id);
				len += sizeof(stream->relayd_stream_id);
				/*
				 * We do this so the return value can match the len passed as
				 * argument to this function.
				 */
				written -= sizeof(stream->relayd_stream_id);
			}

			ret = write_relayd_stream_header(stream, ret_splice, relayd);
			if (ret >= 0) {
				/* Use the returned socket. */
				outfd = ret;
			} else {
				ERR("Remote relayd disconnected. Stopping");
				goto end;
			}
		}

		/* Splice data out */
		ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
				ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
		if (ret_splice < 0) {
			PERROR("Error in file splice");
			if (written == 0) {
				written = ret_splice;
			}
			ret = errno;
			goto splice_error;
		} else if (ret_splice > len) {
			errno = EINVAL;
			PERROR("Wrote more data than requested %zd (len: %lu)",
					ret_splice, len);
			written += ret_splice;
			ret = errno;
			goto splice_error;
		}
		len -= ret_splice;

		/* This call is useless on a socket so better save a syscall. */
		if (!relayd) {
			/* This won't block, but will start writeout asynchronously */
			lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
					SYNC_FILE_RANGE_WRITE);
			stream->out_fd_offset += ret_splice;
		}
		written += ret_splice;
	}
	lttng_consumer_sync_trace_file(stream, orig_offset);

	ret = ret_splice;

	goto end;

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EBADF:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
		break;
	case EINVAL:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	if (relayd && stream->metadata_flag) {
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
	}

	rcu_read_unlock();
	return written;
}

/*
 * Take a snapshot for a specific fd.
 *
 * Returns 0 on success, < 0 on error.
 */
int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_take_snapshot(ctx, stream);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_take_snapshot(ctx, stream);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

/*
 * Get the produced position.
 *
 * Returns 0 on success, < 0 on error.
 */
int lttng_consumer_get_produced_snapshot(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream,
		unsigned long *pos)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
		int sock, struct pollfd *consumer_sockpoll)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

/*
 * Iterate over all stream elements of the hashtable and free them. This is
 * race free since the hashtable received MUST be in a race free
 * synchronization state. It's the caller's responsibility to make sure of
 * that.
 */
static void destroy_stream_ht(struct lttng_ht *ht)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	if (ht == NULL) {
		return;
	}

	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
		ret = lttng_ht_del(ht, &iter);
		assert(!ret);

		free(stream);
	}

	lttng_ht_destroy(ht);
}

/*
 * Clean up a metadata stream and free its memory.
 */
static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
{
	int ret;
	struct lttng_consumer_channel *free_chan = NULL;
	struct consumer_relayd_sock_pair *relayd;

	assert(stream);
	/*
	 * This call should NEVER receive a regular stream. It must always be a
	 * metadata stream and this is crucial for data structure synchronization.
	 */
	assert(stream->metadata_flag);

	pthread_mutex_lock(&consumer_data.lock);
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (stream->mmap_base != NULL) {
			ret = munmap(stream->mmap_base, stream->mmap_len);
			if (ret != 0) {
				PERROR("munmap metadata stream");
			}
		}
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		lttng_ustconsumer_del_stream(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
	}
	pthread_mutex_unlock(&consumer_data.lock);

	if (stream->out_fd >= 0) {
		ret = close(stream->out_fd);
		if (ret) {
			PERROR("close");
		}
	}

	if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
		ret = close(stream->wait_fd);
		if (ret) {
			PERROR("close");
		}
	}

	if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
		ret = close(stream->shm_fd);
		if (ret) {
			PERROR("close");
		}
	}

	/* Check and cleanup relayd */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_dec(&relayd->refcount);
		assert(uatomic_read(&relayd->refcount) >= 0);

		/* Closing streams requires locking the control socket. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_send_close_stream(&relayd->control_sock,
				stream->relayd_stream_id, stream->next_net_seq_num - 1);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			DBG("Unable to close stream on the relayd. Continuing");
			/*
			 * Continue here. There is nothing we can do for the relayd.
			 * Chances are that the relayd has closed the socket so we just
			 * continue cleaning up.
			 */
		}

		/* Both conditions are met, we destroy the relayd. */
		if (uatomic_read(&relayd->refcount) == 0 &&
				uatomic_read(&relayd->destroy_flag)) {
			consumer_destroy_relayd(relayd);
		}
	}
	rcu_read_unlock();

	/* Atomically decrement channel refcount since other threads can use it. */
	uatomic_dec(&stream->chan->refcount);
	if (!uatomic_read(&stream->chan->refcount)) {
		free_chan = stream->chan;
	}

	if (free_chan) {
		consumer_del_channel(free_chan);
	}

	free(stream);
}

/*
 * Action done with the metadata stream when adding it to the consumer internal
 * data structures to handle it.
 */
static void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
	struct consumer_relayd_sock_pair *relayd;

	/* Find relayd and, if one is found, increment refcount. */
	rcu_read_lock();
	relayd = consumer_find_relayd(stream->net_seq_idx);
	if (relayd != NULL) {
		uatomic_inc(&relayd->refcount);
	}
	rcu_read_unlock();
}

/*
 * Thread that polls the metadata file descriptors and writes the metadata to
 * disk or to the network.
 */
void *lttng_consumer_thread_poll_metadata(void *data)
{
	int ret, i, pollfd;
	uint32_t revents, nb_fd;
	struct lttng_consumer_stream *stream;
	struct lttng_ht_iter iter;
	struct lttng_ht_node_ulong *node;
	struct lttng_ht *metadata_ht = NULL;
	struct lttng_poll_event events;
	struct lttng_consumer_local_data *ctx = data;
	ssize_t len;

	rcu_register_thread();

	DBG("Thread metadata poll started");

	metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
	if (metadata_ht == NULL) {
		goto end;
	}

	/* Size is set to 1 for the consumer_metadata pipe */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		ERR("Poll set creation failed");
		goto end;
	}

	ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
	if (ret < 0) {
		goto end;
	}

	/* Main loop */
	DBG("Metadata main loop started");

	while (1) {
		lttng_poll_reset(&events);

		nb_fd = LTTNG_POLL_GETNB(&events);

		/* Only the metadata pipe is set */
		if (nb_fd == 0 && consumer_quit == 1) {
			goto end;
		}

restart:
		DBG("Metadata poll wait with %d fd(s)", nb_fd);
		ret = lttng_poll_wait(&events, -1);
		DBG("Metadata event caught in thread");
		if (ret < 0) {
			if (errno == EINTR) {
				goto restart;
			}
			goto error;
		}

		for (i = 0; i < nb_fd; i++) {
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			/* Check the metadata pipe for incoming metadata. */
			if (pollfd == ctx->consumer_metadata_pipe[0]) {
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
					DBG("Metadata thread pipe hung up");
					/*
					 * Remove the pipe from the poll set and continue the loop
					 * since there might be data to consume.
					 */
					lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
					close(ctx->consumer_metadata_pipe[0]);
					continue;
				} else if (revents & LPOLLIN) {
					stream = zmalloc(sizeof(struct lttng_consumer_stream));
					if (stream == NULL) {
						PERROR("zmalloc metadata consumer stream");
						goto error;
					}

					do {
						/* Get the stream and add it to the local hash table */
						ret = read(pollfd, stream,
								sizeof(struct lttng_consumer_stream));
					} while (ret < 0 && errno == EINTR);
					if (ret < 0 || ret < sizeof(struct lttng_consumer_stream)) {
						PERROR("read metadata stream");
						free(stream);
						/*
						 * Let's continue here and hope we can still work
						 * without stopping the consumer. XXX: Should we?
						 */
						continue;
					}

					DBG("Adding metadata stream %d to poll set",
							stream->wait_fd);

					/* The node should be initialized at this point */
					lttng_ht_add_unique_ulong(metadata_ht,
							&stream->waitfd_node);

					/* Add metadata stream to the global poll events list */
					lttng_poll_add(&events, stream->wait_fd,
							LPOLLIN | LPOLLPRI);

					consumer_add_metadata_stream(stream);
				}

				/* Metadata pipe handled. Continue handling the others */
				continue;
			}

			/* From here, the event is a metadata wait fd */

			lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
					&iter);
			node = lttng_ht_iter_get_node_ulong(&iter);
			if (node == NULL) {
				/* FD not found, continue loop */
				continue;
			}

			stream = caa_container_of(node, struct lttng_consumer_stream,
					waitfd_node);

			/* Get the data out of the metadata file descriptor */
			if (revents & (LPOLLIN | LPOLLPRI)) {
				DBG("Metadata available on fd %d", pollfd);
				assert(stream->wait_fd == pollfd);

				len = ctx->on_buffer_ready(stream, ctx);
				/* It's ok to have an unavailable sub-buffer */
				if (len < 0 && len != -EAGAIN) {
					goto end;
				} else if (len > 0) {
					stream->data_read = 1;
				}
			}

			/*
			 * Remove the stream from the hash table since there is no data
			 * left on the fd because we previously did a read on the buffer.
			 */
			if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
				DBG("Metadata fd %d is hup|err|nval.", pollfd);
				if (!stream->hangup_flush_done
						&& (consumer_data.type == LTTNG_CONSUMER32_UST
							|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
					DBG("Attempting to flush and consume the UST buffers");
					lttng_ustconsumer_on_stream_hangup(stream);

					/* We just flushed the stream; now read it. */
					len = ctx->on_buffer_ready(stream, ctx);
					/* It's ok to have an unavailable sub-buffer */
					if (len < 0 && len != -EAGAIN) {
						goto end;
					}
				}

				/* Remove it from the hash table and the poll set, and free memory */
				lttng_ht_del(metadata_ht, &iter);
				lttng_poll_del(&events, stream->wait_fd);
				consumer_del_metadata_stream(stream);
			}
		}
	}

error:
end:
	DBG("Metadata poll thread exiting");
	lttng_poll_clean(&events);

	if (metadata_ht) {
		destroy_stream_ht(metadata_ht);
	}

	rcu_unregister_thread();
	return NULL;
}

3bd1e081 1795/*
e4421fec 1796 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
1797 * it to tracefile if necessary.
1798 */
1799void *lttng_consumer_thread_poll_fds(void *data)
1800{
1801 int num_rdy, num_hup, high_prio, ret, i;
1802 struct pollfd *pollfd = NULL;
1803 /* local view of the streams */
1804 struct lttng_consumer_stream **local_stream = NULL;
1805 /* local view of consumer_data.fds_count */
1806 int nb_fd = 0;
3bd1e081 1807 struct lttng_consumer_local_data *ctx = data;
00e2e675 1808 ssize_t len;
fb3a43a9
DG
1809 pthread_t metadata_thread;
1810 void *status;
3bd1e081 1811
e7b994a3
DG
1812 rcu_register_thread();
1813
fb3a43a9
DG
1814 /* Start metadata polling thread */
1815 ret = pthread_create(&metadata_thread, NULL,
1816 lttng_consumer_thread_poll_metadata, (void *) ctx);
1817 if (ret < 0) {
1818 PERROR("pthread_create metadata thread");
1819 goto end;
1820 }
1821
effcf122 1822 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
1823
1824 while (1) {
1825 high_prio = 0;
1826 num_hup = 0;
1827
1828 /*
e4421fec 1829 * the fds set has been updated, we need to update our
3bd1e081
MD
1830 * local array as well
1831 */
1832 pthread_mutex_lock(&consumer_data.lock);
1833 if (consumer_data.need_update) {
1834 if (pollfd != NULL) {
1835 free(pollfd);
1836 pollfd = NULL;
1837 }
1838 if (local_stream != NULL) {
1839 free(local_stream);
1840 local_stream = NULL;
1841 }
1842
1843 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1844 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081
MD
1845 if (pollfd == NULL) {
1846 perror("pollfd malloc");
1847 pthread_mutex_unlock(&consumer_data.lock);
1848 goto end;
1849 }
1850
1851 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 1852 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
1853 sizeof(struct lttng_consumer_stream));
1854 if (local_stream == NULL) {
1855 perror("local_stream malloc");
1856 pthread_mutex_unlock(&consumer_data.lock);
1857 goto end;
1858 }
fb3a43a9 1859 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
3bd1e081
MD
1860 if (ret < 0) {
1861 ERR("Error in allocating pollfd or local_outfds");
f73fabfd 1862 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
1863 pthread_mutex_unlock(&consumer_data.lock);
1864 goto end;
1865 }
1866 nb_fd = ret;
1867 consumer_data.need_update = 0;
1868 }
1869 pthread_mutex_unlock(&consumer_data.lock);
1870
4078b776
MD
1871 /* No FDs and consumer_quit, consumer_cleanup the thread */
1872 if (nb_fd == 0 && consumer_quit == 1) {
1873 goto end;
1874 }
3bd1e081 1875 /* poll on the array of fds */
88f2b785 1876 restart:
3bd1e081
MD
1877 DBG("polling on %d fd", nb_fd + 1);
1878 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
1879 DBG("poll num_rdy : %d", num_rdy);
1880 if (num_rdy == -1) {
88f2b785
MD
1881 /*
1882 * Restart interrupted system call.
1883 */
1884 if (errno == EINTR) {
1885 goto restart;
1886 }
3bd1e081 1887 perror("Poll error");
f73fabfd 1888 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
3bd1e081
MD
1889 goto end;
1890 } else if (num_rdy == 0) {
1891 DBG("Polling thread timed out");
1892 goto end;
1893 }
1894
3bd1e081 1895 /*
00e2e675
DG
1896 * If the consumer_poll_pipe triggered poll go directly to the
1897 * beginning of the loop to update the array. We want to prioritize
1898 * array update over low-priority reads.
3bd1e081 1899 */
509bb1cf 1900 if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
04fdd819
MD
1901			ssize_t pipe_readlen;
1902 char tmp;
1903
3bd1e081 1904 DBG("consumer_poll_pipe wake up");
04fdd819
MD
1905 /* Consume 1 byte of pipe data */
1906 do {
1907 pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
1908 } while (pipe_readlen == -1 && errno == EINTR);
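			/*
			 * Only one byte is drained per wake-up; any extra bytes left in the
			 * pipe simply cause additional (harmless) iterations of this loop.
			 */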
3bd1e081
MD
1909 continue;
1910 }
1911
1912 /* Take care of high priority channels first. */
1913 for (i = 0; i < nb_fd; i++) {
fb3a43a9 1914 if (pollfd[i].revents & POLLPRI) {
d41f73b7
MD
1915 DBG("Urgent read on fd %d", pollfd[i].fd);
1916 high_prio = 1;
4078b776 1917 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 1918 /* it's ok to have an unavailable sub-buffer */
4078b776
MD
1919 if (len < 0 && len != -EAGAIN) {
1920 goto end;
1921 } else if (len > 0) {
1922 local_stream[i]->data_read = 1;
d41f73b7 1923 }
3bd1e081
MD
1924 }
1925 }
1926
4078b776
MD
1927 /*
1928 * If we read high prio channel in this loop, try again
1929 * for more high prio data.
1930 */
1931 if (high_prio) {
3bd1e081
MD
1932 continue;
1933 }
1934
1935 /* Take care of low priority channels. */
4078b776
MD
1936 for (i = 0; i < nb_fd; i++) {
1937 if ((pollfd[i].revents & POLLIN) ||
1938 local_stream[i]->hangup_flush_done) {
4078b776
MD
1939 DBG("Normal read on fd %d", pollfd[i].fd);
1940 len = ctx->on_buffer_ready(local_stream[i], ctx);
1941 /* it's ok to have an unavailable sub-buffer */
1942 if (len < 0 && len != -EAGAIN) {
1943 goto end;
1944 } else if (len > 0) {
1945 local_stream[i]->data_read = 1;
1946 }
1947 }
1948 }
1949
1950 /* Handle hangup and errors */
1951 for (i = 0; i < nb_fd; i++) {
1952 if (!local_stream[i]->hangup_flush_done
1953 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1954 && (consumer_data.type == LTTNG_CONSUMER32_UST
1955 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1956 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1957 pollfd[i].fd);
1958 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1959 /* Attempt read again, for the data we just flushed. */
1960 local_stream[i]->data_read = 1;
1961 }
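			/*
			 * Marking data_read above keeps the POLLHUP/POLLERR handling below
			 * from deleting the stream on this pass, so the freshly flushed data
			 * gets one more chance to be read.
			 */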
1962 /*
1963 * If the poll flag is HUP/ERR/NVAL and we have
1964 * read no data in this pass, we can remove the
1965 * stream from its hash table.
1966 */
1967 if ((pollfd[i].revents & POLLHUP)) {
1968 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1969 if (!local_stream[i]->data_read) {
702b1ea4 1970 consumer_del_stream(local_stream[i]);
4078b776
MD
1971 num_hup++;
1972 }
1973 } else if (pollfd[i].revents & POLLERR) {
1974 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1975 if (!local_stream[i]->data_read) {
702b1ea4 1976 consumer_del_stream(local_stream[i]);
4078b776
MD
1977 num_hup++;
1978 }
1979 } else if (pollfd[i].revents & POLLNVAL) {
1980 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1981 if (!local_stream[i]->data_read) {
702b1ea4 1982 consumer_del_stream(local_stream[i]);
4078b776 1983 num_hup++;
3bd1e081
MD
1984 }
1985 }
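			/*
			 * Reset for the next pass; a hung-up stream is only deleted after a
			 * pass that reads no data from it.
			 */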
4078b776 1986 local_stream[i]->data_read = 0;
3bd1e081
MD
1987 }
1988 }
1989end:
1990 DBG("polling thread exiting");
1991 if (pollfd != NULL) {
1992 free(pollfd);
1993 pollfd = NULL;
1994 }
1995 if (local_stream != NULL) {
1996 free(local_stream);
1997 local_stream = NULL;
1998 }
fb3a43a9
DG
1999
2000 /*
2001 * Close the write side of the pipe so epoll_wait() in
2002 * lttng_consumer_thread_poll_metadata can catch it. The thread is
2003 * monitoring the read side of the pipe. If we close them both, epoll_wait
2004	 * strangely does not return and could create an endless wait period if the
2005 * pipe is the only tracked fd in the poll set. The thread will take care
2006 * of closing the read side.
2007 */
2008 close(ctx->consumer_metadata_pipe[1]);
2009 if (ret) {
2010 ret = pthread_join(metadata_thread, &status);
2011		if (ret) {	/* pthread_join() also returns an error number */
2012 PERROR("pthread_join metadata thread");
2013 }
2014 }
2015
e7b994a3 2016 rcu_unregister_thread();
3bd1e081
MD
2017 return NULL;
2018}
2019
2020/*
2021 * This thread listens on the consumerd socket and receives the file
2022 * descriptors from the session daemon.
2023 */
2024void *lttng_consumer_thread_receive_fds(void *data)
2025{
2026 int sock, client_socket, ret;
2027 /*
2028	 * Structure used to poll for incoming data on the communication socket,
2029	 * which avoids blocking on socket operations.
2030 */
2031 struct pollfd consumer_sockpoll[2];
2032 struct lttng_consumer_local_data *ctx = data;
2033
e7b994a3
DG
2034 rcu_register_thread();
2035
3bd1e081
MD
2036 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
2037 unlink(ctx->consumer_command_sock_path);
2038 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
2039 if (client_socket < 0) {
2040 ERR("Cannot create command socket");
2041 goto end;
2042 }
2043
2044 ret = lttcomm_listen_unix_sock(client_socket);
2045 if (ret < 0) {
2046 goto end;
2047 }
2048
32258573 2049 DBG("Sending ready command to lttng-sessiond");
f73fabfd 2050 ret = lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY);
3bd1e081
MD
2051 /* return < 0 on error, but == 0 is not fatal */
2052 if (ret < 0) {
32258573 2053 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
2054 goto end;
2055 }
2056
2057 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
2058 if (ret < 0) {
2059 perror("fcntl O_NONBLOCK");
2060 goto end;
2061 }
2062
2063	/* Prepare the FDs to poll: the client socket and the should_quit pipe. */
2064 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
2065 consumer_sockpoll[0].events = POLLIN | POLLPRI;
2066 consumer_sockpoll[1].fd = client_socket;
2067 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2068
2069 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2070 goto end;
2071 }
2072 DBG("Connection on client_socket");
2073
2074 /* Blocking call, waiting for transmission */
2075 sock = lttcomm_accept_unix_sock(client_socket);
2076 if (sock <= 0) {
2077 WARN("On accept");
2078 goto end;
2079 }
2080 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
2081 if (ret < 0) {
2082 perror("fcntl O_NONBLOCK");
2083 goto end;
2084 }
2085
2086 /* update the polling structure to poll on the established socket */
2087 consumer_sockpoll[1].fd = sock;
2088 consumer_sockpoll[1].events = POLLIN | POLLPRI;
2089
2090 while (1) {
2091 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2092 goto end;
2093 }
2094 DBG("Incoming command on sock");
2095 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
2096 if (ret == -ENOENT) {
2097 DBG("Received STOP command");
2098 goto end;
2099 }
4cbc1a04
DG
2100 if (ret <= 0) {
2101 /*
2102 * This could simply be a session daemon quitting. Don't output
2103 * ERR() here.
2104 */
2105 DBG("Communication interrupted on command socket");
3bd1e081
MD
2106 goto end;
2107 }
2108 if (consumer_quit) {
2109 DBG("consumer_thread_receive_fds received quit from signal");
2110 goto end;
2111 }
2112 DBG("received fds on sock");
2113 }
2114end:
2115 DBG("consumer_thread_receive_fds exiting");
2116
2117 /*
2118 * when all fds have hung up, the polling thread
2119 * can exit cleanly
2120 */
2121 consumer_quit = 1;
2122
2123 /*
2124 * 2s of grace period, if no polling events occur during
2125 * this period, the polling thread will exit even if there
2126 * are still open FDs (should not happen, but safety mechanism).
2127 */
2128 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
2129
04fdd819
MD
2130 /*
2131 * Wake-up the other end by writing a null byte in the pipe
2132 * (non-blocking). Important note: Because writing into the
2133 * pipe is non-blocking (and therefore we allow dropping wakeup
2134 * data, as long as there is wakeup data present in the pipe
2135 * buffer to wake up the other end), the other end should
2136 * perform the following sequence for waiting:
2137 * 1) empty the pipe (reads).
2138 * 2) perform update operation.
2139 * 3) wait on the pipe (poll).
2140 */
2141 do {
2142 ret = write(ctx->consumer_poll_pipe[1], "", 1);
6f94560a 2143 } while (ret < 0 && errno == EINTR);
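	/*
	 * The write return value is deliberately not checked further: if the pipe
	 * is full, the bytes already pending guarantee the poll thread wakes up.
	 */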
e7b994a3 2144 rcu_unregister_thread();
3bd1e081
MD
2145 return NULL;
2146}
d41f73b7 2147
4078b776 2148ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
2149 struct lttng_consumer_local_data *ctx)
2150{
2151 switch (consumer_data.type) {
2152 case LTTNG_CONSUMER_KERNEL:
2153 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
2154 case LTTNG_CONSUMER32_UST:
2155 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
2156 return lttng_ustconsumer_read_subbuffer(stream, ctx);
2157 default:
2158 ERR("Unknown consumer_data type");
2159 assert(0);
2160 return -ENOSYS;
2161 }
2162}
2163
2164int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
2165{
2166 switch (consumer_data.type) {
2167 case LTTNG_CONSUMER_KERNEL:
2168 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
2169 case LTTNG_CONSUMER32_UST:
2170 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
2171 return lttng_ustconsumer_on_recv_stream(stream);
2172 default:
2173 ERR("Unknown consumer_data type");
2174 assert(0);
2175 return -ENOSYS;
2176 }
2177}
e4421fec
DG
2178
2179/*
2180 * Allocate and set consumer data hash tables.
2181 */
2182void lttng_consumer_init(void)
2183{
2184 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2185 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
00e2e675 2186 consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
e4421fec 2187}
7735ef9e
DG
2188
2189/*
2190 * Process the ADD_RELAYD command received by the consumer.
2191 *
2192 * This will create a relayd socket pair and add it to the relayd hash table.
2193 * The caller MUST acquire a RCU read side lock before calling it.
2194 */
2195int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
2196 struct lttng_consumer_local_data *ctx, int sock,
2197 struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
2198{
2199 int fd, ret = -1;
2200 struct consumer_relayd_sock_pair *relayd;
2201
2202 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
2203
2204 /* Get relayd reference if exists. */
2205 relayd = consumer_find_relayd(net_seq_idx);
2206 if (relayd == NULL) {
2207 /* Not found. Allocate one. */
2208 relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
2209 if (relayd == NULL) {
f73fabfd 2210 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
7735ef9e
DG
2211 goto error;
2212 }
2213 }
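	/*
	 * Note that the same relayd object is reused here when the second socket
	 * type (control or data) for a given net_seq_idx is added.
	 */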
2214
2215 /* Poll on consumer socket. */
2216 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
2217 ret = -EINTR;
2218 goto error;
2219 }
2220
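	/*
	 * The relayd file descriptor itself is passed over the UNIX socket as
	 * ancillary data (presumably via SCM_RIGHTS); exactly one fd is expected.
	 */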
2221 /* Get relayd socket from session daemon */
2222 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
2223 if (ret != sizeof(fd)) {
f73fabfd 2224 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
7735ef9e
DG
2225 ret = -1;
2226 goto error;
2227 }
2228
2229 /* Copy socket information and received FD */
2230 switch (sock_type) {
2231 case LTTNG_STREAM_CONTROL:
2232 /* Copy received lttcomm socket */
2233 lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
2234 ret = lttcomm_create_sock(&relayd->control_sock);
2235 if (ret < 0) {
2236 goto error;
2237 }
2238
2240		/* Close the fd created by lttcomm_create_sock(); it is replaced by the received fd. */
2240 close(relayd->control_sock.fd);
2241
2242 /* Assign new file descriptor */
2243 relayd->control_sock.fd = fd;
2244 break;
2245 case LTTNG_STREAM_DATA:
2246 /* Copy received lttcomm socket */
2247 lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
2248 ret = lttcomm_create_sock(&relayd->data_sock);
2249 if (ret < 0) {
2250 goto error;
2251 }
2252
2254		/* Close the fd created by lttcomm_create_sock(); it is replaced by the received fd. */
2254 close(relayd->data_sock.fd);
2255
2256 /* Assign new file descriptor */
2257 relayd->data_sock.fd = fd;
2258 break;
2259 default:
2260 ERR("Unknown relayd socket type (%d)", sock_type);
2261 goto error;
2262 }
2263
2264 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
2265 sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
2266 relayd->net_seq_idx, fd);
2267
2268 /*
2269 * Add relayd socket pair to consumer data hashtable. If object already
2270 * exists or on error, the function gracefully returns.
2271 */
2272 consumer_add_relayd(relayd);
2273
2274 /* All good! */
2275 ret = 0;
2276
2277error:
2278 return ret;
2279}