Merge branch 'master' of git://git.lttng.org/lttng-tools
[lttng-tools.git] / src / common / consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <unistd.h>
30
990570ed 31#include <common/common.h>
10a8a223
DG
32#include <common/kernel-ctl/kernel-ctl.h>
33#include <common/sessiond-comm/sessiond-comm.h>
34#include <common/kernel-consumer/kernel-consumer.h>
35#include <common/ust-consumer/ust-consumer.h>
36
37#include "consumer.h"
3bd1e081
MD
38
39struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
40 .stream_count = 0,
41 .need_update = 1,
42 .type = LTTNG_CONSUMER_UNKNOWN,
43};
44
45/* timeout parameter, to control the polling thread grace period. */
46int consumer_poll_timeout = -1;
47
48/*
49 * Flag to inform the polling thread to quit when all fd hung up. Updated by
50 * the consumer_thread_receive_fds when it notices that all fds has hung up.
51 * Also updated by the signal handler (consumer_should_exit()). Read by the
52 * polling threads.
53 */
54volatile int consumer_quit = 0;
55
56/*
57 * Find a stream. The consumer_data.lock must be locked during this
58 * call.
59 */
60static struct lttng_consumer_stream *consumer_find_stream(int key)
61{
e4421fec
DG
62 struct lttng_ht_iter iter;
63 struct lttng_ht_node_ulong *node;
64 struct lttng_consumer_stream *stream = NULL;
3bd1e081 65
7ad0a0cb
MD
66 /* Negative keys are lookup failures */
67 if (key < 0)
68 return NULL;
e4421fec 69
6065ceec
DG
70 rcu_read_lock();
71
e4421fec
DG
72 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
73 &iter);
74 node = lttng_ht_iter_get_node_ulong(&iter);
75 if (node != NULL) {
76 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 77 }
e4421fec 78
6065ceec
DG
79 rcu_read_unlock();
80
e4421fec 81 return stream;
3bd1e081
MD
82}
83
7ad0a0cb
MD
84static void consumer_steal_stream_key(int key)
85{
86 struct lttng_consumer_stream *stream;
87
88 stream = consumer_find_stream(key);
89 if (stream)
90 stream->key = -1;
91}
92
3bd1e081
MD
93static struct lttng_consumer_channel *consumer_find_channel(int key)
94{
e4421fec
DG
95 struct lttng_ht_iter iter;
96 struct lttng_ht_node_ulong *node;
97 struct lttng_consumer_channel *channel = NULL;
3bd1e081 98
7ad0a0cb
MD
99 /* Negative keys are lookup failures */
100 if (key < 0)
101 return NULL;
e4421fec 102
6065ceec
DG
103 rcu_read_lock();
104
e4421fec
DG
105 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
106 &iter);
107 node = lttng_ht_iter_get_node_ulong(&iter);
108 if (node != NULL) {
109 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 110 }
e4421fec 111
6065ceec
DG
112 rcu_read_unlock();
113
e4421fec 114 return channel;
3bd1e081
MD
115}
116
7ad0a0cb
MD
117static void consumer_steal_channel_key(int key)
118{
119 struct lttng_consumer_channel *channel;
120
121 channel = consumer_find_channel(key);
122 if (channel)
123 channel->key = -1;
124}
125
702b1ea4
MD
126static
127void consumer_free_stream(struct rcu_head *head)
128{
129 struct lttng_ht_node_ulong *node =
130 caa_container_of(head, struct lttng_ht_node_ulong, head);
131 struct lttng_consumer_stream *stream =
132 caa_container_of(node, struct lttng_consumer_stream, node);
133
134 free(stream);
135}
136
3bd1e081
MD
137/*
138 * Remove a stream from the global list protected by a mutex. This
139 * function is also responsible for freeing its data structures.
140 */
141void consumer_del_stream(struct lttng_consumer_stream *stream)
142{
143 int ret;
e4421fec 144 struct lttng_ht_iter iter;
3bd1e081
MD
145 struct lttng_consumer_channel *free_chan = NULL;
146
147 pthread_mutex_lock(&consumer_data.lock);
148
149 switch (consumer_data.type) {
150 case LTTNG_CONSUMER_KERNEL:
151 if (stream->mmap_base != NULL) {
152 ret = munmap(stream->mmap_base, stream->mmap_len);
153 if (ret != 0) {
154 perror("munmap");
155 }
156 }
157 break;
7753dea8
MD
158 case LTTNG_CONSUMER32_UST:
159 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
160 lttng_ustconsumer_del_stream(stream);
161 break;
162 default:
163 ERR("Unknown consumer_data type");
164 assert(0);
165 goto end;
166 }
167
6065ceec
DG
168 rcu_read_lock();
169
e4421fec
DG
170 /* Get stream node from hash table */
171 lttng_ht_lookup(consumer_data.stream_ht,
172 (void *)((unsigned long) stream->key), &iter);
702b1ea4
MD
173 /*
174 * Remove stream node from hash table. It can fail if it's been
175 * replaced due to key reuse.
176 */
177 (void) lttng_ht_del(consumer_data.stream_ht, &iter);
e4421fec 178
6065ceec
DG
179 rcu_read_unlock();
180
3bd1e081
MD
181 if (consumer_data.stream_count <= 0) {
182 goto end;
183 }
184 consumer_data.stream_count--;
185 if (!stream) {
186 goto end;
187 }
188 if (stream->out_fd >= 0) {
4c462e79
MD
189 ret = close(stream->out_fd);
190 if (ret) {
191 PERROR("close");
192 }
3bd1e081 193 }
b5c5fc29 194 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
4c462e79
MD
195 ret = close(stream->wait_fd);
196 if (ret) {
197 PERROR("close");
198 }
3bd1e081 199 }
2c1dd183 200 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
4c462e79
MD
201 ret = close(stream->shm_fd);
202 if (ret) {
203 PERROR("close");
204 }
3bd1e081
MD
205 }
206 if (!--stream->chan->refcount)
207 free_chan = stream->chan;
702b1ea4
MD
208
209 call_rcu(&stream->node.head, consumer_free_stream);
3bd1e081
MD
210end:
211 consumer_data.need_update = 1;
212 pthread_mutex_unlock(&consumer_data.lock);
213
214 if (free_chan)
215 consumer_del_channel(free_chan);
216}
217
218struct lttng_consumer_stream *consumer_allocate_stream(
219 int channel_key, int stream_key,
220 int shm_fd, int wait_fd,
221 enum lttng_consumer_stream_state state,
222 uint64_t mmap_len,
223 enum lttng_event_output output,
6df2e2c9
MD
224 const char *path_name,
225 uid_t uid,
226 gid_t gid)
3bd1e081
MD
227{
228 struct lttng_consumer_stream *stream;
229 int ret;
230
effcf122 231 stream = zmalloc(sizeof(*stream));
3bd1e081
MD
232 if (stream == NULL) {
233 perror("malloc struct lttng_consumer_stream");
234 goto end;
235 }
236 stream->chan = consumer_find_channel(channel_key);
237 if (!stream->chan) {
238 perror("Unable to find channel key");
239 goto end;
240 }
241 stream->chan->refcount++;
242 stream->key = stream_key;
243 stream->shm_fd = shm_fd;
244 stream->wait_fd = wait_fd;
245 stream->out_fd = -1;
246 stream->out_fd_offset = 0;
247 stream->state = state;
248 stream->mmap_len = mmap_len;
249 stream->mmap_base = NULL;
250 stream->output = output;
6df2e2c9
MD
251 stream->uid = uid;
252 stream->gid = gid;
3bd1e081
MD
253 strncpy(stream->path_name, path_name, PATH_MAX - 1);
254 stream->path_name[PATH_MAX - 1] = '\0';
e4421fec 255 lttng_ht_node_init_ulong(&stream->node, stream->key);
3bd1e081
MD
256
257 switch (consumer_data.type) {
258 case LTTNG_CONSUMER_KERNEL:
259 break;
7753dea8
MD
260 case LTTNG_CONSUMER32_UST:
261 case LTTNG_CONSUMER64_UST:
5af2f756 262 stream->cpu = stream->chan->cpucount++;
3bd1e081
MD
263 ret = lttng_ustconsumer_allocate_stream(stream);
264 if (ret) {
265 free(stream);
266 return NULL;
267 }
268 break;
269 default:
270 ERR("Unknown consumer_data type");
271 assert(0);
272 goto end;
273 }
274 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
275 stream->path_name, stream->key,
276 stream->shm_fd,
277 stream->wait_fd,
278 (unsigned long long) stream->mmap_len,
279 stream->out_fd);
280end:
281 return stream;
282}
283
284/*
285 * Add a stream to the global list protected by a mutex.
286 */
287int consumer_add_stream(struct lttng_consumer_stream *stream)
288{
289 int ret = 0;
290
291 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
292 /* Steal stream identifier, for UST */
293 consumer_steal_stream_key(stream->key);
6065ceec 294 rcu_read_lock();
702b1ea4
MD
295 /*
296 * We simply remove the old channel from the hash table. It's
297 * ok, since we know for sure the sessiond wants to replace it
298 * with the new version, because the key has been reused.
299 */
300 (void) lttng_ht_add_replace_ulong(consumer_data.stream_ht, &stream->node);
6065ceec 301 rcu_read_unlock();
3bd1e081
MD
302 consumer_data.stream_count++;
303 consumer_data.need_update = 1;
304
305 switch (consumer_data.type) {
306 case LTTNG_CONSUMER_KERNEL:
307 break;
7753dea8
MD
308 case LTTNG_CONSUMER32_UST:
309 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
310 /* Streams are in CPU number order (we rely on this) */
311 stream->cpu = stream->chan->nr_streams++;
312 break;
313 default:
314 ERR("Unknown consumer_data type");
315 assert(0);
316 goto end;
317 }
318
319end:
320 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 321
3bd1e081
MD
322 return ret;
323}
324
325/*
326 * Update a stream according to what we just received.
327 */
328void consumer_change_stream_state(int stream_key,
329 enum lttng_consumer_stream_state state)
330{
331 struct lttng_consumer_stream *stream;
332
333 pthread_mutex_lock(&consumer_data.lock);
334 stream = consumer_find_stream(stream_key);
335 if (stream) {
336 stream->state = state;
337 }
338 consumer_data.need_update = 1;
339 pthread_mutex_unlock(&consumer_data.lock);
340}
341
702b1ea4
MD
342static
343void consumer_free_channel(struct rcu_head *head)
344{
345 struct lttng_ht_node_ulong *node =
346 caa_container_of(head, struct lttng_ht_node_ulong, head);
347 struct lttng_consumer_channel *channel =
348 caa_container_of(node, struct lttng_consumer_channel, node);
349
350 free(channel);
351}
352
3bd1e081
MD
353/*
354 * Remove a channel from the global list protected by a mutex. This
355 * function is also responsible for freeing its data structures.
356 */
357void consumer_del_channel(struct lttng_consumer_channel *channel)
358{
359 int ret;
e4421fec 360 struct lttng_ht_iter iter;
3bd1e081
MD
361
362 pthread_mutex_lock(&consumer_data.lock);
363
364 switch (consumer_data.type) {
365 case LTTNG_CONSUMER_KERNEL:
366 break;
7753dea8
MD
367 case LTTNG_CONSUMER32_UST:
368 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
369 lttng_ustconsumer_del_channel(channel);
370 break;
371 default:
372 ERR("Unknown consumer_data type");
373 assert(0);
374 goto end;
375 }
376
6065ceec
DG
377 rcu_read_lock();
378
e4421fec
DG
379 lttng_ht_lookup(consumer_data.channel_ht,
380 (void *)((unsigned long) channel->key), &iter);
702b1ea4
MD
381
382 /*
383 * Remove channel node from hash table. It can fail if it's been
384 * replaced due to key reuse.
385 */
386 (void) lttng_ht_del(consumer_data.channel_ht, &iter);
e4421fec 387
6065ceec
DG
388 rcu_read_unlock();
389
3bd1e081
MD
390 if (channel->mmap_base != NULL) {
391 ret = munmap(channel->mmap_base, channel->mmap_len);
392 if (ret != 0) {
393 perror("munmap");
394 }
395 }
b5c5fc29 396 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
4c462e79
MD
397 ret = close(channel->wait_fd);
398 if (ret) {
399 PERROR("close");
400 }
3bd1e081 401 }
2c1dd183 402 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
4c462e79
MD
403 ret = close(channel->shm_fd);
404 if (ret) {
405 PERROR("close");
406 }
3bd1e081 407 }
702b1ea4
MD
408
409 call_rcu(&channel->node.head, consumer_free_channel);
3bd1e081
MD
410end:
411 pthread_mutex_unlock(&consumer_data.lock);
412}
413
414struct lttng_consumer_channel *consumer_allocate_channel(
415 int channel_key,
416 int shm_fd, int wait_fd,
417 uint64_t mmap_len,
418 uint64_t max_sb_size)
419{
420 struct lttng_consumer_channel *channel;
421 int ret;
422
276b26d1 423 channel = zmalloc(sizeof(*channel));
3bd1e081
MD
424 if (channel == NULL) {
425 perror("malloc struct lttng_consumer_channel");
426 goto end;
427 }
428 channel->key = channel_key;
429 channel->shm_fd = shm_fd;
430 channel->wait_fd = wait_fd;
431 channel->mmap_len = mmap_len;
432 channel->max_sb_size = max_sb_size;
433 channel->refcount = 0;
434 channel->nr_streams = 0;
e4421fec 435 lttng_ht_node_init_ulong(&channel->node, channel->key);
3bd1e081
MD
436
437 switch (consumer_data.type) {
438 case LTTNG_CONSUMER_KERNEL:
439 channel->mmap_base = NULL;
440 channel->mmap_len = 0;
441 break;
7753dea8
MD
442 case LTTNG_CONSUMER32_UST:
443 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
444 ret = lttng_ustconsumer_allocate_channel(channel);
445 if (ret) {
446 free(channel);
447 return NULL;
448 }
449 break;
450 default:
451 ERR("Unknown consumer_data type");
452 assert(0);
453 goto end;
454 }
455 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
456 channel->key,
457 channel->shm_fd,
458 channel->wait_fd,
459 (unsigned long long) channel->mmap_len,
460 (unsigned long long) channel->max_sb_size);
461end:
462 return channel;
463}
464
465/*
466 * Add a channel to the global list protected by a mutex.
467 */
468int consumer_add_channel(struct lttng_consumer_channel *channel)
469{
3bd1e081 470 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
471 /* Steal channel identifier, for UST */
472 consumer_steal_channel_key(channel->key);
6065ceec 473 rcu_read_lock();
702b1ea4
MD
474 /*
475 * We simply remove the old channel from the hash table. It's
476 * ok, since we know for sure the sessiond wants to replace it
477 * with the new version, because the key has been reused.
478 */
479 (void) lttng_ht_add_replace_ulong(consumer_data.channel_ht, &channel->node);
6065ceec 480 rcu_read_unlock();
3bd1e081 481 pthread_mutex_unlock(&consumer_data.lock);
702b1ea4 482
7ad0a0cb 483 return 0;
3bd1e081
MD
484}
485
486/*
487 * Allocate the pollfd structure and the local view of the out fds to avoid
488 * doing a lookup in the linked list and concurrency issues when writing is
489 * needed. Called with consumer_data.lock held.
490 *
491 * Returns the number of fds in the structures.
492 */
493int consumer_update_poll_array(
494 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
495 struct lttng_consumer_stream **local_stream)
496{
3bd1e081 497 int i = 0;
e4421fec
DG
498 struct lttng_ht_iter iter;
499 struct lttng_consumer_stream *stream;
3bd1e081
MD
500
501 DBG("Updating poll fd array");
e4421fec
DG
502 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
503 node.node) {
504 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
3bd1e081
MD
505 continue;
506 }
e4421fec
DG
507 DBG("Active FD %d", stream->wait_fd);
508 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 509 (*pollfd)[i].events = POLLIN | POLLPRI;
e4421fec 510 local_stream[i] = stream;
3bd1e081
MD
511 i++;
512 }
513
514 /*
515 * Insert the consumer_poll_pipe at the end of the array and don't
516 * increment i so nb_fd is the number of real FD.
517 */
518 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
519 (*pollfd)[i].events = POLLIN;
520 return i;
521}
522
523/*
524 * Poll on the should_quit pipe and the command socket return -1 on error and
525 * should exit, 0 if data is available on the command socket
526 */
527int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
528{
529 int num_rdy;
530
88f2b785 531restart:
3bd1e081
MD
532 num_rdy = poll(consumer_sockpoll, 2, -1);
533 if (num_rdy == -1) {
88f2b785
MD
534 /*
535 * Restart interrupted system call.
536 */
537 if (errno == EINTR) {
538 goto restart;
539 }
3bd1e081
MD
540 perror("Poll error");
541 goto exit;
542 }
543 if (consumer_sockpoll[0].revents == POLLIN) {
544 DBG("consumer_should_quit wake up");
545 goto exit;
546 }
547 return 0;
548
549exit:
550 return -1;
551}
552
553/*
554 * Set the error socket.
555 */
556void lttng_consumer_set_error_sock(
557 struct lttng_consumer_local_data *ctx, int sock)
558{
559 ctx->consumer_error_socket = sock;
560}
561
562/*
563 * Set the command socket path.
564 */
565
566void lttng_consumer_set_command_sock_path(
567 struct lttng_consumer_local_data *ctx, char *sock)
568{
569 ctx->consumer_command_sock_path = sock;
570}
571
572/*
573 * Send return code to the session daemon.
574 * If the socket is not defined, we return 0, it is not a fatal error
575 */
576int lttng_consumer_send_error(
577 struct lttng_consumer_local_data *ctx, int cmd)
578{
579 if (ctx->consumer_error_socket > 0) {
580 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
581 sizeof(enum lttcomm_sessiond_command));
582 }
583
584 return 0;
585}
586
587/*
588 * Close all the tracefiles and stream fds, should be called when all instances
589 * are destroyed.
590 */
591void lttng_consumer_cleanup(void)
592{
e4421fec 593 struct lttng_ht_iter iter;
6065ceec
DG
594 struct lttng_ht_node_ulong *node;
595
596 rcu_read_lock();
3bd1e081
MD
597
598 /*
6065ceec
DG
599 * close all outfd. Called when there are no more threads running (after
600 * joining on the threads), no need to protect list iteration with mutex.
3bd1e081 601 */
6065ceec
DG
602 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
603 node) {
702b1ea4
MD
604 struct lttng_consumer_stream *stream =
605 caa_container_of(node, struct lttng_consumer_stream, node);
606 consumer_del_stream(stream);
3bd1e081 607 }
e4421fec 608
6065ceec
DG
609 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
610 node) {
702b1ea4
MD
611 struct lttng_consumer_channel *channel =
612 caa_container_of(node, struct lttng_consumer_channel, node);
613 consumer_del_channel(channel);
3bd1e081 614 }
6065ceec
DG
615
616 rcu_read_unlock();
3bd1e081
MD
617}
618
619/*
620 * Called from signal handler.
621 */
622void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
623{
624 int ret;
625 consumer_quit = 1;
626 ret = write(ctx->consumer_should_quit[1], "4", 1);
627 if (ret < 0) {
628 perror("write consumer quit");
629 }
630}
631
632void lttng_consumer_sync_trace_file(
633 struct lttng_consumer_stream *stream, off_t orig_offset)
634{
635 int outfd = stream->out_fd;
636
637 /*
638 * This does a blocking write-and-wait on any page that belongs to the
639 * subbuffer prior to the one we just wrote.
640 * Don't care about error values, as these are just hints and ways to
641 * limit the amount of page cache used.
642 */
643 if (orig_offset < stream->chan->max_sb_size) {
644 return;
645 }
b9182dd9 646 lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
3bd1e081
MD
647 stream->chan->max_sb_size,
648 SYNC_FILE_RANGE_WAIT_BEFORE
649 | SYNC_FILE_RANGE_WRITE
650 | SYNC_FILE_RANGE_WAIT_AFTER);
651 /*
652 * Give hints to the kernel about how we access the file:
653 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
654 * we write it.
655 *
656 * We need to call fadvise again after the file grows because the
657 * kernel does not seem to apply fadvise to non-existing parts of the
658 * file.
659 *
660 * Call fadvise _after_ having waited for the page writeback to
661 * complete because the dirty page writeback semantic is not well
662 * defined. So it can be expected to lead to lower throughput in
663 * streaming.
664 */
665 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
666 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
667}
668
669/*
670 * Initialise the necessary environnement :
671 * - create a new context
672 * - create the poll_pipe
673 * - create the should_quit pipe (for signal handler)
674 * - create the thread pipe (for splice)
675 *
676 * Takes a function pointer as argument, this function is called when data is
677 * available on a buffer. This function is responsible to do the
678 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
679 * buffer configuration and then kernctl_put_next_subbuf at the end.
680 *
681 * Returns a pointer to the new context or NULL on error.
682 */
683struct lttng_consumer_local_data *lttng_consumer_create(
684 enum lttng_consumer_type type,
4078b776 685 ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
d41f73b7 686 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
687 int (*recv_channel)(struct lttng_consumer_channel *channel),
688 int (*recv_stream)(struct lttng_consumer_stream *stream),
689 int (*update_stream)(int stream_key, uint32_t state))
690{
691 int ret, i;
692 struct lttng_consumer_local_data *ctx;
693
694 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
695 consumer_data.type == type);
696 consumer_data.type = type;
697
effcf122 698 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081
MD
699 if (ctx == NULL) {
700 perror("allocating context");
701 goto error;
702 }
703
704 ctx->consumer_error_socket = -1;
705 /* assign the callbacks */
706 ctx->on_buffer_ready = buffer_ready;
707 ctx->on_recv_channel = recv_channel;
708 ctx->on_recv_stream = recv_stream;
709 ctx->on_update_stream = update_stream;
710
711 ret = pipe(ctx->consumer_poll_pipe);
712 if (ret < 0) {
713 perror("Error creating poll pipe");
714 goto error_poll_pipe;
715 }
716
717 ret = pipe(ctx->consumer_should_quit);
718 if (ret < 0) {
719 perror("Error creating recv pipe");
720 goto error_quit_pipe;
721 }
722
723 ret = pipe(ctx->consumer_thread_pipe);
724 if (ret < 0) {
725 perror("Error creating thread pipe");
726 goto error_thread_pipe;
727 }
728
729 return ctx;
730
731
732error_thread_pipe:
733 for (i = 0; i < 2; i++) {
734 int err;
735
736 err = close(ctx->consumer_should_quit[i]);
4c462e79
MD
737 if (err) {
738 PERROR("close");
739 }
3bd1e081
MD
740 }
741error_quit_pipe:
742 for (i = 0; i < 2; i++) {
743 int err;
744
745 err = close(ctx->consumer_poll_pipe[i]);
4c462e79
MD
746 if (err) {
747 PERROR("close");
748 }
3bd1e081
MD
749 }
750error_poll_pipe:
751 free(ctx);
752error:
753 return NULL;
754}
755
756/*
757 * Close all fds associated with the instance and free the context.
758 */
759void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
760{
4c462e79
MD
761 int ret;
762
763 ret = close(ctx->consumer_error_socket);
764 if (ret) {
765 PERROR("close");
766 }
767 ret = close(ctx->consumer_thread_pipe[0]);
768 if (ret) {
769 PERROR("close");
770 }
771 ret = close(ctx->consumer_thread_pipe[1]);
772 if (ret) {
773 PERROR("close");
774 }
775 ret = close(ctx->consumer_poll_pipe[0]);
776 if (ret) {
777 PERROR("close");
778 }
779 ret = close(ctx->consumer_poll_pipe[1]);
780 if (ret) {
781 PERROR("close");
782 }
783 ret = close(ctx->consumer_should_quit[0]);
784 if (ret) {
785 PERROR("close");
786 }
787 ret = close(ctx->consumer_should_quit[1]);
788 if (ret) {
789 PERROR("close");
790 }
3bd1e081
MD
791 unlink(ctx->consumer_command_sock_path);
792 free(ctx);
793}
794
795/*
796 * Mmap the ring buffer, read it and write the data to the tracefile.
797 *
798 * Returns the number of bytes written
799 */
4078b776 800ssize_t lttng_consumer_on_read_subbuffer_mmap(
3bd1e081
MD
801 struct lttng_consumer_local_data *ctx,
802 struct lttng_consumer_stream *stream, unsigned long len)
803{
804 switch (consumer_data.type) {
805 case LTTNG_CONSUMER_KERNEL:
806 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
7753dea8
MD
807 case LTTNG_CONSUMER32_UST:
808 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
809 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
810 default:
811 ERR("Unknown consumer_data type");
812 assert(0);
813 }
b9182dd9
DG
814
815 return 0;
3bd1e081
MD
816}
817
818/*
819 * Splice the data from the ring buffer to the tracefile.
820 *
821 * Returns the number of bytes spliced.
822 */
4078b776 823ssize_t lttng_consumer_on_read_subbuffer_splice(
3bd1e081
MD
824 struct lttng_consumer_local_data *ctx,
825 struct lttng_consumer_stream *stream, unsigned long len)
826{
827 switch (consumer_data.type) {
828 case LTTNG_CONSUMER_KERNEL:
829 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
7753dea8
MD
830 case LTTNG_CONSUMER32_UST:
831 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
832 return -ENOSYS;
833 default:
834 ERR("Unknown consumer_data type");
835 assert(0);
836 return -ENOSYS;
837 }
838
839}
840
841/*
842 * Take a snapshot for a specific fd
843 *
844 * Returns 0 on success, < 0 on error
845 */
846int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
847 struct lttng_consumer_stream *stream)
848{
849 switch (consumer_data.type) {
850 case LTTNG_CONSUMER_KERNEL:
851 return lttng_kconsumer_take_snapshot(ctx, stream);
7753dea8
MD
852 case LTTNG_CONSUMER32_UST:
853 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
854 return lttng_ustconsumer_take_snapshot(ctx, stream);
855 default:
856 ERR("Unknown consumer_data type");
857 assert(0);
858 return -ENOSYS;
859 }
860
861}
862
863/*
864 * Get the produced position
865 *
866 * Returns 0 on success, < 0 on error
867 */
868int lttng_consumer_get_produced_snapshot(
869 struct lttng_consumer_local_data *ctx,
870 struct lttng_consumer_stream *stream,
871 unsigned long *pos)
872{
873 switch (consumer_data.type) {
874 case LTTNG_CONSUMER_KERNEL:
875 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
7753dea8
MD
876 case LTTNG_CONSUMER32_UST:
877 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
878 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
879 default:
880 ERR("Unknown consumer_data type");
881 assert(0);
882 return -ENOSYS;
883 }
884}
885
886int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
887 int sock, struct pollfd *consumer_sockpoll)
888{
889 switch (consumer_data.type) {
890 case LTTNG_CONSUMER_KERNEL:
891 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
892 case LTTNG_CONSUMER32_UST:
893 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
894 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
895 default:
896 ERR("Unknown consumer_data type");
897 assert(0);
898 return -ENOSYS;
899 }
900}
901
902/*
e4421fec 903 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
904 * it to tracefile if necessary.
905 */
906void *lttng_consumer_thread_poll_fds(void *data)
907{
908 int num_rdy, num_hup, high_prio, ret, i;
909 struct pollfd *pollfd = NULL;
910 /* local view of the streams */
911 struct lttng_consumer_stream **local_stream = NULL;
912 /* local view of consumer_data.fds_count */
913 int nb_fd = 0;
914 char tmp;
915 int tmp2;
916 struct lttng_consumer_local_data *ctx = data;
917
e7b994a3
DG
918 rcu_register_thread();
919
effcf122 920 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
921
922 while (1) {
923 high_prio = 0;
924 num_hup = 0;
925
926 /*
e4421fec 927 * the fds set has been updated, we need to update our
3bd1e081
MD
928 * local array as well
929 */
930 pthread_mutex_lock(&consumer_data.lock);
931 if (consumer_data.need_update) {
932 if (pollfd != NULL) {
933 free(pollfd);
934 pollfd = NULL;
935 }
936 if (local_stream != NULL) {
937 free(local_stream);
938 local_stream = NULL;
939 }
940
941 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 942 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081
MD
943 if (pollfd == NULL) {
944 perror("pollfd malloc");
945 pthread_mutex_unlock(&consumer_data.lock);
946 goto end;
947 }
948
949 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 950 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
951 sizeof(struct lttng_consumer_stream));
952 if (local_stream == NULL) {
953 perror("local_stream malloc");
954 pthread_mutex_unlock(&consumer_data.lock);
955 goto end;
956 }
957 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
958 if (ret < 0) {
959 ERR("Error in allocating pollfd or local_outfds");
960 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
961 pthread_mutex_unlock(&consumer_data.lock);
962 goto end;
963 }
964 nb_fd = ret;
965 consumer_data.need_update = 0;
966 }
967 pthread_mutex_unlock(&consumer_data.lock);
968
4078b776
MD
969 /* No FDs and consumer_quit, consumer_cleanup the thread */
970 if (nb_fd == 0 && consumer_quit == 1) {
971 goto end;
972 }
3bd1e081 973 /* poll on the array of fds */
88f2b785 974 restart:
3bd1e081
MD
975 DBG("polling on %d fd", nb_fd + 1);
976 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
977 DBG("poll num_rdy : %d", num_rdy);
978 if (num_rdy == -1) {
88f2b785
MD
979 /*
980 * Restart interrupted system call.
981 */
982 if (errno == EINTR) {
983 goto restart;
984 }
3bd1e081
MD
985 perror("Poll error");
986 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
987 goto end;
988 } else if (num_rdy == 0) {
989 DBG("Polling thread timed out");
990 goto end;
991 }
992
3bd1e081
MD
993 /*
994 * If the consumer_poll_pipe triggered poll go
995 * directly to the beginning of the loop to update the
996 * array. We want to prioritize array update over
997 * low-priority reads.
998 */
d41f73b7 999 if (pollfd[nb_fd].revents & POLLIN) {
3bd1e081
MD
1000 DBG("consumer_poll_pipe wake up");
1001 tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
1002 if (tmp2 < 0) {
d41f73b7 1003 perror("read consumer poll");
3bd1e081
MD
1004 }
1005 continue;
1006 }
1007
1008 /* Take care of high priority channels first. */
1009 for (i = 0; i < nb_fd; i++) {
d41f73b7 1010 if (pollfd[i].revents & POLLPRI) {
4078b776
MD
1011 ssize_t len;
1012
d41f73b7
MD
1013 DBG("Urgent read on fd %d", pollfd[i].fd);
1014 high_prio = 1;
4078b776 1015 len = ctx->on_buffer_ready(local_stream[i], ctx);
d41f73b7 1016 /* it's ok to have an unavailable sub-buffer */
4078b776
MD
1017 if (len < 0 && len != -EAGAIN) {
1018 goto end;
1019 } else if (len > 0) {
1020 local_stream[i]->data_read = 1;
d41f73b7 1021 }
3bd1e081
MD
1022 }
1023 }
1024
4078b776
MD
1025 /*
1026 * If we read high prio channel in this loop, try again
1027 * for more high prio data.
1028 */
1029 if (high_prio) {
3bd1e081
MD
1030 continue;
1031 }
1032
1033 /* Take care of low priority channels. */
4078b776
MD
1034 for (i = 0; i < nb_fd; i++) {
1035 if ((pollfd[i].revents & POLLIN) ||
1036 local_stream[i]->hangup_flush_done) {
1037 ssize_t len;
1038
1039 assert(!(pollfd[i].revents & POLLERR));
1040 assert(!(pollfd[i].revents & POLLNVAL));
1041 DBG("Normal read on fd %d", pollfd[i].fd);
1042 len = ctx->on_buffer_ready(local_stream[i], ctx);
1043 /* it's ok to have an unavailable sub-buffer */
1044 if (len < 0 && len != -EAGAIN) {
1045 goto end;
1046 } else if (len > 0) {
1047 local_stream[i]->data_read = 1;
1048 }
1049 }
1050 }
1051
1052 /* Handle hangup and errors */
1053 for (i = 0; i < nb_fd; i++) {
1054 if (!local_stream[i]->hangup_flush_done
1055 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1056 && (consumer_data.type == LTTNG_CONSUMER32_UST
1057 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1058 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1059 pollfd[i].fd);
1060 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1061 /* Attempt read again, for the data we just flushed. */
1062 local_stream[i]->data_read = 1;
1063 }
1064 /*
1065 * If the poll flag is HUP/ERR/NVAL and we have
1066 * read no data in this pass, we can remove the
1067 * stream from its hash table.
1068 */
1069 if ((pollfd[i].revents & POLLHUP)) {
1070 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1071 if (!local_stream[i]->data_read) {
702b1ea4 1072 consumer_del_stream(local_stream[i]);
4078b776
MD
1073 num_hup++;
1074 }
1075 } else if (pollfd[i].revents & POLLERR) {
1076 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1077 if (!local_stream[i]->data_read) {
702b1ea4 1078 consumer_del_stream(local_stream[i]);
4078b776
MD
1079 num_hup++;
1080 }
1081 } else if (pollfd[i].revents & POLLNVAL) {
1082 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1083 if (!local_stream[i]->data_read) {
702b1ea4 1084 consumer_del_stream(local_stream[i]);
4078b776 1085 num_hup++;
3bd1e081
MD
1086 }
1087 }
4078b776 1088 local_stream[i]->data_read = 0;
3bd1e081
MD
1089 }
1090 }
1091end:
1092 DBG("polling thread exiting");
1093 if (pollfd != NULL) {
1094 free(pollfd);
1095 pollfd = NULL;
1096 }
1097 if (local_stream != NULL) {
1098 free(local_stream);
1099 local_stream = NULL;
1100 }
e7b994a3 1101 rcu_unregister_thread();
3bd1e081
MD
1102 return NULL;
1103}
1104
1105/*
1106 * This thread listens on the consumerd socket and receives the file
1107 * descriptors from the session daemon.
1108 */
1109void *lttng_consumer_thread_receive_fds(void *data)
1110{
1111 int sock, client_socket, ret;
1112 /*
1113 * structure to poll for incoming data on communication socket avoids
1114 * making blocking sockets.
1115 */
1116 struct pollfd consumer_sockpoll[2];
1117 struct lttng_consumer_local_data *ctx = data;
1118
e7b994a3
DG
1119 rcu_register_thread();
1120
3bd1e081
MD
1121 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
1122 unlink(ctx->consumer_command_sock_path);
1123 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
1124 if (client_socket < 0) {
1125 ERR("Cannot create command socket");
1126 goto end;
1127 }
1128
1129 ret = lttcomm_listen_unix_sock(client_socket);
1130 if (ret < 0) {
1131 goto end;
1132 }
1133
32258573 1134 DBG("Sending ready command to lttng-sessiond");
3bd1e081
MD
1135 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
1136 /* return < 0 on error, but == 0 is not fatal */
1137 if (ret < 0) {
32258573 1138 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
1139 goto end;
1140 }
1141
1142 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
1143 if (ret < 0) {
1144 perror("fcntl O_NONBLOCK");
1145 goto end;
1146 }
1147
1148 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1149 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1150 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1151 consumer_sockpoll[1].fd = client_socket;
1152 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1153
1154 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1155 goto end;
1156 }
1157 DBG("Connection on client_socket");
1158
1159 /* Blocking call, waiting for transmission */
1160 sock = lttcomm_accept_unix_sock(client_socket);
1161 if (sock <= 0) {
1162 WARN("On accept");
1163 goto end;
1164 }
1165 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1166 if (ret < 0) {
1167 perror("fcntl O_NONBLOCK");
1168 goto end;
1169 }
1170
1171 /* update the polling structure to poll on the established socket */
1172 consumer_sockpoll[1].fd = sock;
1173 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1174
1175 while (1) {
1176 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1177 goto end;
1178 }
1179 DBG("Incoming command on sock");
1180 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1181 if (ret == -ENOENT) {
1182 DBG("Received STOP command");
1183 goto end;
1184 }
1185 if (ret < 0) {
1186 ERR("Communication interrupted on command socket");
1187 goto end;
1188 }
1189 if (consumer_quit) {
1190 DBG("consumer_thread_receive_fds received quit from signal");
1191 goto end;
1192 }
1193 DBG("received fds on sock");
1194 }
1195end:
1196 DBG("consumer_thread_receive_fds exiting");
1197
1198 /*
1199 * when all fds have hung up, the polling thread
1200 * can exit cleanly
1201 */
1202 consumer_quit = 1;
1203
1204 /*
1205 * 2s of grace period, if no polling events occur during
1206 * this period, the polling thread will exit even if there
1207 * are still open FDs (should not happen, but safety mechanism).
1208 */
1209 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1210
1211 /* wake up the polling thread */
1212 ret = write(ctx->consumer_poll_pipe[1], "4", 1);
1213 if (ret < 0) {
1214 perror("poll pipe write");
1215 }
e7b994a3 1216 rcu_unregister_thread();
3bd1e081
MD
1217 return NULL;
1218}
d41f73b7 1219
4078b776 1220ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
d41f73b7
MD
1221 struct lttng_consumer_local_data *ctx)
1222{
1223 switch (consumer_data.type) {
1224 case LTTNG_CONSUMER_KERNEL:
1225 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
1226 case LTTNG_CONSUMER32_UST:
1227 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1228 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1229 default:
1230 ERR("Unknown consumer_data type");
1231 assert(0);
1232 return -ENOSYS;
1233 }
1234}
1235
1236int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1237{
1238 switch (consumer_data.type) {
1239 case LTTNG_CONSUMER_KERNEL:
1240 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
1241 case LTTNG_CONSUMER32_UST:
1242 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1243 return lttng_ustconsumer_on_recv_stream(stream);
1244 default:
1245 ERR("Unknown consumer_data type");
1246 assert(0);
1247 return -ENOSYS;
1248 }
1249}
e4421fec
DG
1250
1251/*
1252 * Allocate and set consumer data hash tables.
1253 */
1254void lttng_consumer_init(void)
1255{
1256 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1257 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1258}
1259
This page took 0.081808 seconds and 4 git commands to generate.