893df720882b7afd6798e789ca7fd90c967836fd
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 #define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <lttng-kernel-ctl.h>
#include <lttng-sessiond-comm.h>
#include <lttng/lttng-consumer.h>
#include <lttng/lttng-kconsumer.h>
#include <lttng/lttng-ustconsumer.h>
#include <lttngerr.h>
38
/*
 * Process-wide consumer state. Both lists start out empty, no stream is
 * counted yet, need_update starts at 1 so the polling thread builds its
 * poll array on its first pass, and the type stays unknown until
 * lttng_consumer_create() records it.
 */
struct lttng_consumer_global_data consumer_data = {
	.stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
	.channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
	.stream_count = 0,
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};

/*
 * Timeout handed to poll() by the polling thread; it controls that thread's
 * grace period. -1 means wait indefinitely; the receive-fds thread replaces
 * it with LTTNG_CONSUMER_POLL_TIMEOUT when it exits.
 */
int consumer_poll_timeout = -1;

/*
 * Flag informing the polling thread that it should quit once all fds have
 * hung up. Set by lttng_consumer_thread_receive_fds() when it exits and by
 * lttng_consumer_should_exit(), which is called from the signal handler.
 * Read by the polling thread.
 */
volatile int consumer_quit = 0;
57
58 /*
59 * Find a stream. The consumer_data.lock must be locked during this
60 * call.
61 */
62 static struct lttng_consumer_stream *consumer_find_stream(int key)
63 {
64 struct lttng_consumer_stream *iter;
65
66 /* Negative keys are lookup failures */
67 if (key < 0)
68 return NULL;
69 cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
70 if (iter->key == key) {
71 DBG("Found stream key %d", key);
72 return iter;
73 }
74 }
75 return NULL;
76 }
77
78 static void consumer_steal_stream_key(int key)
79 {
80 struct lttng_consumer_stream *stream;
81
82 stream = consumer_find_stream(key);
83 if (stream)
84 stream->key = -1;
85 }
86
87 static struct lttng_consumer_channel *consumer_find_channel(int key)
88 {
89 struct lttng_consumer_channel *iter;
90
91 /* Negative keys are lookup failures */
92 if (key < 0)
93 return NULL;
94 cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
95 if (iter->key == key) {
96 DBG("Found channel key %d", key);
97 return iter;
98 }
99 }
100 return NULL;
101 }
102
103 static void consumer_steal_channel_key(int key)
104 {
105 struct lttng_consumer_channel *channel;
106
107 channel = consumer_find_channel(key);
108 if (channel)
109 channel->key = -1;
110 }
111
112 /*
113 * Remove a stream from the global list protected by a mutex. This
114 * function is also responsible for freeing its data structures.
115 */
116 void consumer_del_stream(struct lttng_consumer_stream *stream)
117 {
118 int ret;
119 struct lttng_consumer_channel *free_chan = NULL;
120
121 pthread_mutex_lock(&consumer_data.lock);
122
123 switch (consumer_data.type) {
124 case LTTNG_CONSUMER_KERNEL:
125 if (stream->mmap_base != NULL) {
126 ret = munmap(stream->mmap_base, stream->mmap_len);
127 if (ret != 0) {
128 perror("munmap");
129 }
130 }
131 break;
132 case LTTNG_CONSUMER32_UST:
133 case LTTNG_CONSUMER64_UST:
134 lttng_ustconsumer_del_stream(stream);
135 break;
136 default:
137 ERR("Unknown consumer_data type");
138 assert(0);
139 goto end;
140 }
141
142 cds_list_del(&stream->list);
143 if (consumer_data.stream_count <= 0) {
144 goto end;
145 }
146 consumer_data.stream_count--;
147 if (!stream) {
148 goto end;
149 }
150 if (stream->out_fd >= 0) {
151 close(stream->out_fd);
152 }
153 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
154 close(stream->wait_fd);
155 }
156 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
157 && !stream->shm_fd_is_copy) {
158 close(stream->shm_fd);
159 }
160 if (!--stream->chan->refcount)
161 free_chan = stream->chan;
162 free(stream);
163 end:
164 consumer_data.need_update = 1;
165 pthread_mutex_unlock(&consumer_data.lock);
166
167 if (free_chan)
168 consumer_del_channel(free_chan);
169 }
170
171 struct lttng_consumer_stream *consumer_allocate_stream(
172 int channel_key, int stream_key,
173 int shm_fd, int wait_fd,
174 enum lttng_consumer_stream_state state,
175 uint64_t mmap_len,
176 enum lttng_event_output output,
177 const char *path_name)
178 {
179 struct lttng_consumer_stream *stream;
180 int ret;
181
182 stream = zmalloc(sizeof(*stream));
183 if (stream == NULL) {
184 perror("malloc struct lttng_consumer_stream");
185 goto end;
186 }
187 stream->chan = consumer_find_channel(channel_key);
188 if (!stream->chan) {
189 perror("Unable to find channel key");
190 goto end;
191 }
192 stream->chan->refcount++;
193 stream->key = stream_key;
194 stream->shm_fd = shm_fd;
195 stream->wait_fd = wait_fd;
196 stream->out_fd = -1;
197 stream->out_fd_offset = 0;
198 stream->state = state;
199 stream->mmap_len = mmap_len;
200 stream->mmap_base = NULL;
201 stream->output = output;
202 strncpy(stream->path_name, path_name, PATH_MAX - 1);
203 stream->path_name[PATH_MAX - 1] = '\0';
204
205 switch (consumer_data.type) {
206 case LTTNG_CONSUMER_KERNEL:
207 break;
208 case LTTNG_CONSUMER32_UST:
209 case LTTNG_CONSUMER64_UST:
210 stream->cpu = stream->chan->cpucount++;
211 ret = lttng_ustconsumer_allocate_stream(stream);
212 if (ret) {
213 free(stream);
214 return NULL;
215 }
216 break;
217 default:
218 ERR("Unknown consumer_data type");
219 assert(0);
220 goto end;
221 }
222 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
223 stream->path_name, stream->key,
224 stream->shm_fd,
225 stream->wait_fd,
226 (unsigned long long) stream->mmap_len,
227 stream->out_fd);
228 end:
229 return stream;
230 }
231
232 /*
233 * Add a stream to the global list protected by a mutex.
234 */
235 int consumer_add_stream(struct lttng_consumer_stream *stream)
236 {
237 int ret = 0;
238
239 pthread_mutex_lock(&consumer_data.lock);
240 /* Steal stream identifier, for UST */
241 consumer_steal_stream_key(stream->key);
242 cds_list_add(&stream->list, &consumer_data.stream_list.head);
243 consumer_data.stream_count++;
244 consumer_data.need_update = 1;
245
246 switch (consumer_data.type) {
247 case LTTNG_CONSUMER_KERNEL:
248 break;
249 case LTTNG_CONSUMER32_UST:
250 case LTTNG_CONSUMER64_UST:
251 /* Streams are in CPU number order (we rely on this) */
252 stream->cpu = stream->chan->nr_streams++;
253 break;
254 default:
255 ERR("Unknown consumer_data type");
256 assert(0);
257 goto end;
258 }
259
260 end:
261 pthread_mutex_unlock(&consumer_data.lock);
262 return ret;
263 }
264
265 /*
266 * Update a stream according to what we just received.
267 */
268 void consumer_change_stream_state(int stream_key,
269 enum lttng_consumer_stream_state state)
270 {
271 struct lttng_consumer_stream *stream;
272
273 pthread_mutex_lock(&consumer_data.lock);
274 stream = consumer_find_stream(stream_key);
275 if (stream) {
276 stream->state = state;
277 }
278 consumer_data.need_update = 1;
279 pthread_mutex_unlock(&consumer_data.lock);
280 }
281
282 /*
283 * Remove a channel from the global list protected by a mutex. This
284 * function is also responsible for freeing its data structures.
285 */
286 void consumer_del_channel(struct lttng_consumer_channel *channel)
287 {
288 int ret;
289
290 pthread_mutex_lock(&consumer_data.lock);
291
292 switch (consumer_data.type) {
293 case LTTNG_CONSUMER_KERNEL:
294 break;
295 case LTTNG_CONSUMER32_UST:
296 case LTTNG_CONSUMER64_UST:
297 lttng_ustconsumer_del_channel(channel);
298 break;
299 default:
300 ERR("Unknown consumer_data type");
301 assert(0);
302 goto end;
303 }
304
305 cds_list_del(&channel->list);
306 if (channel->mmap_base != NULL) {
307 ret = munmap(channel->mmap_base, channel->mmap_len);
308 if (ret != 0) {
309 perror("munmap");
310 }
311 }
312 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
313 close(channel->wait_fd);
314 }
315 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
316 && !channel->shm_fd_is_copy) {
317 close(channel->shm_fd);
318 }
319 free(channel);
320 end:
321 pthread_mutex_unlock(&consumer_data.lock);
322 }
323
324 struct lttng_consumer_channel *consumer_allocate_channel(
325 int channel_key,
326 int shm_fd, int wait_fd,
327 uint64_t mmap_len,
328 uint64_t max_sb_size)
329 {
330 struct lttng_consumer_channel *channel;
331 int ret;
332
333 channel = zmalloc(sizeof(*channel));
334 if (channel == NULL) {
335 perror("malloc struct lttng_consumer_channel");
336 goto end;
337 }
338 channel->key = channel_key;
339 channel->shm_fd = shm_fd;
340 channel->wait_fd = wait_fd;
341 channel->mmap_len = mmap_len;
342 channel->max_sb_size = max_sb_size;
343 channel->refcount = 0;
344 channel->nr_streams = 0;
345
346 switch (consumer_data.type) {
347 case LTTNG_CONSUMER_KERNEL:
348 channel->mmap_base = NULL;
349 channel->mmap_len = 0;
350 break;
351 case LTTNG_CONSUMER32_UST:
352 case LTTNG_CONSUMER64_UST:
353 ret = lttng_ustconsumer_allocate_channel(channel);
354 if (ret) {
355 free(channel);
356 return NULL;
357 }
358 break;
359 default:
360 ERR("Unknown consumer_data type");
361 assert(0);
362 goto end;
363 }
364 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
365 channel->key,
366 channel->shm_fd,
367 channel->wait_fd,
368 (unsigned long long) channel->mmap_len,
369 (unsigned long long) channel->max_sb_size);
370 end:
371 return channel;
372 }
373
374 /*
375 * Add a channel to the global list protected by a mutex.
376 */
377 int consumer_add_channel(struct lttng_consumer_channel *channel)
378 {
379 pthread_mutex_lock(&consumer_data.lock);
380 /* Steal channel identifier, for UST */
381 consumer_steal_channel_key(channel->key);
382 cds_list_add(&channel->list, &consumer_data.channel_list.head);
383 pthread_mutex_unlock(&consumer_data.lock);
384 return 0;
385 }
386
387 /*
388 * Allocate the pollfd structure and the local view of the out fds to avoid
389 * doing a lookup in the linked list and concurrency issues when writing is
390 * needed. Called with consumer_data.lock held.
391 *
392 * Returns the number of fds in the structures.
393 */
394 int consumer_update_poll_array(
395 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
396 struct lttng_consumer_stream **local_stream)
397 {
398 struct lttng_consumer_stream *iter;
399 int i = 0;
400
401 DBG("Updating poll fd array");
402 cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
403 if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
404 continue;
405 }
406 DBG("Active FD %d", iter->wait_fd);
407 (*pollfd)[i].fd = iter->wait_fd;
408 (*pollfd)[i].events = POLLIN | POLLPRI;
409 local_stream[i] = iter;
410 i++;
411 }
412
413 /*
414 * Insert the consumer_poll_pipe at the end of the array and don't
415 * increment i so nb_fd is the number of real FD.
416 */
417 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
418 (*pollfd)[i].events = POLLIN;
419 return i;
420 }
421
/*
 * Wait on the should_quit pipe (entry 0) and the command socket (entry 1).
 *
 * The wait has no timeout and is restarted when a signal interrupts it.
 *
 * Returns -1 if poll() fails or the should_quit pipe becomes readable, in
 * which case the caller should exit; returns 0 otherwise, i.e. when the
 * wake-up came from the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	/* An interrupted poll is not a failure: simply wait again. */
	do {
		num_rdy = poll(consumer_sockpoll, 2, -1);
	} while (num_rdy == -1 && errno == EINTR);

	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	/*
	 * Test the bits rather than comparing for equality: the quit pipe may
	 * report additional flags (e.g. POLLHUP) together with readability.
	 */
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
444
445 /*
446 * Set the error socket.
447 */
448 void lttng_consumer_set_error_sock(
449 struct lttng_consumer_local_data *ctx, int sock)
450 {
451 ctx->consumer_error_socket = sock;
452 }
453
454 /*
455 * Set the command socket path.
456 */
457
458 void lttng_consumer_set_command_sock_path(
459 struct lttng_consumer_local_data *ctx, char *sock)
460 {
461 ctx->consumer_command_sock_path = sock;
462 }
463
464 /*
465 * Send return code to the session daemon.
466 * If the socket is not defined, we return 0, it is not a fatal error
467 */
468 int lttng_consumer_send_error(
469 struct lttng_consumer_local_data *ctx, int cmd)
470 {
471 if (ctx->consumer_error_socket > 0) {
472 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
473 sizeof(enum lttcomm_sessiond_command));
474 }
475
476 return 0;
477 }
478
479 /*
480 * Close all the tracefiles and stream fds, should be called when all instances
481 * are destroyed.
482 */
483 void lttng_consumer_cleanup(void)
484 {
485 struct lttng_consumer_stream *iter, *tmp;
486 struct lttng_consumer_channel *citer, *ctmp;
487
488 /*
489 * close all outfd. Called when there are no more threads
490 * running (after joining on the threads), no need to protect
491 * list iteration with mutex.
492 */
493 cds_list_for_each_entry_safe(iter, tmp,
494 &consumer_data.stream_list.head, list) {
495 consumer_del_stream(iter);
496 }
497 cds_list_for_each_entry_safe(citer, ctmp,
498 &consumer_data.channel_list.head, list) {
499 consumer_del_channel(citer);
500 }
501 }
502
503 /*
504 * Called from signal handler.
505 */
506 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
507 {
508 int ret;
509 consumer_quit = 1;
510 ret = write(ctx->consumer_should_quit[1], "4", 1);
511 if (ret < 0) {
512 perror("write consumer quit");
513 }
514 }
515
516 void lttng_consumer_sync_trace_file(
517 struct lttng_consumer_stream *stream, off_t orig_offset)
518 {
519 int outfd = stream->out_fd;
520
521 /*
522 * This does a blocking write-and-wait on any page that belongs to the
523 * subbuffer prior to the one we just wrote.
524 * Don't care about error values, as these are just hints and ways to
525 * limit the amount of page cache used.
526 */
527 if (orig_offset < stream->chan->max_sb_size) {
528 return;
529 }
530 sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
531 stream->chan->max_sb_size,
532 SYNC_FILE_RANGE_WAIT_BEFORE
533 | SYNC_FILE_RANGE_WRITE
534 | SYNC_FILE_RANGE_WAIT_AFTER);
535 /*
536 * Give hints to the kernel about how we access the file:
537 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
538 * we write it.
539 *
540 * We need to call fadvise again after the file grows because the
541 * kernel does not seem to apply fadvise to non-existing parts of the
542 * file.
543 *
544 * Call fadvise _after_ having waited for the page writeback to
545 * complete because the dirty page writeback semantic is not well
546 * defined. So it can be expected to lead to lower throughput in
547 * streaming.
548 */
549 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
550 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
551 }
552
553 /*
554 * Initialise the necessary environnement :
555 * - create a new context
556 * - create the poll_pipe
557 * - create the should_quit pipe (for signal handler)
558 * - create the thread pipe (for splice)
559 *
560 * Takes a function pointer as argument, this function is called when data is
561 * available on a buffer. This function is responsible to do the
562 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
563 * buffer configuration and then kernctl_put_next_subbuf at the end.
564 *
565 * Returns a pointer to the new context or NULL on error.
566 */
567 struct lttng_consumer_local_data *lttng_consumer_create(
568 enum lttng_consumer_type type,
569 int (*buffer_ready)(struct lttng_consumer_stream *stream,
570 struct lttng_consumer_local_data *ctx),
571 int (*recv_channel)(struct lttng_consumer_channel *channel),
572 int (*recv_stream)(struct lttng_consumer_stream *stream),
573 int (*update_stream)(int stream_key, uint32_t state))
574 {
575 int ret, i;
576 struct lttng_consumer_local_data *ctx;
577
578 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
579 consumer_data.type == type);
580 consumer_data.type = type;
581
582 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
583 if (ctx == NULL) {
584 perror("allocating context");
585 goto error;
586 }
587
588 ctx->consumer_error_socket = -1;
589 /* assign the callbacks */
590 ctx->on_buffer_ready = buffer_ready;
591 ctx->on_recv_channel = recv_channel;
592 ctx->on_recv_stream = recv_stream;
593 ctx->on_update_stream = update_stream;
594
595 ret = pipe(ctx->consumer_poll_pipe);
596 if (ret < 0) {
597 perror("Error creating poll pipe");
598 goto error_poll_pipe;
599 }
600
601 ret = pipe(ctx->consumer_should_quit);
602 if (ret < 0) {
603 perror("Error creating recv pipe");
604 goto error_quit_pipe;
605 }
606
607 ret = pipe(ctx->consumer_thread_pipe);
608 if (ret < 0) {
609 perror("Error creating thread pipe");
610 goto error_thread_pipe;
611 }
612
613 return ctx;
614
615
616 error_thread_pipe:
617 for (i = 0; i < 2; i++) {
618 int err;
619
620 err = close(ctx->consumer_should_quit[i]);
621 assert(!err);
622 }
623 error_quit_pipe:
624 for (i = 0; i < 2; i++) {
625 int err;
626
627 err = close(ctx->consumer_poll_pipe[i]);
628 assert(!err);
629 }
630 error_poll_pipe:
631 free(ctx);
632 error:
633 return NULL;
634 }
635
636 /*
637 * Close all fds associated with the instance and free the context.
638 */
639 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
640 {
641 close(ctx->consumer_error_socket);
642 close(ctx->consumer_thread_pipe[0]);
643 close(ctx->consumer_thread_pipe[1]);
644 close(ctx->consumer_poll_pipe[0]);
645 close(ctx->consumer_poll_pipe[1]);
646 close(ctx->consumer_should_quit[0]);
647 close(ctx->consumer_should_quit[1]);
648 unlink(ctx->consumer_command_sock_path);
649 free(ctx);
650 }
651
652 /*
653 * Mmap the ring buffer, read it and write the data to the tracefile.
654 *
655 * Returns the number of bytes written
656 */
657 int lttng_consumer_on_read_subbuffer_mmap(
658 struct lttng_consumer_local_data *ctx,
659 struct lttng_consumer_stream *stream, unsigned long len)
660 {
661 switch (consumer_data.type) {
662 case LTTNG_CONSUMER_KERNEL:
663 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
664 case LTTNG_CONSUMER32_UST:
665 case LTTNG_CONSUMER64_UST:
666 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
667 default:
668 ERR("Unknown consumer_data type");
669 assert(0);
670 }
671 }
672
673 /*
674 * Splice the data from the ring buffer to the tracefile.
675 *
676 * Returns the number of bytes spliced.
677 */
678 int lttng_consumer_on_read_subbuffer_splice(
679 struct lttng_consumer_local_data *ctx,
680 struct lttng_consumer_stream *stream, unsigned long len)
681 {
682 switch (consumer_data.type) {
683 case LTTNG_CONSUMER_KERNEL:
684 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
685 case LTTNG_CONSUMER32_UST:
686 case LTTNG_CONSUMER64_UST:
687 return -ENOSYS;
688 default:
689 ERR("Unknown consumer_data type");
690 assert(0);
691 return -ENOSYS;
692 }
693
694 }
695
696 /*
697 * Take a snapshot for a specific fd
698 *
699 * Returns 0 on success, < 0 on error
700 */
701 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
702 struct lttng_consumer_stream *stream)
703 {
704 switch (consumer_data.type) {
705 case LTTNG_CONSUMER_KERNEL:
706 return lttng_kconsumer_take_snapshot(ctx, stream);
707 case LTTNG_CONSUMER32_UST:
708 case LTTNG_CONSUMER64_UST:
709 return lttng_ustconsumer_take_snapshot(ctx, stream);
710 default:
711 ERR("Unknown consumer_data type");
712 assert(0);
713 return -ENOSYS;
714 }
715
716 }
717
718 /*
719 * Get the produced position
720 *
721 * Returns 0 on success, < 0 on error
722 */
723 int lttng_consumer_get_produced_snapshot(
724 struct lttng_consumer_local_data *ctx,
725 struct lttng_consumer_stream *stream,
726 unsigned long *pos)
727 {
728 switch (consumer_data.type) {
729 case LTTNG_CONSUMER_KERNEL:
730 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
731 case LTTNG_CONSUMER32_UST:
732 case LTTNG_CONSUMER64_UST:
733 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
734 default:
735 ERR("Unknown consumer_data type");
736 assert(0);
737 return -ENOSYS;
738 }
739 }
740
741 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
742 int sock, struct pollfd *consumer_sockpoll)
743 {
744 switch (consumer_data.type) {
745 case LTTNG_CONSUMER_KERNEL:
746 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
747 case LTTNG_CONSUMER32_UST:
748 case LTTNG_CONSUMER64_UST:
749 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
750 default:
751 ERR("Unknown consumer_data type");
752 assert(0);
753 return -ENOSYS;
754 }
755 }
756
757 /*
758 * This thread polls the fds in the ltt_fd_list to consume the data and write
759 * it to tracefile if necessary.
760 */
761 void *lttng_consumer_thread_poll_fds(void *data)
762 {
763 int num_rdy, num_hup, high_prio, ret, i;
764 struct pollfd *pollfd = NULL;
765 /* local view of the streams */
766 struct lttng_consumer_stream **local_stream = NULL;
767 /* local view of consumer_data.fds_count */
768 int nb_fd = 0;
769 char tmp;
770 int tmp2;
771 struct lttng_consumer_local_data *ctx = data;
772
773 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
774
775 while (1) {
776 high_prio = 0;
777 num_hup = 0;
778
779 /*
780 * the ltt_fd_list has been updated, we need to update our
781 * local array as well
782 */
783 pthread_mutex_lock(&consumer_data.lock);
784 if (consumer_data.need_update) {
785 if (pollfd != NULL) {
786 free(pollfd);
787 pollfd = NULL;
788 }
789 if (local_stream != NULL) {
790 free(local_stream);
791 local_stream = NULL;
792 }
793
794 /* allocate for all fds + 1 for the consumer_poll_pipe */
795 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
796 if (pollfd == NULL) {
797 perror("pollfd malloc");
798 pthread_mutex_unlock(&consumer_data.lock);
799 goto end;
800 }
801
802 /* allocate for all fds + 1 for the consumer_poll_pipe */
803 local_stream = zmalloc((consumer_data.stream_count + 1) *
804 sizeof(struct lttng_consumer_stream));
805 if (local_stream == NULL) {
806 perror("local_stream malloc");
807 pthread_mutex_unlock(&consumer_data.lock);
808 goto end;
809 }
810 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
811 if (ret < 0) {
812 ERR("Error in allocating pollfd or local_outfds");
813 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
814 pthread_mutex_unlock(&consumer_data.lock);
815 goto end;
816 }
817 nb_fd = ret;
818 consumer_data.need_update = 0;
819 }
820 pthread_mutex_unlock(&consumer_data.lock);
821
822 /* poll on the array of fds */
823 DBG("polling on %d fd", nb_fd + 1);
824 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
825 DBG("poll num_rdy : %d", num_rdy);
826 if (num_rdy == -1) {
827 perror("Poll error");
828 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
829 goto end;
830 } else if (num_rdy == 0) {
831 DBG("Polling thread timed out");
832 goto end;
833 }
834
835 /* No FDs and consumer_quit, consumer_cleanup the thread */
836 if (nb_fd == 0 && consumer_quit == 1) {
837 goto end;
838 }
839
840 /*
841 * If the consumer_poll_pipe triggered poll go
842 * directly to the beginning of the loop to update the
843 * array. We want to prioritize array update over
844 * low-priority reads.
845 */
846 if (pollfd[nb_fd].revents & POLLIN) {
847 DBG("consumer_poll_pipe wake up");
848 tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
849 if (tmp2 < 0) {
850 perror("read consumer poll");
851 }
852 continue;
853 }
854
855 /* Take care of high priority channels first. */
856 for (i = 0; i < nb_fd; i++) {
857 if (pollfd[i].revents & POLLPRI) {
858 DBG("Urgent read on fd %d", pollfd[i].fd);
859 high_prio = 1;
860 ret = ctx->on_buffer_ready(local_stream[i], ctx);
861 /* it's ok to have an unavailable sub-buffer */
862 if (ret == EAGAIN) {
863 ret = 0;
864 }
865 } else if (pollfd[i].revents & POLLERR) {
866 ERR("Error returned in polling fd %d.", pollfd[i].fd);
867 consumer_del_stream(local_stream[i]);
868 num_hup++;
869 } else if (pollfd[i].revents & POLLNVAL) {
870 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
871 consumer_del_stream(local_stream[i]);
872 num_hup++;
873 } else if ((pollfd[i].revents & POLLHUP) &&
874 !(pollfd[i].revents & POLLIN)) {
875 if (consumer_data.type == LTTNG_CONSUMER32_UST
876 || consumer_data.type == LTTNG_CONSUMER64_UST) {
877 DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
878 pollfd[i].fd);
879 if (!local_stream[i]->hangup_flush_done) {
880 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
881 /* read after flush */
882 do {
883 ret = ctx->on_buffer_ready(local_stream[i], ctx);
884 } while (ret == EAGAIN);
885 }
886 } else {
887 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
888 }
889 consumer_del_stream(local_stream[i]);
890 num_hup++;
891 }
892 }
893
894 /* If every buffer FD has hung up, we end the read loop here */
895 if (nb_fd > 0 && num_hup == nb_fd) {
896 DBG("every buffer FD has hung up\n");
897 if (consumer_quit == 1) {
898 goto end;
899 }
900 continue;
901 }
902
903 /* Take care of low priority channels. */
904 if (high_prio == 0) {
905 for (i = 0; i < nb_fd; i++) {
906 if (pollfd[i].revents & POLLIN) {
907 DBG("Normal read on fd %d", pollfd[i].fd);
908 ret = ctx->on_buffer_ready(local_stream[i], ctx);
909 /* it's ok to have an unavailable subbuffer */
910 if (ret == EAGAIN) {
911 ret = 0;
912 }
913 }
914 }
915 }
916 }
917 end:
918 DBG("polling thread exiting");
919 if (pollfd != NULL) {
920 free(pollfd);
921 pollfd = NULL;
922 }
923 if (local_stream != NULL) {
924 free(local_stream);
925 local_stream = NULL;
926 }
927 return NULL;
928 }
929
930 /*
931 * This thread listens on the consumerd socket and receives the file
932 * descriptors from the session daemon.
933 */
934 void *lttng_consumer_thread_receive_fds(void *data)
935 {
936 int sock, client_socket, ret;
937 /*
938 * structure to poll for incoming data on communication socket avoids
939 * making blocking sockets.
940 */
941 struct pollfd consumer_sockpoll[2];
942 struct lttng_consumer_local_data *ctx = data;
943
944 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
945 unlink(ctx->consumer_command_sock_path);
946 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
947 if (client_socket < 0) {
948 ERR("Cannot create command socket");
949 goto end;
950 }
951
952 ret = lttcomm_listen_unix_sock(client_socket);
953 if (ret < 0) {
954 goto end;
955 }
956
957 DBG("Sending ready command to lttng-sessiond");
958 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
959 /* return < 0 on error, but == 0 is not fatal */
960 if (ret < 0) {
961 ERR("Error sending ready command to lttng-sessiond");
962 goto end;
963 }
964
965 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
966 if (ret < 0) {
967 perror("fcntl O_NONBLOCK");
968 goto end;
969 }
970
971 /* prepare the FDs to poll : to client socket and the should_quit pipe */
972 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
973 consumer_sockpoll[0].events = POLLIN | POLLPRI;
974 consumer_sockpoll[1].fd = client_socket;
975 consumer_sockpoll[1].events = POLLIN | POLLPRI;
976
977 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
978 goto end;
979 }
980 DBG("Connection on client_socket");
981
982 /* Blocking call, waiting for transmission */
983 sock = lttcomm_accept_unix_sock(client_socket);
984 if (sock <= 0) {
985 WARN("On accept");
986 goto end;
987 }
988 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
989 if (ret < 0) {
990 perror("fcntl O_NONBLOCK");
991 goto end;
992 }
993
994 /* update the polling structure to poll on the established socket */
995 consumer_sockpoll[1].fd = sock;
996 consumer_sockpoll[1].events = POLLIN | POLLPRI;
997
998 while (1) {
999 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1000 goto end;
1001 }
1002 DBG("Incoming command on sock");
1003 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1004 if (ret == -ENOENT) {
1005 DBG("Received STOP command");
1006 goto end;
1007 }
1008 if (ret < 0) {
1009 ERR("Communication interrupted on command socket");
1010 goto end;
1011 }
1012 if (consumer_quit) {
1013 DBG("consumer_thread_receive_fds received quit from signal");
1014 goto end;
1015 }
1016 DBG("received fds on sock");
1017 }
1018 end:
1019 DBG("consumer_thread_receive_fds exiting");
1020
1021 /*
1022 * when all fds have hung up, the polling thread
1023 * can exit cleanly
1024 */
1025 consumer_quit = 1;
1026
1027 /*
1028 * 2s of grace period, if no polling events occur during
1029 * this period, the polling thread will exit even if there
1030 * are still open FDs (should not happen, but safety mechanism).
1031 */
1032 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1033
1034 /* wake up the polling thread */
1035 ret = write(ctx->consumer_poll_pipe[1], "4", 1);
1036 if (ret < 0) {
1037 perror("poll pipe write");
1038 }
1039 return NULL;
1040 }
1041
1042 int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
1043 struct lttng_consumer_local_data *ctx)
1044 {
1045 switch (consumer_data.type) {
1046 case LTTNG_CONSUMER_KERNEL:
1047 return lttng_kconsumer_read_subbuffer(stream, ctx);
1048 case LTTNG_CONSUMER32_UST:
1049 case LTTNG_CONSUMER64_UST:
1050 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1051 default:
1052 ERR("Unknown consumer_data type");
1053 assert(0);
1054 return -ENOSYS;
1055 }
1056 }
1057
1058 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1059 {
1060 switch (consumer_data.type) {
1061 case LTTNG_CONSUMER_KERNEL:
1062 return lttng_kconsumer_on_recv_stream(stream);
1063 case LTTNG_CONSUMER32_UST:
1064 case LTTNG_CONSUMER64_UST:
1065 return lttng_ustconsumer_on_recv_stream(stream);
1066 default:
1067 ERR("Unknown consumer_data type");
1068 assert(0);
1069 return -ENOSYS;
1070 }
1071 }
This page took 0.081551 seconds and 3 git commands to generate.