Fix multi-session UST handling combined with shm fd teardown
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
22#include <fcntl.h>
23#include <poll.h>
24#include <pthread.h>
25#include <stdlib.h>
26#include <string.h>
27#include <sys/mman.h>
28#include <sys/socket.h>
29#include <sys/types.h>
30#include <unistd.h>
31
32#include <lttng-kernel-ctl.h>
33#include <lttng-sessiond-comm.h>
34#include <lttng/lttng-consumer.h>
35#include <lttng/lttng-kconsumer.h>
36#include <lttng/lttng-ustconsumer.h>
37#include <lttngerr.h>
38
39struct lttng_consumer_global_data consumer_data = {
40 .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
41 .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
42 .stream_count = 0,
43 .need_update = 1,
44 .type = LTTNG_CONSUMER_UNKNOWN,
45};
46
47/* timeout parameter, to control the polling thread grace period. */
48int consumer_poll_timeout = -1;
49
50/*
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
56volatile int consumer_quit = 0;
57
58/*
59 * Find a stream. The consumer_data.lock must be locked during this
60 * call.
61 */
62static struct lttng_consumer_stream *consumer_find_stream(int key)
63{
64 struct lttng_consumer_stream *iter;
65
7ad0a0cb
MD
66 /* Negative keys are lookup failures */
67 if (key < 0)
68 return NULL;
3bd1e081
MD
69 cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
70 if (iter->key == key) {
71 DBG("Found stream key %d", key);
72 return iter;
73 }
74 }
75 return NULL;
76}
77
7ad0a0cb
MD
78static void consumer_steal_stream_key(int key)
79{
80 struct lttng_consumer_stream *stream;
81
82 stream = consumer_find_stream(key);
83 if (stream)
84 stream->key = -1;
85}
86
3bd1e081
MD
87static struct lttng_consumer_channel *consumer_find_channel(int key)
88{
89 struct lttng_consumer_channel *iter;
90
7ad0a0cb
MD
91 /* Negative keys are lookup failures */
92 if (key < 0)
93 return NULL;
3bd1e081
MD
94 cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
95 if (iter->key == key) {
96 DBG("Found channel key %d", key);
97 return iter;
98 }
99 }
100 return NULL;
101}
102
7ad0a0cb
MD
103static void consumer_steal_channel_key(int key)
104{
105 struct lttng_consumer_channel *channel;
106
107 channel = consumer_find_channel(key);
108 if (channel)
109 channel->key = -1;
110}
111
3bd1e081
MD
112/*
113 * Remove a stream from the global list protected by a mutex. This
114 * function is also responsible for freeing its data structures.
115 */
116void consumer_del_stream(struct lttng_consumer_stream *stream)
117{
118 int ret;
119 struct lttng_consumer_channel *free_chan = NULL;
120
121 pthread_mutex_lock(&consumer_data.lock);
122
123 switch (consumer_data.type) {
124 case LTTNG_CONSUMER_KERNEL:
125 if (stream->mmap_base != NULL) {
126 ret = munmap(stream->mmap_base, stream->mmap_len);
127 if (ret != 0) {
128 perror("munmap");
129 }
130 }
131 break;
132 case LTTNG_CONSUMER_UST:
133 lttng_ustconsumer_del_stream(stream);
134 break;
135 default:
136 ERR("Unknown consumer_data type");
137 assert(0);
138 goto end;
139 }
140
141 cds_list_del(&stream->list);
142 if (consumer_data.stream_count <= 0) {
143 goto end;
144 }
145 consumer_data.stream_count--;
146 if (!stream) {
147 goto end;
148 }
149 if (stream->out_fd >= 0) {
150 close(stream->out_fd);
151 }
b5c5fc29 152 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
3bd1e081
MD
153 close(stream->wait_fd);
154 }
b5c5fc29
MD
155 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
156 && !stream->shm_fd_is_copy) {
3bd1e081
MD
157 close(stream->shm_fd);
158 }
159 if (!--stream->chan->refcount)
160 free_chan = stream->chan;
161 free(stream);
162end:
163 consumer_data.need_update = 1;
164 pthread_mutex_unlock(&consumer_data.lock);
165
166 if (free_chan)
167 consumer_del_channel(free_chan);
168}
169
170struct lttng_consumer_stream *consumer_allocate_stream(
171 int channel_key, int stream_key,
172 int shm_fd, int wait_fd,
173 enum lttng_consumer_stream_state state,
174 uint64_t mmap_len,
175 enum lttng_event_output output,
176 const char *path_name)
177{
178 struct lttng_consumer_stream *stream;
179 int ret;
180
181 stream = malloc(sizeof(*stream));
182 if (stream == NULL) {
183 perror("malloc struct lttng_consumer_stream");
184 goto end;
185 }
186 stream->chan = consumer_find_channel(channel_key);
187 if (!stream->chan) {
188 perror("Unable to find channel key");
189 goto end;
190 }
191 stream->chan->refcount++;
192 stream->key = stream_key;
193 stream->shm_fd = shm_fd;
194 stream->wait_fd = wait_fd;
195 stream->out_fd = -1;
196 stream->out_fd_offset = 0;
197 stream->state = state;
198 stream->mmap_len = mmap_len;
199 stream->mmap_base = NULL;
200 stream->output = output;
201 strncpy(stream->path_name, path_name, PATH_MAX - 1);
202 stream->path_name[PATH_MAX - 1] = '\0';
203
204 switch (consumer_data.type) {
205 case LTTNG_CONSUMER_KERNEL:
206 break;
207 case LTTNG_CONSUMER_UST:
5af2f756 208 stream->cpu = stream->chan->cpucount++;
3bd1e081
MD
209 ret = lttng_ustconsumer_allocate_stream(stream);
210 if (ret) {
211 free(stream);
212 return NULL;
213 }
214 break;
215 default:
216 ERR("Unknown consumer_data type");
217 assert(0);
218 goto end;
219 }
220 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
221 stream->path_name, stream->key,
222 stream->shm_fd,
223 stream->wait_fd,
224 (unsigned long long) stream->mmap_len,
225 stream->out_fd);
226end:
227 return stream;
228}
229
230/*
231 * Add a stream to the global list protected by a mutex.
232 */
233int consumer_add_stream(struct lttng_consumer_stream *stream)
234{
235 int ret = 0;
236
237 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
238 /* Steal stream identifier, for UST */
239 consumer_steal_stream_key(stream->key);
3bd1e081
MD
240 cds_list_add(&stream->list, &consumer_data.stream_list.head);
241 consumer_data.stream_count++;
242 consumer_data.need_update = 1;
243
244 switch (consumer_data.type) {
245 case LTTNG_CONSUMER_KERNEL:
246 break;
247 case LTTNG_CONSUMER_UST:
248 /* Streams are in CPU number order (we rely on this) */
249 stream->cpu = stream->chan->nr_streams++;
250 break;
251 default:
252 ERR("Unknown consumer_data type");
253 assert(0);
254 goto end;
255 }
256
257end:
258 pthread_mutex_unlock(&consumer_data.lock);
259 return ret;
260}
261
262/*
263 * Update a stream according to what we just received.
264 */
265void consumer_change_stream_state(int stream_key,
266 enum lttng_consumer_stream_state state)
267{
268 struct lttng_consumer_stream *stream;
269
270 pthread_mutex_lock(&consumer_data.lock);
271 stream = consumer_find_stream(stream_key);
272 if (stream) {
273 stream->state = state;
274 }
275 consumer_data.need_update = 1;
276 pthread_mutex_unlock(&consumer_data.lock);
277}
278
279/*
280 * Remove a channel from the global list protected by a mutex. This
281 * function is also responsible for freeing its data structures.
282 */
283void consumer_del_channel(struct lttng_consumer_channel *channel)
284{
285 int ret;
286
287 pthread_mutex_lock(&consumer_data.lock);
288
289 switch (consumer_data.type) {
290 case LTTNG_CONSUMER_KERNEL:
291 break;
292 case LTTNG_CONSUMER_UST:
293 lttng_ustconsumer_del_channel(channel);
294 break;
295 default:
296 ERR("Unknown consumer_data type");
297 assert(0);
298 goto end;
299 }
300
301 cds_list_del(&channel->list);
302 if (channel->mmap_base != NULL) {
303 ret = munmap(channel->mmap_base, channel->mmap_len);
304 if (ret != 0) {
305 perror("munmap");
306 }
307 }
b5c5fc29 308 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
3bd1e081
MD
309 close(channel->wait_fd);
310 }
b5c5fc29
MD
311 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
312 && !channel->shm_fd_is_copy) {
3bd1e081
MD
313 close(channel->shm_fd);
314 }
315 free(channel);
316end:
317 pthread_mutex_unlock(&consumer_data.lock);
318}
319
320struct lttng_consumer_channel *consumer_allocate_channel(
321 int channel_key,
322 int shm_fd, int wait_fd,
323 uint64_t mmap_len,
324 uint64_t max_sb_size)
325{
326 struct lttng_consumer_channel *channel;
327 int ret;
328
276b26d1 329 channel = zmalloc(sizeof(*channel));
3bd1e081
MD
330 if (channel == NULL) {
331 perror("malloc struct lttng_consumer_channel");
332 goto end;
333 }
334 channel->key = channel_key;
335 channel->shm_fd = shm_fd;
336 channel->wait_fd = wait_fd;
337 channel->mmap_len = mmap_len;
338 channel->max_sb_size = max_sb_size;
339 channel->refcount = 0;
340 channel->nr_streams = 0;
341
342 switch (consumer_data.type) {
343 case LTTNG_CONSUMER_KERNEL:
344 channel->mmap_base = NULL;
345 channel->mmap_len = 0;
346 break;
347 case LTTNG_CONSUMER_UST:
348 ret = lttng_ustconsumer_allocate_channel(channel);
349 if (ret) {
350 free(channel);
351 return NULL;
352 }
353 break;
354 default:
355 ERR("Unknown consumer_data type");
356 assert(0);
357 goto end;
358 }
359 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
360 channel->key,
361 channel->shm_fd,
362 channel->wait_fd,
363 (unsigned long long) channel->mmap_len,
364 (unsigned long long) channel->max_sb_size);
365end:
366 return channel;
367}
368
369/*
370 * Add a channel to the global list protected by a mutex.
371 */
372int consumer_add_channel(struct lttng_consumer_channel *channel)
373{
3bd1e081 374 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
375 /* Steal channel identifier, for UST */
376 consumer_steal_channel_key(channel->key);
3bd1e081 377 cds_list_add(&channel->list, &consumer_data.channel_list.head);
3bd1e081 378 pthread_mutex_unlock(&consumer_data.lock);
7ad0a0cb 379 return 0;
3bd1e081
MD
380}
381
382/*
383 * Allocate the pollfd structure and the local view of the out fds to avoid
384 * doing a lookup in the linked list and concurrency issues when writing is
385 * needed. Called with consumer_data.lock held.
386 *
387 * Returns the number of fds in the structures.
388 */
389int consumer_update_poll_array(
390 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
391 struct lttng_consumer_stream **local_stream)
392{
393 struct lttng_consumer_stream *iter;
394 int i = 0;
395
396 DBG("Updating poll fd array");
397 cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
398 if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
399 continue;
400 }
401 DBG("Active FD %d", iter->wait_fd);
402 (*pollfd)[i].fd = iter->wait_fd;
403 (*pollfd)[i].events = POLLIN | POLLPRI;
404 local_stream[i] = iter;
405 i++;
406 }
407
408 /*
409 * Insert the consumer_poll_pipe at the end of the array and don't
410 * increment i so nb_fd is the number of real FD.
411 */
412 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
413 (*pollfd)[i].events = POLLIN;
414 return i;
415}
416
/*
 * Block in poll() on a two-entry array: entry 0 is the should_quit pipe,
 * entry 1 the command socket.
 *
 * Returns -1 when poll() fails or when the should_quit pipe reports exactly
 * POLLIN (the consumer should exit), 0 otherwise.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int nb_ready = poll(consumer_sockpoll, 2, -1);

	if (nb_ready == -1) {
		perror("Poll error");
		return -1;
	}
	if (consumer_sockpoll[0].revents == POLLIN) {
		DBG("consumer_should_quit wake up");
		return -1;
	}
	return 0;
}
439
440/*
441 * Set the error socket.
442 */
443void lttng_consumer_set_error_sock(
444 struct lttng_consumer_local_data *ctx, int sock)
445{
446 ctx->consumer_error_socket = sock;
447}
448
449/*
450 * Set the command socket path.
451 */
452
453void lttng_consumer_set_command_sock_path(
454 struct lttng_consumer_local_data *ctx, char *sock)
455{
456 ctx->consumer_command_sock_path = sock;
457}
458
459/*
460 * Send return code to the session daemon.
461 * If the socket is not defined, we return 0, it is not a fatal error
462 */
463int lttng_consumer_send_error(
464 struct lttng_consumer_local_data *ctx, int cmd)
465{
466 if (ctx->consumer_error_socket > 0) {
467 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
468 sizeof(enum lttcomm_sessiond_command));
469 }
470
471 return 0;
472}
473
474/*
475 * Close all the tracefiles and stream fds, should be called when all instances
476 * are destroyed.
477 */
478void lttng_consumer_cleanup(void)
479{
480 struct lttng_consumer_stream *iter, *tmp;
481 struct lttng_consumer_channel *citer, *ctmp;
482
483 /*
484 * close all outfd. Called when there are no more threads
485 * running (after joining on the threads), no need to protect
486 * list iteration with mutex.
487 */
488 cds_list_for_each_entry_safe(iter, tmp,
489 &consumer_data.stream_list.head, list) {
490 consumer_del_stream(iter);
491 }
492 cds_list_for_each_entry_safe(citer, ctmp,
493 &consumer_data.channel_list.head, list) {
494 consumer_del_channel(citer);
495 }
496}
497
498/*
499 * Called from signal handler.
500 */
501void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
502{
503 int ret;
504 consumer_quit = 1;
505 ret = write(ctx->consumer_should_quit[1], "4", 1);
506 if (ret < 0) {
507 perror("write consumer quit");
508 }
509}
510
511void lttng_consumer_sync_trace_file(
512 struct lttng_consumer_stream *stream, off_t orig_offset)
513{
514 int outfd = stream->out_fd;
515
516 /*
517 * This does a blocking write-and-wait on any page that belongs to the
518 * subbuffer prior to the one we just wrote.
519 * Don't care about error values, as these are just hints and ways to
520 * limit the amount of page cache used.
521 */
522 if (orig_offset < stream->chan->max_sb_size) {
523 return;
524 }
525 sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
526 stream->chan->max_sb_size,
527 SYNC_FILE_RANGE_WAIT_BEFORE
528 | SYNC_FILE_RANGE_WRITE
529 | SYNC_FILE_RANGE_WAIT_AFTER);
530 /*
531 * Give hints to the kernel about how we access the file:
532 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
533 * we write it.
534 *
535 * We need to call fadvise again after the file grows because the
536 * kernel does not seem to apply fadvise to non-existing parts of the
537 * file.
538 *
539 * Call fadvise _after_ having waited for the page writeback to
540 * complete because the dirty page writeback semantic is not well
541 * defined. So it can be expected to lead to lower throughput in
542 * streaming.
543 */
544 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
545 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
546}
547
548/*
549 * Initialise the necessary environnement :
550 * - create a new context
551 * - create the poll_pipe
552 * - create the should_quit pipe (for signal handler)
553 * - create the thread pipe (for splice)
554 *
555 * Takes a function pointer as argument, this function is called when data is
556 * available on a buffer. This function is responsible to do the
557 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
558 * buffer configuration and then kernctl_put_next_subbuf at the end.
559 *
560 * Returns a pointer to the new context or NULL on error.
561 */
562struct lttng_consumer_local_data *lttng_consumer_create(
563 enum lttng_consumer_type type,
d41f73b7
MD
564 int (*buffer_ready)(struct lttng_consumer_stream *stream,
565 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
566 int (*recv_channel)(struct lttng_consumer_channel *channel),
567 int (*recv_stream)(struct lttng_consumer_stream *stream),
568 int (*update_stream)(int stream_key, uint32_t state))
569{
570 int ret, i;
571 struct lttng_consumer_local_data *ctx;
572
573 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
574 consumer_data.type == type);
575 consumer_data.type = type;
576
577 ctx = malloc(sizeof(struct lttng_consumer_local_data));
578 if (ctx == NULL) {
579 perror("allocating context");
580 goto error;
581 }
582
583 ctx->consumer_error_socket = -1;
584 /* assign the callbacks */
585 ctx->on_buffer_ready = buffer_ready;
586 ctx->on_recv_channel = recv_channel;
587 ctx->on_recv_stream = recv_stream;
588 ctx->on_update_stream = update_stream;
589
590 ret = pipe(ctx->consumer_poll_pipe);
591 if (ret < 0) {
592 perror("Error creating poll pipe");
593 goto error_poll_pipe;
594 }
595
596 ret = pipe(ctx->consumer_should_quit);
597 if (ret < 0) {
598 perror("Error creating recv pipe");
599 goto error_quit_pipe;
600 }
601
602 ret = pipe(ctx->consumer_thread_pipe);
603 if (ret < 0) {
604 perror("Error creating thread pipe");
605 goto error_thread_pipe;
606 }
607
608 return ctx;
609
610
611error_thread_pipe:
612 for (i = 0; i < 2; i++) {
613 int err;
614
615 err = close(ctx->consumer_should_quit[i]);
616 assert(!err);
617 }
618error_quit_pipe:
619 for (i = 0; i < 2; i++) {
620 int err;
621
622 err = close(ctx->consumer_poll_pipe[i]);
623 assert(!err);
624 }
625error_poll_pipe:
626 free(ctx);
627error:
628 return NULL;
629}
630
631/*
632 * Close all fds associated with the instance and free the context.
633 */
634void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
635{
636 close(ctx->consumer_error_socket);
637 close(ctx->consumer_thread_pipe[0]);
638 close(ctx->consumer_thread_pipe[1]);
639 close(ctx->consumer_poll_pipe[0]);
640 close(ctx->consumer_poll_pipe[1]);
641 close(ctx->consumer_should_quit[0]);
642 close(ctx->consumer_should_quit[1]);
643 unlink(ctx->consumer_command_sock_path);
644 free(ctx);
645}
646
647/*
648 * Mmap the ring buffer, read it and write the data to the tracefile.
649 *
650 * Returns the number of bytes written
651 */
652int lttng_consumer_on_read_subbuffer_mmap(
653 struct lttng_consumer_local_data *ctx,
654 struct lttng_consumer_stream *stream, unsigned long len)
655{
656 switch (consumer_data.type) {
657 case LTTNG_CONSUMER_KERNEL:
658 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
659 case LTTNG_CONSUMER_UST:
660 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
661 default:
662 ERR("Unknown consumer_data type");
663 assert(0);
664 }
665}
666
667/*
668 * Splice the data from the ring buffer to the tracefile.
669 *
670 * Returns the number of bytes spliced.
671 */
672int lttng_consumer_on_read_subbuffer_splice(
673 struct lttng_consumer_local_data *ctx,
674 struct lttng_consumer_stream *stream, unsigned long len)
675{
676 switch (consumer_data.type) {
677 case LTTNG_CONSUMER_KERNEL:
678 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
679 case LTTNG_CONSUMER_UST:
680 return -ENOSYS;
681 default:
682 ERR("Unknown consumer_data type");
683 assert(0);
684 return -ENOSYS;
685 }
686
687}
688
689/*
690 * Take a snapshot for a specific fd
691 *
692 * Returns 0 on success, < 0 on error
693 */
694int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
695 struct lttng_consumer_stream *stream)
696{
697 switch (consumer_data.type) {
698 case LTTNG_CONSUMER_KERNEL:
699 return lttng_kconsumer_take_snapshot(ctx, stream);
700 case LTTNG_CONSUMER_UST:
701 return lttng_ustconsumer_take_snapshot(ctx, stream);
702 default:
703 ERR("Unknown consumer_data type");
704 assert(0);
705 return -ENOSYS;
706 }
707
708}
709
710/*
711 * Get the produced position
712 *
713 * Returns 0 on success, < 0 on error
714 */
715int lttng_consumer_get_produced_snapshot(
716 struct lttng_consumer_local_data *ctx,
717 struct lttng_consumer_stream *stream,
718 unsigned long *pos)
719{
720 switch (consumer_data.type) {
721 case LTTNG_CONSUMER_KERNEL:
722 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
723 case LTTNG_CONSUMER_UST:
724 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
725 default:
726 ERR("Unknown consumer_data type");
727 assert(0);
728 return -ENOSYS;
729 }
730}
731
732int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
733 int sock, struct pollfd *consumer_sockpoll)
734{
735 switch (consumer_data.type) {
736 case LTTNG_CONSUMER_KERNEL:
737 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
738 case LTTNG_CONSUMER_UST:
739 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
740 default:
741 ERR("Unknown consumer_data type");
742 assert(0);
743 return -ENOSYS;
744 }
745}
746
747/*
748 * This thread polls the fds in the ltt_fd_list to consume the data and write
749 * it to tracefile if necessary.
750 */
751void *lttng_consumer_thread_poll_fds(void *data)
752{
753 int num_rdy, num_hup, high_prio, ret, i;
754 struct pollfd *pollfd = NULL;
755 /* local view of the streams */
756 struct lttng_consumer_stream **local_stream = NULL;
757 /* local view of consumer_data.fds_count */
758 int nb_fd = 0;
759 char tmp;
760 int tmp2;
761 struct lttng_consumer_local_data *ctx = data;
762
763 local_stream = malloc(sizeof(struct lttng_consumer_stream));
764
765 while (1) {
766 high_prio = 0;
767 num_hup = 0;
768
769 /*
770 * the ltt_fd_list has been updated, we need to update our
771 * local array as well
772 */
773 pthread_mutex_lock(&consumer_data.lock);
774 if (consumer_data.need_update) {
775 if (pollfd != NULL) {
776 free(pollfd);
777 pollfd = NULL;
778 }
779 if (local_stream != NULL) {
780 free(local_stream);
781 local_stream = NULL;
782 }
783
784 /* allocate for all fds + 1 for the consumer_poll_pipe */
785 pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
786 if (pollfd == NULL) {
787 perror("pollfd malloc");
788 pthread_mutex_unlock(&consumer_data.lock);
789 goto end;
790 }
791
792 /* allocate for all fds + 1 for the consumer_poll_pipe */
793 local_stream = malloc((consumer_data.stream_count + 1) *
794 sizeof(struct lttng_consumer_stream));
795 if (local_stream == NULL) {
796 perror("local_stream malloc");
797 pthread_mutex_unlock(&consumer_data.lock);
798 goto end;
799 }
800 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
801 if (ret < 0) {
802 ERR("Error in allocating pollfd or local_outfds");
803 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
804 pthread_mutex_unlock(&consumer_data.lock);
805 goto end;
806 }
807 nb_fd = ret;
808 consumer_data.need_update = 0;
809 }
810 pthread_mutex_unlock(&consumer_data.lock);
811
812 /* poll on the array of fds */
813 DBG("polling on %d fd", nb_fd + 1);
814 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
815 DBG("poll num_rdy : %d", num_rdy);
816 if (num_rdy == -1) {
817 perror("Poll error");
818 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
819 goto end;
820 } else if (num_rdy == 0) {
821 DBG("Polling thread timed out");
822 goto end;
823 }
824
d41f73b7 825 /* No FDs and consumer_quit, consumer_cleanup the thread */
3bd1e081
MD
826 if (nb_fd == 0 && consumer_quit == 1) {
827 goto end;
828 }
829
830 /*
831 * If the consumer_poll_pipe triggered poll go
832 * directly to the beginning of the loop to update the
833 * array. We want to prioritize array update over
834 * low-priority reads.
835 */
d41f73b7 836 if (pollfd[nb_fd].revents & POLLIN) {
3bd1e081
MD
837 DBG("consumer_poll_pipe wake up");
838 tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
839 if (tmp2 < 0) {
d41f73b7 840 perror("read consumer poll");
3bd1e081
MD
841 }
842 continue;
843 }
844
845 /* Take care of high priority channels first. */
846 for (i = 0; i < nb_fd; i++) {
d41f73b7
MD
847 if (pollfd[i].revents & POLLPRI) {
848 DBG("Urgent read on fd %d", pollfd[i].fd);
849 high_prio = 1;
850 ret = ctx->on_buffer_ready(local_stream[i], ctx);
851 /* it's ok to have an unavailable sub-buffer */
852 if (ret == EAGAIN) {
853 ret = 0;
854 }
855 } else if (pollfd[i].revents & POLLERR) {
3bd1e081
MD
856 ERR("Error returned in polling fd %d.", pollfd[i].fd);
857 consumer_del_stream(local_stream[i]);
858 num_hup++;
d41f73b7
MD
859 } else if (pollfd[i].revents & POLLNVAL) {
860 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
3bd1e081
MD
861 consumer_del_stream(local_stream[i]);
862 num_hup++;
1c3c14ac
MD
863 } else if ((pollfd[i].revents & POLLHUP) &&
864 !(pollfd[i].revents & POLLIN)) {
d056b477
MD
865 if (consumer_data.type == LTTNG_CONSUMER_UST) {
866 DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
867 pollfd[i].fd);
868 if (!local_stream[i]->hangup_flush_done) {
869 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
870 /* try reading after flush */
871 ret = ctx->on_buffer_ready(local_stream[i], ctx);
872 /* it's ok to have an unavailable sub-buffer */
873 if (ret == EAGAIN) {
874 ret = 0;
875 }
876 }
877 } else {
878 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
879 }
3bd1e081
MD
880 consumer_del_stream(local_stream[i]);
881 num_hup++;
3bd1e081
MD
882 }
883 }
884
885 /* If every buffer FD has hung up, we end the read loop here */
886 if (nb_fd > 0 && num_hup == nb_fd) {
887 DBG("every buffer FD has hung up\n");
888 if (consumer_quit == 1) {
889 goto end;
890 }
891 continue;
892 }
893
894 /* Take care of low priority channels. */
895 if (high_prio == 0) {
896 for (i = 0; i < nb_fd; i++) {
d41f73b7 897 if (pollfd[i].revents & POLLIN) {
3bd1e081 898 DBG("Normal read on fd %d", pollfd[i].fd);
d41f73b7 899 ret = ctx->on_buffer_ready(local_stream[i], ctx);
3bd1e081
MD
900 /* it's ok to have an unavailable subbuffer */
901 if (ret == EAGAIN) {
902 ret = 0;
903 }
904 }
905 }
906 }
907 }
908end:
909 DBG("polling thread exiting");
910 if (pollfd != NULL) {
911 free(pollfd);
912 pollfd = NULL;
913 }
914 if (local_stream != NULL) {
915 free(local_stream);
916 local_stream = NULL;
917 }
918 return NULL;
919}
920
921/*
922 * This thread listens on the consumerd socket and receives the file
923 * descriptors from the session daemon.
924 */
925void *lttng_consumer_thread_receive_fds(void *data)
926{
927 int sock, client_socket, ret;
928 /*
929 * structure to poll for incoming data on communication socket avoids
930 * making blocking sockets.
931 */
932 struct pollfd consumer_sockpoll[2];
933 struct lttng_consumer_local_data *ctx = data;
934
935 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
936 unlink(ctx->consumer_command_sock_path);
937 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
938 if (client_socket < 0) {
939 ERR("Cannot create command socket");
940 goto end;
941 }
942
943 ret = lttcomm_listen_unix_sock(client_socket);
944 if (ret < 0) {
945 goto end;
946 }
947
32258573 948 DBG("Sending ready command to lttng-sessiond");
3bd1e081
MD
949 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
950 /* return < 0 on error, but == 0 is not fatal */
951 if (ret < 0) {
32258573 952 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
953 goto end;
954 }
955
956 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
957 if (ret < 0) {
958 perror("fcntl O_NONBLOCK");
959 goto end;
960 }
961
962 /* prepare the FDs to poll : to client socket and the should_quit pipe */
963 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
964 consumer_sockpoll[0].events = POLLIN | POLLPRI;
965 consumer_sockpoll[1].fd = client_socket;
966 consumer_sockpoll[1].events = POLLIN | POLLPRI;
967
968 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
969 goto end;
970 }
971 DBG("Connection on client_socket");
972
973 /* Blocking call, waiting for transmission */
974 sock = lttcomm_accept_unix_sock(client_socket);
975 if (sock <= 0) {
976 WARN("On accept");
977 goto end;
978 }
979 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
980 if (ret < 0) {
981 perror("fcntl O_NONBLOCK");
982 goto end;
983 }
984
985 /* update the polling structure to poll on the established socket */
986 consumer_sockpoll[1].fd = sock;
987 consumer_sockpoll[1].events = POLLIN | POLLPRI;
988
989 while (1) {
990 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
991 goto end;
992 }
993 DBG("Incoming command on sock");
994 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
995 if (ret == -ENOENT) {
996 DBG("Received STOP command");
997 goto end;
998 }
999 if (ret < 0) {
1000 ERR("Communication interrupted on command socket");
1001 goto end;
1002 }
1003 if (consumer_quit) {
1004 DBG("consumer_thread_receive_fds received quit from signal");
1005 goto end;
1006 }
1007 DBG("received fds on sock");
1008 }
1009end:
1010 DBG("consumer_thread_receive_fds exiting");
1011
1012 /*
1013 * when all fds have hung up, the polling thread
1014 * can exit cleanly
1015 */
1016 consumer_quit = 1;
1017
1018 /*
1019 * 2s of grace period, if no polling events occur during
1020 * this period, the polling thread will exit even if there
1021 * are still open FDs (should not happen, but safety mechanism).
1022 */
1023 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1024
1025 /* wake up the polling thread */
1026 ret = write(ctx->consumer_poll_pipe[1], "4", 1);
1027 if (ret < 0) {
1028 perror("poll pipe write");
1029 }
1030 return NULL;
1031}
d41f73b7
MD
1032
1033int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
1034 struct lttng_consumer_local_data *ctx)
1035{
1036 switch (consumer_data.type) {
1037 case LTTNG_CONSUMER_KERNEL:
1038 return lttng_kconsumer_read_subbuffer(stream, ctx);
1039 case LTTNG_CONSUMER_UST:
1040 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1041 default:
1042 ERR("Unknown consumer_data type");
1043 assert(0);
1044 return -ENOSYS;
1045 }
1046}
1047
1048int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1049{
1050 switch (consumer_data.type) {
1051 case LTTNG_CONSUMER_KERNEL:
1052 return lttng_kconsumer_on_recv_stream(stream);
1053 case LTTNG_CONSUMER_UST:
1054 return lttng_ustconsumer_on_recv_stream(stream);
1055 default:
1056 ERR("Unknown consumer_data type");
1057 assert(0);
1058 return -ENOSYS;
1059 }
1060}
This page took 0.06282 seconds and 4 git commands to generate.