UST consumer: fix read on hangup, and UST get subbuf error handling
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; only version 2
 * of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */

#define _GNU_SOURCE
#include <assert.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <lttng-kernel-ctl.h>
#include <lttng-sessiond-comm.h>
#include <lttng/lttng-consumer.h>
#include <lttng/lttng-kconsumer.h>
#include <lttng/lttng-ustconsumer.h>
#include <lttngerr.h>

struct lttng_consumer_global_data consumer_data = {
	.stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
	.channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
	.stream_count = 0,
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};

/* timeout parameter, to control the polling thread grace period. */
int consumer_poll_timeout = -1;

/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by consumer_thread_receive_fds when it notices that all fds have
 * hung up. Also updated by the signal handler (lttng_consumer_should_exit()).
 * Read by the polling threads.
 */
volatile int consumer_quit = 0;

/*
 * Find a stream. The consumer_data.lock must be locked during this
 * call.
 */
static struct lttng_consumer_stream *consumer_find_stream(int key)
{
	struct lttng_consumer_stream *iter;

	/* Negative keys are lookup failures */
	if (key < 0)
		return NULL;
	cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
		if (iter->key == key) {
			DBG("Found stream key %d", key);
			return iter;
		}
	}
	return NULL;
}
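
/*
 * When a stream (or channel) key is reused, typically by the UST consumer,
 * the object previously holding that key has its key "stolen": it is set to
 * -1 so that it no longer matches lookups (negative keys are lookup failures
 * above). consumer_add_stream() and consumer_add_channel() call these
 * helpers before inserting the new object, so only the most recently added
 * object answers to a given key.
 */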
static void consumer_steal_stream_key(int key)
{
	struct lttng_consumer_stream *stream;

	stream = consumer_find_stream(key);
	if (stream)
		stream->key = -1;
}

static struct lttng_consumer_channel *consumer_find_channel(int key)
{
	struct lttng_consumer_channel *iter;

	/* Negative keys are lookup failures */
	if (key < 0)
		return NULL;
	cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
		if (iter->key == key) {
			DBG("Found channel key %d", key);
			return iter;
		}
	}
	return NULL;
}

static void consumer_steal_channel_key(int key)
{
	struct lttng_consumer_channel *channel;

	channel = consumer_find_channel(key);
	if (channel)
		channel->key = -1;
}

/*
 * Remove a stream from the global list protected by a mutex. This
 * function is also responsible for freeing its data structures.
 */
void consumer_del_stream(struct lttng_consumer_stream *stream)
{
	int ret;
	struct lttng_consumer_channel *free_chan = NULL;

	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		if (stream->mmap_base != NULL) {
			ret = munmap(stream->mmap_base, stream->mmap_len);
			if (ret != 0) {
				perror("munmap");
			}
		}
		break;
	case LTTNG_CONSUMER_UST:
		lttng_ustconsumer_del_stream(stream);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	cds_list_del(&stream->list);
	if (consumer_data.stream_count <= 0) {
		goto end;
	}
	consumer_data.stream_count--;
	if (!stream) {
		goto end;
	}
	if (stream->out_fd >= 0) {
		close(stream->out_fd);
	}
	if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
		close(stream->wait_fd);
	}
	if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
			&& !stream->shm_fd_is_copy) {
		close(stream->shm_fd);
	}
	if (!--stream->chan->refcount)
		free_chan = stream->chan;
	free(stream);
end:
	consumer_data.need_update = 1;
	pthread_mutex_unlock(&consumer_data.lock);

	if (free_chan)
		consumer_del_channel(free_chan);
}

struct lttng_consumer_stream *consumer_allocate_stream(
		int channel_key, int stream_key,
		int shm_fd, int wait_fd,
		enum lttng_consumer_stream_state state,
		uint64_t mmap_len,
		enum lttng_event_output output,
		const char *path_name)
{
	struct lttng_consumer_stream *stream;
	int ret;

	stream = zmalloc(sizeof(*stream));
	if (stream == NULL) {
		perror("malloc struct lttng_consumer_stream");
		goto end;
	}
	stream->chan = consumer_find_channel(channel_key);
	if (!stream->chan) {
		perror("Unable to find channel key");
		goto end;
	}
	stream->chan->refcount++;
	stream->key = stream_key;
	stream->shm_fd = shm_fd;
	stream->wait_fd = wait_fd;
	stream->out_fd = -1;
	stream->out_fd_offset = 0;
	stream->state = state;
	stream->mmap_len = mmap_len;
	stream->mmap_base = NULL;
	stream->output = output;
	strncpy(stream->path_name, path_name, PATH_MAX - 1);
	stream->path_name[PATH_MAX - 1] = '\0';

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER_UST:
		stream->cpu = stream->chan->cpucount++;
		ret = lttng_ustconsumer_allocate_stream(stream);
		if (ret) {
			free(stream);
			return NULL;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}
	DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
			stream->path_name, stream->key,
			stream->shm_fd,
			stream->wait_fd,
			(unsigned long long) stream->mmap_len,
			stream->out_fd);
end:
	return stream;
}

/*
 * Add a stream to the global list protected by a mutex.
 */
int consumer_add_stream(struct lttng_consumer_stream *stream)
{
	int ret = 0;

	pthread_mutex_lock(&consumer_data.lock);
	/* Steal stream identifier, for UST */
	consumer_steal_stream_key(stream->key);
	cds_list_add(&stream->list, &consumer_data.stream_list.head);
	consumer_data.stream_count++;
	consumer_data.need_update = 1;

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER_UST:
		/* Streams are in CPU number order (we rely on this) */
		stream->cpu = stream->chan->nr_streams++;
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

end:
	pthread_mutex_unlock(&consumer_data.lock);
	return ret;
}

/*
 * Update a stream according to what we just received.
 */
void consumer_change_stream_state(int stream_key,
		enum lttng_consumer_stream_state state)
{
	struct lttng_consumer_stream *stream;

	pthread_mutex_lock(&consumer_data.lock);
	stream = consumer_find_stream(stream_key);
	if (stream) {
		stream->state = state;
	}
	consumer_data.need_update = 1;
	pthread_mutex_unlock(&consumer_data.lock);
}

/*
 * Remove a channel from the global list protected by a mutex. This
 * function is also responsible for freeing its data structures.
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	int ret;

	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER_UST:
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	cds_list_del(&channel->list);
	if (channel->mmap_base != NULL) {
		ret = munmap(channel->mmap_base, channel->mmap_len);
		if (ret != 0) {
			perror("munmap");
		}
	}
	if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
		close(channel->wait_fd);
	}
	if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
			&& !channel->shm_fd_is_copy) {
		close(channel->shm_fd);
	}
	free(channel);
end:
	pthread_mutex_unlock(&consumer_data.lock);
}

struct lttng_consumer_channel *consumer_allocate_channel(
		int channel_key,
		int shm_fd, int wait_fd,
		uint64_t mmap_len,
		uint64_t max_sb_size)
{
	struct lttng_consumer_channel *channel;
	int ret;

	channel = zmalloc(sizeof(*channel));
	if (channel == NULL) {
		perror("malloc struct lttng_consumer_channel");
		goto end;
	}
	channel->key = channel_key;
	channel->shm_fd = shm_fd;
	channel->wait_fd = wait_fd;
	channel->mmap_len = mmap_len;
	channel->max_sb_size = max_sb_size;
	channel->refcount = 0;
	channel->nr_streams = 0;

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		channel->mmap_base = NULL;
		channel->mmap_len = 0;
		break;
	case LTTNG_CONSUMER_UST:
		ret = lttng_ustconsumer_allocate_channel(channel);
		if (ret) {
			free(channel);
			return NULL;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}
	DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
			channel->key,
			channel->shm_fd,
			channel->wait_fd,
			(unsigned long long) channel->mmap_len,
			(unsigned long long) channel->max_sb_size);
end:
	return channel;
}

/*
 * Add a channel to the global list protected by a mutex.
 */
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
	pthread_mutex_lock(&consumer_data.lock);
	/* Steal channel identifier, for UST */
	consumer_steal_channel_key(channel->key);
	cds_list_add(&channel->list, &consumer_data.channel_list.head);
	pthread_mutex_unlock(&consumer_data.lock);
	return 0;
}

/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and to avoid concurrency issues when
 * writing is needed. Called with consumer_data.lock held.
 *
 * Returns the number of fds in the structures, not counting the extra
 * consumer_poll_pipe entry appended at the end of the pollfd array.
 */
int consumer_update_poll_array(
		struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
		struct lttng_consumer_stream **local_stream)
{
	struct lttng_consumer_stream *iter;
	int i = 0;

	DBG("Updating poll fd array");
	cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
		if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
			continue;
		}
		DBG("Active FD %d", iter->wait_fd);
		(*pollfd)[i].fd = iter->wait_fd;
		(*pollfd)[i].events = POLLIN | POLLPRI;
		local_stream[i] = iter;
		i++;
	}

	/*
	 * Insert the consumer_poll_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FDs.
	 */
	(*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;
	return i;
}
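
/*
 * Typical caller pattern (a sketch of what lttng_consumer_thread_poll_fds
 * does below): the pollfd array is allocated with stream_count + 1 slots,
 * and the extra slot (index nb_fd, not counted in the return value) holds
 * the consumer_poll_pipe used to wake the thread up when the stream list
 * changes.
 *
 *	pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
 *	nb_fd = consumer_update_poll_array(ctx, &pollfd, local_stream);
 *	num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
 *	if (pollfd[nb_fd].revents & POLLIN) {
 *		// wake-up from consumer_poll_pipe: rebuild the array
 *	}
 */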

/*
 * Poll on the should_quit pipe and the command socket. Returns -1 on error
 * (the caller should exit), 0 if data is available on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	if (consumer_sockpoll[0].revents & POLLIN) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}

/*
 * Set the error socket.
 */
void lttng_consumer_set_error_sock(
		struct lttng_consumer_local_data *ctx, int sock)
{
	ctx->consumer_error_socket = sock;
}

/*
 * Set the command socket path.
 */
void lttng_consumer_set_command_sock_path(
		struct lttng_consumer_local_data *ctx, char *sock)
{
	ctx->consumer_command_sock_path = sock;
}

/*
 * Send return code to the session daemon.
 * If the socket is not defined, we return 0; it is not a fatal error.
 */
int lttng_consumer_send_error(
		struct lttng_consumer_local_data *ctx, int cmd)
{
	if (ctx->consumer_error_socket > 0) {
		return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}

/*
 * Close all the tracefiles and stream fds. Should be called when all
 * instances are destroyed.
 */
void lttng_consumer_cleanup(void)
{
	struct lttng_consumer_stream *iter, *tmp;
	struct lttng_consumer_channel *citer, *ctmp;

	/*
	 * Close all outfd. Called when there are no more threads
	 * running (after joining on the threads); no need to protect
	 * list iteration with a mutex.
	 */
	cds_list_for_each_entry_safe(iter, tmp,
			&consumer_data.stream_list.head, list) {
		consumer_del_stream(iter);
	}
	cds_list_for_each_entry_safe(citer, ctmp,
			&consumer_data.channel_list.head, list) {
		consumer_del_channel(citer);
	}
}

/*
 * Called from signal handler.
 */
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
	int ret;

	consumer_quit = 1;
	ret = write(ctx->consumer_should_quit[1], "4", 1);
	if (ret < 0) {
		perror("write consumer quit");
	}
}

void lttng_consumer_sync_trace_file(
		struct lttng_consumer_stream *stream, off_t orig_offset)
{
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->chan->max_sb_size) {
		return;
	}
	sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
			stream->chan->max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED: we won't re-access data in the near future
	 * after we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
			stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
}
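
/*
 * Worked example (illustrative numbers only): with max_sb_size = 1 MiB and
 * orig_offset = 4 MiB, the calls above flush and wait on the byte range
 * [3 MiB, 4 MiB), i.e. the previously written sub-buffer, and then tell the
 * kernel those pages will not be needed again (POSIX_FADV_DONTNEED), keeping
 * the page cache footprint of streaming bounded.
 */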

/*
 * Initialise the necessary environment:
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument; this function is called when data is
 * available on a buffer. That function is responsible for doing the
 * kernctl_get_next_subbuf, reading the data with mmap or splice depending on
 * the buffer configuration, and then doing the kernctl_put_next_subbuf at the
 * end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		int (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(int stream_key, uint32_t state))
{
	int ret, i;
	struct lttng_consumer_local_data *ctx;

	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		perror("allocating context");
		goto error;
	}

	ctx->consumer_error_socket = -1;
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ret = pipe(ctx->consumer_poll_pipe);
	if (ret < 0) {
		perror("Error creating poll pipe");
		goto error_poll_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		perror("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_thread_pipe);
	if (ret < 0) {
		perror("Error creating thread pipe");
		goto error_thread_pipe;
	}

	return ctx;

error_thread_pipe:
	for (i = 0; i < 2; i++) {
		int err;

		err = close(ctx->consumer_should_quit[i]);
		assert(!err);
	}
error_quit_pipe:
	for (i = 0; i < 2; i++) {
		int err;

		err = close(ctx->consumer_poll_pipe[i]);
		assert(!err);
	}
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}
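
/*
 * Example lifecycle (a sketch, not part of this file): a consumer daemon's
 * main() would create a context, hook up the callbacks and spawn the two
 * worker threads defined below. my_recv_channel, my_update_stream,
 * command_sock_path and error_sock are placeholders the daemon would have to
 * provide; lttng_consumer_read_subbuffer and lttng_consumer_on_recv_stream
 * are the dispatchers defined at the end of this file.
 *
 *	ctx = lttng_consumer_create(LTTNG_CONSUMER_UST,
 *			lttng_consumer_read_subbuffer,	// on_buffer_ready
 *			my_recv_channel,		// on_recv_channel
 *			lttng_consumer_on_recv_stream,	// on_recv_stream
 *			my_update_stream);		// on_update_stream
 *	if (ctx == NULL)
 *		exit(EXIT_FAILURE);
 *	lttng_consumer_set_command_sock_path(ctx, command_sock_path);
 *	lttng_consumer_set_error_sock(ctx, error_sock);
 *	pthread_create(&recv_thread, NULL, lttng_consumer_thread_receive_fds, ctx);
 *	pthread_create(&poll_thread, NULL, lttng_consumer_thread_poll_fds, ctx);
 *	pthread_join(poll_thread, NULL);
 *	pthread_join(recv_thread, NULL);
 *	lttng_consumer_destroy(ctx);
 *	lttng_consumer_cleanup();
 */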

/*
 * Close all fds associated with the instance and free the context.
 */
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
{
	close(ctx->consumer_error_socket);
	close(ctx->consumer_thread_pipe[0]);
	close(ctx->consumer_thread_pipe[1]);
	close(ctx->consumer_poll_pipe[0]);
	close(ctx->consumer_poll_pipe[1]);
	close(ctx->consumer_should_quit[0]);
	close(ctx->consumer_should_quit[1]);
	unlink(ctx->consumer_command_sock_path);
	free(ctx);
}

/*
 * Mmap the ring buffer, read it and write the data to the tracefile.
 *
 * Returns the number of bytes written.
 */
int lttng_consumer_on_read_subbuffer_mmap(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
	case LTTNG_CONSUMER_UST:
		return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

/*
 * Splice the data from the ring buffer to the tracefile.
 *
 * Returns the number of bytes spliced.
 */
int lttng_consumer_on_read_subbuffer_splice(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream, unsigned long len)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
	case LTTNG_CONSUMER_UST:
		return -ENOSYS;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

/*
 * Take a snapshot for a specific fd.
 *
 * Returns 0 on success, < 0 on error.
 */
int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_take_snapshot(ctx, stream);
	case LTTNG_CONSUMER_UST:
		return lttng_ustconsumer_take_snapshot(ctx, stream);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

/*
 * Get the produced position.
 *
 * Returns 0 on success, < 0 on error.
 */
int lttng_consumer_get_produced_snapshot(
		struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_stream *stream,
		unsigned long *pos)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
	case LTTNG_CONSUMER_UST:
		return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
		int sock, struct pollfd *consumer_sockpoll)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
	case LTTNG_CONSUMER_UST:
		return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

/*
 * This thread polls the fds in the ltt_fd_list to consume the data and write
 * it to the tracefile if necessary.
 */
void *lttng_consumer_thread_poll_fds(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i;
	struct pollfd *pollfd = NULL;
	/* local view of the streams */
	struct lttng_consumer_stream **local_stream = NULL;
	/* local view of consumer_data.stream_count */
	int nb_fd = 0;
	char tmp;
	int tmp2;
	struct lttng_consumer_local_data *ctx = data;

	local_stream = zmalloc(sizeof(struct lttng_consumer_stream));

	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * The ltt_fd_list has been updated, we need to update our
		 * local array as well.
		 */
		pthread_mutex_lock(&consumer_data.lock);
		if (consumer_data.need_update) {
			if (pollfd != NULL) {
				free(pollfd);
				pollfd = NULL;
			}
			if (local_stream != NULL) {
				free(local_stream);
				local_stream = NULL;
			}

			/* allocate for all fds + 1 for the consumer_poll_pipe */
			pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				perror("pollfd malloc");
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the consumer_poll_pipe */
			local_stream = zmalloc((consumer_data.stream_count + 1) *
					sizeof(struct lttng_consumer_stream));
			if (local_stream == NULL) {
				perror("local_stream malloc");
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}
			ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&consumer_data.lock);
				goto end;
			}
			nb_fd = ret;
			consumer_data.need_update = 0;
		}
		pthread_mutex_unlock(&consumer_data.lock);

		/* poll on the array of fds */
		DBG("polling on %d fd", nb_fd + 1);
		num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			perror("Poll error");
			lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/* No FDs and consumer_quit, cleanup the thread */
		if (nb_fd == 0 && consumer_quit == 1) {
			goto end;
		}

		/*
		 * If the consumer_poll_pipe triggered poll go
		 * directly to the beginning of the loop to update the
		 * array. We want to prioritize array update over
		 * low-priority reads.
		 */
		if (pollfd[nb_fd].revents & POLLIN) {
			DBG("consumer_poll_pipe wake up");
			tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
			if (tmp2 < 0) {
				perror("read consumer poll");
			}
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			if (pollfd[i].revents & POLLPRI) {
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				ret = ctx->on_buffer_ready(local_stream[i], ctx);
				/* it's ok to have an unavailable sub-buffer */
				if (ret == EAGAIN) {
					ret = 0;
				}
			} else if (pollfd[i].revents & POLLERR) {
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				consumer_del_stream(local_stream[i]);
				num_hup++;
			} else if (pollfd[i].revents & POLLNVAL) {
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				consumer_del_stream(local_stream[i]);
				num_hup++;
			} else if ((pollfd[i].revents & POLLHUP) &&
					!(pollfd[i].revents & POLLIN)) {
				if (consumer_data.type == LTTNG_CONSUMER_UST) {
					DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
						pollfd[i].fd);
					if (!local_stream[i]->hangup_flush_done) {
						lttng_ustconsumer_on_stream_hangup(local_stream[i]);
						/* read after flush */
						do {
							ret = ctx->on_buffer_ready(local_stream[i], ctx);
						} while (ret == EAGAIN);
					}
				} else {
					DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				}
				consumer_del_stream(local_stream[i]);
				num_hup++;
			}
		}

		/* If every buffer FD has hung up, we end the read loop here */
		if (nb_fd > 0 && num_hup == nb_fd) {
			DBG("every buffer FD has hung up\n");
			if (consumer_quit == 1) {
				goto end;
			}
			continue;
		}

		/* Take care of low priority channels. */
		if (high_prio == 0) {
			for (i = 0; i < nb_fd; i++) {
				if (pollfd[i].revents & POLLIN) {
					DBG("Normal read on fd %d", pollfd[i].fd);
					ret = ctx->on_buffer_ready(local_stream[i], ctx);
					/* it's ok to have an unavailable subbuffer */
					if (ret == EAGAIN) {
						ret = 0;
					}
				}
			}
		}
	}
end:
	DBG("polling thread exiting");
	if (pollfd != NULL) {
		free(pollfd);
		pollfd = NULL;
	}
	if (local_stream != NULL) {
		free(local_stream);
		local_stream = NULL;
	}
	return NULL;
}

/*
 * This thread listens on the consumerd socket and receives the file
 * descriptors from the session daemon.
 */
void *lttng_consumer_thread_receive_fds(void *data)
{
	int sock, client_socket, ret;
	/*
	 * Structure to poll for incoming data on the communication socket,
	 * which avoids making blocking sockets.
	 */
	struct pollfd consumer_sockpoll[2];
	struct lttng_consumer_local_data *ctx = data;

	DBG("Creating command socket %s", ctx->consumer_command_sock_path);
	unlink(ctx->consumer_command_sock_path);
	client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to lttng-sessiond");
	ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
	/* return < 0 on error, but == 0 is not fatal */
	if (ret < 0) {
		ERR("Error sending ready command to lttng-sessiond");
		goto end;
	}

	ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* prepare the FDs to poll: the client socket and the should_quit pipe */
	consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
	consumer_sockpoll[0].events = POLLIN | POLLPRI;
	consumer_sockpoll[1].fd = client_socket;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock <= 0) {
		WARN("On accept");
		goto end;
	}
	ret = fcntl(sock, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* update the polling structure to poll on the established socket */
	consumer_sockpoll[1].fd = sock;
	consumer_sockpoll[1].events = POLLIN | POLLPRI;

	while (1) {
		if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
			goto end;
		}
		DBG("Incoming command on sock");
		ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
		if (ret == -ENOENT) {
			DBG("Received STOP command");
			goto end;
		}
		if (ret < 0) {
			ERR("Communication interrupted on command socket");
			goto end;
		}
		if (consumer_quit) {
			DBG("consumer_thread_receive_fds received quit from signal");
			goto end;
		}
		DBG("received fds on sock");
	}
end:
	DBG("consumer_thread_receive_fds exiting");

	/*
	 * When all fds have hung up, the polling thread
	 * can exit cleanly.
	 */
	consumer_quit = 1;

	/*
	 * 2s of grace period: if no polling events occur during
	 * this period, the polling thread will exit even if there
	 * are still open FDs (should not happen, but safety mechanism).
	 */
	consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;

	/* wake up the polling thread */
	ret = write(ctx->consumer_poll_pipe[1], "4", 1);
	if (ret < 0) {
		perror("poll pipe write");
	}
	return NULL;
}

int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
		struct lttng_consumer_local_data *ctx)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_read_subbuffer(stream, ctx);
	case LTTNG_CONSUMER_UST:
		return lttng_ustconsumer_read_subbuffer(stream, ctx);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}

int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		return lttng_kconsumer_on_recv_stream(stream);
	case LTTNG_CONSUMER_UST:
		return lttng_ustconsumer_on_recv_stream(stream);
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		return -ENOSYS;
	}
}