Merge branch 'master' of git://git.lttng.org/lttng-tools
[lttng-tools.git] / src / common / consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <unistd.h>
30
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/sessiond-comm.h>
34 #include <common/kernel-consumer/kernel-consumer.h>
35 #include <common/ust-consumer/ust-consumer.h>
36
37 #include "consumer.h"
38
/*
 * Global consumer state shared by all consumer threads. Mutations are
 * serialized with consumer_data.lock (see the lock/unlock pairs below).
 */
struct lttng_consumer_global_data consumer_data = {
	.stream_count = 0,
	/* Starts at 1 so the polling thread builds its fd array on first pass. */
	.need_update = 1,
	.type = LTTNG_CONSUMER_UNKNOWN,
};

/* timeout parameter, to control the polling thread grace period. */
/* -1 makes poll() block forever until an event occurs. */
int consumer_poll_timeout = -1;

/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
volatile int consumer_quit = 0;
55
56 /*
57 * Find a stream. The consumer_data.lock must be locked during this
58 * call.
59 */
60 static struct lttng_consumer_stream *consumer_find_stream(int key)
61 {
62 struct lttng_ht_iter iter;
63 struct lttng_ht_node_ulong *node;
64 struct lttng_consumer_stream *stream = NULL;
65
66 /* Negative keys are lookup failures */
67 if (key < 0)
68 return NULL;
69
70 rcu_read_lock();
71
72 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
73 &iter);
74 node = lttng_ht_iter_get_node_ulong(&iter);
75 if (node != NULL) {
76 stream = caa_container_of(node, struct lttng_consumer_stream, node);
77 }
78
79 rcu_read_unlock();
80
81 return stream;
82 }
83
84 static void consumer_steal_stream_key(int key)
85 {
86 struct lttng_consumer_stream *stream;
87
88 rcu_read_lock();
89 stream = consumer_find_stream(key);
90 if (stream) {
91 stream->key = -1;
92 /*
93 * We don't want the lookup to match, but we still need
94 * to iterate on this stream when iterating over the hash table. Just
95 * change the node key.
96 */
97 stream->node.key = -1;
98 }
99 rcu_read_unlock();
100 }
101
102 static struct lttng_consumer_channel *consumer_find_channel(int key)
103 {
104 struct lttng_ht_iter iter;
105 struct lttng_ht_node_ulong *node;
106 struct lttng_consumer_channel *channel = NULL;
107
108 /* Negative keys are lookup failures */
109 if (key < 0)
110 return NULL;
111
112 rcu_read_lock();
113
114 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
115 &iter);
116 node = lttng_ht_iter_get_node_ulong(&iter);
117 if (node != NULL) {
118 channel = caa_container_of(node, struct lttng_consumer_channel, node);
119 }
120
121 rcu_read_unlock();
122
123 return channel;
124 }
125
126 static void consumer_steal_channel_key(int key)
127 {
128 struct lttng_consumer_channel *channel;
129
130 rcu_read_lock();
131 channel = consumer_find_channel(key);
132 if (channel) {
133 channel->key = -1;
134 /*
135 * We don't want the lookup to match, but we still need
136 * to iterate on this channel when iterating over the hash table. Just
137 * change the node key.
138 */
139 channel->node.key = -1;
140 }
141 rcu_read_unlock();
142 }
143
144 static
145 void consumer_free_stream(struct rcu_head *head)
146 {
147 struct lttng_ht_node_ulong *node =
148 caa_container_of(head, struct lttng_ht_node_ulong, head);
149 struct lttng_consumer_stream *stream =
150 caa_container_of(node, struct lttng_consumer_stream, node);
151
152 free(stream);
153 }
154
155 /*
156 * Remove a stream from the global list protected by a mutex. This
157 * function is also responsible for freeing its data structures.
158 */
159 void consumer_del_stream(struct lttng_consumer_stream *stream)
160 {
161 int ret;
162 struct lttng_ht_iter iter;
163 struct lttng_consumer_channel *free_chan = NULL;
164
165 pthread_mutex_lock(&consumer_data.lock);
166
167 switch (consumer_data.type) {
168 case LTTNG_CONSUMER_KERNEL:
169 if (stream->mmap_base != NULL) {
170 ret = munmap(stream->mmap_base, stream->mmap_len);
171 if (ret != 0) {
172 perror("munmap");
173 }
174 }
175 break;
176 case LTTNG_CONSUMER32_UST:
177 case LTTNG_CONSUMER64_UST:
178 lttng_ustconsumer_del_stream(stream);
179 break;
180 default:
181 ERR("Unknown consumer_data type");
182 assert(0);
183 goto end;
184 }
185
186 rcu_read_lock();
187 iter.iter.node = &stream->node.node;
188 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
189 assert(!ret);
190
191 rcu_read_unlock();
192
193 if (consumer_data.stream_count <= 0) {
194 goto end;
195 }
196 consumer_data.stream_count--;
197 if (!stream) {
198 goto end;
199 }
200 if (stream->out_fd >= 0) {
201 ret = close(stream->out_fd);
202 if (ret) {
203 PERROR("close");
204 }
205 }
206 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
207 ret = close(stream->wait_fd);
208 if (ret) {
209 PERROR("close");
210 }
211 }
212 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
213 ret = close(stream->shm_fd);
214 if (ret) {
215 PERROR("close");
216 }
217 }
218 if (!--stream->chan->refcount)
219 free_chan = stream->chan;
220
221 call_rcu(&stream->node.head, consumer_free_stream);
222 end:
223 consumer_data.need_update = 1;
224 pthread_mutex_unlock(&consumer_data.lock);
225
226 if (free_chan)
227 consumer_del_channel(free_chan);
228 }
229
230 struct lttng_consumer_stream *consumer_allocate_stream(
231 int channel_key, int stream_key,
232 int shm_fd, int wait_fd,
233 enum lttng_consumer_stream_state state,
234 uint64_t mmap_len,
235 enum lttng_event_output output,
236 const char *path_name,
237 uid_t uid,
238 gid_t gid)
239 {
240 struct lttng_consumer_stream *stream;
241 int ret;
242
243 stream = zmalloc(sizeof(*stream));
244 if (stream == NULL) {
245 perror("malloc struct lttng_consumer_stream");
246 goto end;
247 }
248 stream->chan = consumer_find_channel(channel_key);
249 if (!stream->chan) {
250 perror("Unable to find channel key");
251 goto end;
252 }
253 stream->chan->refcount++;
254 stream->key = stream_key;
255 stream->shm_fd = shm_fd;
256 stream->wait_fd = wait_fd;
257 stream->out_fd = -1;
258 stream->out_fd_offset = 0;
259 stream->state = state;
260 stream->mmap_len = mmap_len;
261 stream->mmap_base = NULL;
262 stream->output = output;
263 stream->uid = uid;
264 stream->gid = gid;
265 strncpy(stream->path_name, path_name, PATH_MAX - 1);
266 stream->path_name[PATH_MAX - 1] = '\0';
267 lttng_ht_node_init_ulong(&stream->node, stream->key);
268
269 switch (consumer_data.type) {
270 case LTTNG_CONSUMER_KERNEL:
271 break;
272 case LTTNG_CONSUMER32_UST:
273 case LTTNG_CONSUMER64_UST:
274 stream->cpu = stream->chan->cpucount++;
275 ret = lttng_ustconsumer_allocate_stream(stream);
276 if (ret) {
277 free(stream);
278 return NULL;
279 }
280 break;
281 default:
282 ERR("Unknown consumer_data type");
283 assert(0);
284 goto end;
285 }
286 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
287 stream->path_name, stream->key,
288 stream->shm_fd,
289 stream->wait_fd,
290 (unsigned long long) stream->mmap_len,
291 stream->out_fd);
292 end:
293 return stream;
294 }
295
296 /*
297 * Add a stream to the global list protected by a mutex.
298 */
299 int consumer_add_stream(struct lttng_consumer_stream *stream)
300 {
301 int ret = 0;
302
303 pthread_mutex_lock(&consumer_data.lock);
304 /* Steal stream identifier, for UST */
305 consumer_steal_stream_key(stream->key);
306 rcu_read_lock();
307 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
308 rcu_read_unlock();
309 consumer_data.stream_count++;
310 consumer_data.need_update = 1;
311
312 switch (consumer_data.type) {
313 case LTTNG_CONSUMER_KERNEL:
314 break;
315 case LTTNG_CONSUMER32_UST:
316 case LTTNG_CONSUMER64_UST:
317 /* Streams are in CPU number order (we rely on this) */
318 stream->cpu = stream->chan->nr_streams++;
319 break;
320 default:
321 ERR("Unknown consumer_data type");
322 assert(0);
323 goto end;
324 }
325
326 end:
327 pthread_mutex_unlock(&consumer_data.lock);
328
329 return ret;
330 }
331
332 /*
333 * Update a stream according to what we just received.
334 */
335 void consumer_change_stream_state(int stream_key,
336 enum lttng_consumer_stream_state state)
337 {
338 struct lttng_consumer_stream *stream;
339
340 pthread_mutex_lock(&consumer_data.lock);
341 stream = consumer_find_stream(stream_key);
342 if (stream) {
343 stream->state = state;
344 }
345 consumer_data.need_update = 1;
346 pthread_mutex_unlock(&consumer_data.lock);
347 }
348
349 static
350 void consumer_free_channel(struct rcu_head *head)
351 {
352 struct lttng_ht_node_ulong *node =
353 caa_container_of(head, struct lttng_ht_node_ulong, head);
354 struct lttng_consumer_channel *channel =
355 caa_container_of(node, struct lttng_consumer_channel, node);
356
357 free(channel);
358 }
359
/*
 * Remove a channel from the global list protected by a mutex. This
 * function is also responsible for freeing its data structures
 * (munmap, fd closes, then RCU-deferred free of the channel itself).
 */
void consumer_del_channel(struct lttng_consumer_channel *channel)
{
	int ret;
	struct lttng_ht_iter iter;

	pthread_mutex_lock(&consumer_data.lock);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		/* Release UST-specific channel resources first. */
		lttng_ustconsumer_del_channel(channel);
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}

	/* Unlink the channel node from the global hash table. */
	rcu_read_lock();
	iter.iter.node = &channel->node.node;
	ret = lttng_ht_del(consumer_data.channel_ht, &iter);
	assert(!ret);
	rcu_read_unlock();

	if (channel->mmap_base != NULL) {
		ret = munmap(channel->mmap_base, channel->mmap_len);
		if (ret != 0) {
			perror("munmap");
		}
	}
	/* Only close fds this channel owns (not copies). */
	if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
		ret = close(channel->wait_fd);
		if (ret) {
			PERROR("close");
		}
	}
	/* shm_fd may alias wait_fd; avoid a double close. */
	if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
		ret = close(channel->shm_fd);
		if (ret) {
			PERROR("close");
		}
	}

	/* Defer the actual free until all RCU readers are done. */
	call_rcu(&channel->node.head, consumer_free_channel);
end:
	pthread_mutex_unlock(&consumer_data.lock);
}
413
/*
 * Allocate and initialize a channel object.
 *
 * Returns the new channel, or NULL on allocation failure or UST channel
 * allocation failure. NOTE(review): on an unknown consumer type the
 * channel is still returned after assert(0) — only reachable with NDEBUG.
 */
struct lttng_consumer_channel *consumer_allocate_channel(
		int channel_key,
		int shm_fd, int wait_fd,
		uint64_t mmap_len,
		uint64_t max_sb_size)
{
	struct lttng_consumer_channel *channel;
	int ret;

	channel = zmalloc(sizeof(*channel));
	if (channel == NULL) {
		perror("malloc struct lttng_consumer_channel");
		goto end;
	}
	channel->key = channel_key;
	channel->shm_fd = shm_fd;
	channel->wait_fd = wait_fd;
	channel->mmap_len = mmap_len;
	channel->max_sb_size = max_sb_size;
	channel->refcount = 0;
	channel->nr_streams = 0;
	lttng_ht_node_init_ulong(&channel->node, channel->key);

	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		/*
		 * Kernel channels are not mmapped at the channel level: the
		 * mmap_len parameter is deliberately overridden to 0 here.
		 */
		channel->mmap_base = NULL;
		channel->mmap_len = 0;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		ret = lttng_ustconsumer_allocate_channel(channel);
		if (ret) {
			free(channel);
			return NULL;
		}
		break;
	default:
		ERR("Unknown consumer_data type");
		assert(0);
		goto end;
	}
	DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
			channel->key,
			channel->shm_fd,
			channel->wait_fd,
			(unsigned long long) channel->mmap_len,
			(unsigned long long) channel->max_sb_size);
end:
	return channel;
}
464
465 /*
466 * Add a channel to the global list protected by a mutex.
467 */
468 int consumer_add_channel(struct lttng_consumer_channel *channel)
469 {
470 pthread_mutex_lock(&consumer_data.lock);
471 /* Steal channel identifier, for UST */
472 consumer_steal_channel_key(channel->key);
473 rcu_read_lock();
474 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
475 rcu_read_unlock();
476 pthread_mutex_unlock(&consumer_data.lock);
477
478 return 0;
479 }
480
/*
 * Allocate the pollfd structure and the local view of the out fds to avoid
 * doing a lookup in the linked list and concurrency issues when writing is
 * needed. Called with consumer_data.lock held.
 *
 * The caller must have allocated *pollfd and local_stream with room for
 * stream_count + 1 entries (see lttng_consumer_thread_poll_fds); the
 * extra slot holds the consumer_poll_pipe.
 *
 * Returns the number of fds in the structures (the poll pipe excluded).
 */
int consumer_update_poll_array(
		struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
		struct lttng_consumer_stream **local_stream)
{
	int i = 0;
	struct lttng_ht_iter iter;
	struct lttng_consumer_stream *stream;

	DBG("Updating poll fd array");
	rcu_read_lock();
	cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
			node.node) {
		/* Only active streams are polled. */
		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
			continue;
		}
		DBG("Active FD %d", stream->wait_fd);
		(*pollfd)[i].fd = stream->wait_fd;
		(*pollfd)[i].events = POLLIN | POLLPRI;
		local_stream[i] = stream;
		i++;
	}
	rcu_read_unlock();

	/*
	 * Insert the consumer_poll_pipe at the end of the array and don't
	 * increment i so nb_fd is the number of real FD.
	 */
	(*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;
	return i;
}
519
/*
 * Poll on the should_quit pipe and the command socket.
 *
 * Returns -1 on error or if the thread should exit, 0 if data is
 * available on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

restart:
	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		/*
		 * Restart interrupted system call.
		 */
		if (errno == EINTR) {
			goto restart;
		}
		perror("Poll error");
		goto exit;
	}
	/*
	 * Fix: test the POLLIN bit instead of comparing revents for
	 * equality. The quit pipe is registered with POLLIN | POLLPRI,
	 * so an equality test could miss a wake-up when any other revents
	 * bit is also set.
	 */
	if (consumer_sockpoll[0].revents & POLLIN) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
549
/*
 * Set the error socket used by lttng_consumer_send_error() to report
 * status codes back to the session daemon.
 */
void lttng_consumer_set_error_sock(
		struct lttng_consumer_local_data *ctx, int sock)
{
	ctx->consumer_error_socket = sock;
}
558
/*
 * Set the command socket path. The string is stored by reference, not
 * copied: the caller must keep it alive for the lifetime of ctx.
 */

void lttng_consumer_set_command_sock_path(
		struct lttng_consumer_local_data *ctx, char *sock)
{
	ctx->consumer_command_sock_path = sock;
}
568
569 /*
570 * Send return code to the session daemon.
571 * If the socket is not defined, we return 0, it is not a fatal error
572 */
573 int lttng_consumer_send_error(
574 struct lttng_consumer_local_data *ctx, int cmd)
575 {
576 if (ctx->consumer_error_socket > 0) {
577 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
578 sizeof(enum lttcomm_sessiond_command));
579 }
580
581 return 0;
582 }
583
/*
 * Close all the tracefiles and stream fds, should be called when all instances
 * are destroyed.
 */
void lttng_consumer_cleanup(void)
{
	struct lttng_ht_iter iter;
	struct lttng_ht_node_ulong *node;

	rcu_read_lock();

	/*
	 * close all outfd. Called when there are no more threads running (after
	 * joining on the threads), no need to protect list iteration with mutex.
	 *
	 * NOTE(review): consumer_del_stream()/consumer_del_channel() remove
	 * nodes from the very hash tables being traversed here — presumably
	 * safe because urcu's cds_lfht tolerates deletion during traversal;
	 * confirm against the urcu documentation.
	 */
	cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
			node) {
		struct lttng_consumer_stream *stream =
			caa_container_of(node, struct lttng_consumer_stream, node);
		consumer_del_stream(stream);
	}

	cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
			node) {
		struct lttng_consumer_channel *channel =
			caa_container_of(node, struct lttng_consumer_channel, node);
		consumer_del_channel(channel);
	}

	rcu_read_unlock();
}
615
616 /*
617 * Called from signal handler.
618 */
619 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
620 {
621 int ret;
622 consumer_quit = 1;
623 ret = write(ctx->consumer_should_quit[1], "4", 1);
624 if (ret < 0) {
625 perror("write consumer quit");
626 }
627 }
628
/*
 * Flush the previous subbuffer's pages of the tracefile to disk and drop
 * them from the page cache. No-op until at least one full subbuffer
 * (max_sb_size bytes) has been written.
 */
void lttng_consumer_sync_trace_file(
		struct lttng_consumer_stream *stream, off_t orig_offset)
{
	int outfd = stream->out_fd;

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset < stream->chan->max_sb_size) {
		return;
	}
	lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
			stream->chan->max_sb_size,
			SYNC_FILE_RANGE_WAIT_BEFORE
			| SYNC_FILE_RANGE_WRITE
			| SYNC_FILE_RANGE_WAIT_AFTER);
	/*
	 * Give hints to the kernel about how we access the file:
	 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
	 * we write it.
	 *
	 * We need to call fadvise again after the file grows because the
	 * kernel does not seem to apply fadvise to non-existing parts of the
	 * file.
	 *
	 * Call fadvise _after_ having waited for the page writeback to
	 * complete because the dirty page writeback semantic is not well
	 * defined. So it can be expected to lead to lower throughput in
	 * streaming.
	 */
	posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
			stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
}
665
/*
 * Initialise the necessary environnement :
 * - create a new context
 * - create the poll_pipe
 * - create the should_quit pipe (for signal handler)
 * - create the thread pipe (for splice)
 *
 * Takes a function pointer as argument, this function is called when data is
 * available on a buffer. This function is responsible to do the
 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
 * buffer configuration and then kernctl_put_next_subbuf at the end.
 *
 * Returns a pointer to the new context or NULL on error.
 */
struct lttng_consumer_local_data *lttng_consumer_create(
		enum lttng_consumer_type type,
		ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
			struct lttng_consumer_local_data *ctx),
		int (*recv_channel)(struct lttng_consumer_channel *channel),
		int (*recv_stream)(struct lttng_consumer_stream *stream),
		int (*update_stream)(int stream_key, uint32_t state))
{
	int ret, i;
	struct lttng_consumer_local_data *ctx;

	/*
	 * The consumer type is process-wide: it may only be set once, or
	 * set again to the same value.
	 */
	assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
			consumer_data.type == type);
	consumer_data.type = type;

	ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
	if (ctx == NULL) {
		perror("allocating context");
		goto error;
	}

	/* No error socket until lttng_consumer_set_error_sock() is called. */
	ctx->consumer_error_socket = -1;
	/* assign the callbacks */
	ctx->on_buffer_ready = buffer_ready;
	ctx->on_recv_channel = recv_channel;
	ctx->on_recv_stream = recv_stream;
	ctx->on_update_stream = update_stream;

	ret = pipe(ctx->consumer_poll_pipe);
	if (ret < 0) {
		perror("Error creating poll pipe");
		goto error_poll_pipe;
	}

	ret = pipe(ctx->consumer_should_quit);
	if (ret < 0) {
		perror("Error creating recv pipe");
		goto error_quit_pipe;
	}

	ret = pipe(ctx->consumer_thread_pipe);
	if (ret < 0) {
		perror("Error creating thread pipe");
		goto error_thread_pipe;
	}

	return ctx;


	/*
	 * Unwind in reverse order of acquisition: each label closes only
	 * what was successfully created before the failure.
	 */
error_thread_pipe:
	for (i = 0; i < 2; i++) {
		int err;

		err = close(ctx->consumer_should_quit[i]);
		if (err) {
			PERROR("close");
		}
	}
error_quit_pipe:
	for (i = 0; i < 2; i++) {
		int err;

		err = close(ctx->consumer_poll_pipe[i]);
		if (err) {
			PERROR("close");
		}
	}
error_poll_pipe:
	free(ctx);
error:
	return NULL;
}
752
753 /*
754 * Close all fds associated with the instance and free the context.
755 */
756 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
757 {
758 int ret;
759
760 ret = close(ctx->consumer_error_socket);
761 if (ret) {
762 PERROR("close");
763 }
764 ret = close(ctx->consumer_thread_pipe[0]);
765 if (ret) {
766 PERROR("close");
767 }
768 ret = close(ctx->consumer_thread_pipe[1]);
769 if (ret) {
770 PERROR("close");
771 }
772 ret = close(ctx->consumer_poll_pipe[0]);
773 if (ret) {
774 PERROR("close");
775 }
776 ret = close(ctx->consumer_poll_pipe[1]);
777 if (ret) {
778 PERROR("close");
779 }
780 ret = close(ctx->consumer_should_quit[0]);
781 if (ret) {
782 PERROR("close");
783 }
784 ret = close(ctx->consumer_should_quit[1]);
785 if (ret) {
786 PERROR("close");
787 }
788 unlink(ctx->consumer_command_sock_path);
789 free(ctx);
790 }
791
792 /*
793 * Mmap the ring buffer, read it and write the data to the tracefile.
794 *
795 * Returns the number of bytes written
796 */
797 ssize_t lttng_consumer_on_read_subbuffer_mmap(
798 struct lttng_consumer_local_data *ctx,
799 struct lttng_consumer_stream *stream, unsigned long len)
800 {
801 switch (consumer_data.type) {
802 case LTTNG_CONSUMER_KERNEL:
803 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
804 case LTTNG_CONSUMER32_UST:
805 case LTTNG_CONSUMER64_UST:
806 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
807 default:
808 ERR("Unknown consumer_data type");
809 assert(0);
810 }
811
812 return 0;
813 }
814
815 /*
816 * Splice the data from the ring buffer to the tracefile.
817 *
818 * Returns the number of bytes spliced.
819 */
820 ssize_t lttng_consumer_on_read_subbuffer_splice(
821 struct lttng_consumer_local_data *ctx,
822 struct lttng_consumer_stream *stream, unsigned long len)
823 {
824 switch (consumer_data.type) {
825 case LTTNG_CONSUMER_KERNEL:
826 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
827 case LTTNG_CONSUMER32_UST:
828 case LTTNG_CONSUMER64_UST:
829 return -ENOSYS;
830 default:
831 ERR("Unknown consumer_data type");
832 assert(0);
833 return -ENOSYS;
834 }
835
836 }
837
838 /*
839 * Take a snapshot for a specific fd
840 *
841 * Returns 0 on success, < 0 on error
842 */
843 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
844 struct lttng_consumer_stream *stream)
845 {
846 switch (consumer_data.type) {
847 case LTTNG_CONSUMER_KERNEL:
848 return lttng_kconsumer_take_snapshot(ctx, stream);
849 case LTTNG_CONSUMER32_UST:
850 case LTTNG_CONSUMER64_UST:
851 return lttng_ustconsumer_take_snapshot(ctx, stream);
852 default:
853 ERR("Unknown consumer_data type");
854 assert(0);
855 return -ENOSYS;
856 }
857
858 }
859
860 /*
861 * Get the produced position
862 *
863 * Returns 0 on success, < 0 on error
864 */
865 int lttng_consumer_get_produced_snapshot(
866 struct lttng_consumer_local_data *ctx,
867 struct lttng_consumer_stream *stream,
868 unsigned long *pos)
869 {
870 switch (consumer_data.type) {
871 case LTTNG_CONSUMER_KERNEL:
872 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
873 case LTTNG_CONSUMER32_UST:
874 case LTTNG_CONSUMER64_UST:
875 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
876 default:
877 ERR("Unknown consumer_data type");
878 assert(0);
879 return -ENOSYS;
880 }
881 }
882
883 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
884 int sock, struct pollfd *consumer_sockpoll)
885 {
886 switch (consumer_data.type) {
887 case LTTNG_CONSUMER_KERNEL:
888 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
889 case LTTNG_CONSUMER32_UST:
890 case LTTNG_CONSUMER64_UST:
891 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
892 default:
893 ERR("Unknown consumer_data type");
894 assert(0);
895 return -ENOSYS;
896 }
897 }
898
899 /*
900 * This thread polls the fds in the set to consume the data and write
901 * it to tracefile if necessary.
902 */
903 void *lttng_consumer_thread_poll_fds(void *data)
904 {
905 int num_rdy, num_hup, high_prio, ret, i;
906 struct pollfd *pollfd = NULL;
907 /* local view of the streams */
908 struct lttng_consumer_stream **local_stream = NULL;
909 /* local view of consumer_data.fds_count */
910 int nb_fd = 0;
911 char tmp;
912 int tmp2;
913 struct lttng_consumer_local_data *ctx = data;
914
915 rcu_register_thread();
916
917 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
918
919 while (1) {
920 high_prio = 0;
921 num_hup = 0;
922
923 /*
924 * the fds set has been updated, we need to update our
925 * local array as well
926 */
927 pthread_mutex_lock(&consumer_data.lock);
928 if (consumer_data.need_update) {
929 if (pollfd != NULL) {
930 free(pollfd);
931 pollfd = NULL;
932 }
933 if (local_stream != NULL) {
934 free(local_stream);
935 local_stream = NULL;
936 }
937
938 /* allocate for all fds + 1 for the consumer_poll_pipe */
939 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
940 if (pollfd == NULL) {
941 perror("pollfd malloc");
942 pthread_mutex_unlock(&consumer_data.lock);
943 goto end;
944 }
945
946 /* allocate for all fds + 1 for the consumer_poll_pipe */
947 local_stream = zmalloc((consumer_data.stream_count + 1) *
948 sizeof(struct lttng_consumer_stream));
949 if (local_stream == NULL) {
950 perror("local_stream malloc");
951 pthread_mutex_unlock(&consumer_data.lock);
952 goto end;
953 }
954 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
955 if (ret < 0) {
956 ERR("Error in allocating pollfd or local_outfds");
957 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
958 pthread_mutex_unlock(&consumer_data.lock);
959 goto end;
960 }
961 nb_fd = ret;
962 consumer_data.need_update = 0;
963 }
964 pthread_mutex_unlock(&consumer_data.lock);
965
966 /* No FDs and consumer_quit, consumer_cleanup the thread */
967 if (nb_fd == 0 && consumer_quit == 1) {
968 goto end;
969 }
970 /* poll on the array of fds */
971 restart:
972 DBG("polling on %d fd", nb_fd + 1);
973 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
974 DBG("poll num_rdy : %d", num_rdy);
975 if (num_rdy == -1) {
976 /*
977 * Restart interrupted system call.
978 */
979 if (errno == EINTR) {
980 goto restart;
981 }
982 perror("Poll error");
983 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
984 goto end;
985 } else if (num_rdy == 0) {
986 DBG("Polling thread timed out");
987 goto end;
988 }
989
990 /*
991 * If the consumer_poll_pipe triggered poll go
992 * directly to the beginning of the loop to update the
993 * array. We want to prioritize array update over
994 * low-priority reads.
995 */
996 if (pollfd[nb_fd].revents & POLLIN) {
997 DBG("consumer_poll_pipe wake up");
998 tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
999 if (tmp2 < 0) {
1000 perror("read consumer poll");
1001 }
1002 continue;
1003 }
1004
1005 /* Take care of high priority channels first. */
1006 for (i = 0; i < nb_fd; i++) {
1007 if (pollfd[i].revents & POLLPRI) {
1008 ssize_t len;
1009
1010 DBG("Urgent read on fd %d", pollfd[i].fd);
1011 high_prio = 1;
1012 len = ctx->on_buffer_ready(local_stream[i], ctx);
1013 /* it's ok to have an unavailable sub-buffer */
1014 if (len < 0 && len != -EAGAIN) {
1015 goto end;
1016 } else if (len > 0) {
1017 local_stream[i]->data_read = 1;
1018 }
1019 }
1020 }
1021
1022 /*
1023 * If we read high prio channel in this loop, try again
1024 * for more high prio data.
1025 */
1026 if (high_prio) {
1027 continue;
1028 }
1029
1030 /* Take care of low priority channels. */
1031 for (i = 0; i < nb_fd; i++) {
1032 if ((pollfd[i].revents & POLLIN) ||
1033 local_stream[i]->hangup_flush_done) {
1034 ssize_t len;
1035
1036 DBG("Normal read on fd %d", pollfd[i].fd);
1037 len = ctx->on_buffer_ready(local_stream[i], ctx);
1038 /* it's ok to have an unavailable sub-buffer */
1039 if (len < 0 && len != -EAGAIN) {
1040 goto end;
1041 } else if (len > 0) {
1042 local_stream[i]->data_read = 1;
1043 }
1044 }
1045 }
1046
1047 /* Handle hangup and errors */
1048 for (i = 0; i < nb_fd; i++) {
1049 if (!local_stream[i]->hangup_flush_done
1050 && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
1051 && (consumer_data.type == LTTNG_CONSUMER32_UST
1052 || consumer_data.type == LTTNG_CONSUMER64_UST)) {
1053 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1054 pollfd[i].fd);
1055 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
1056 /* Attempt read again, for the data we just flushed. */
1057 local_stream[i]->data_read = 1;
1058 }
1059 /*
1060 * If the poll flag is HUP/ERR/NVAL and we have
1061 * read no data in this pass, we can remove the
1062 * stream from its hash table.
1063 */
1064 if ((pollfd[i].revents & POLLHUP)) {
1065 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
1066 if (!local_stream[i]->data_read) {
1067 consumer_del_stream(local_stream[i]);
1068 num_hup++;
1069 }
1070 } else if (pollfd[i].revents & POLLERR) {
1071 ERR("Error returned in polling fd %d.", pollfd[i].fd);
1072 if (!local_stream[i]->data_read) {
1073 consumer_del_stream(local_stream[i]);
1074 num_hup++;
1075 }
1076 } else if (pollfd[i].revents & POLLNVAL) {
1077 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
1078 if (!local_stream[i]->data_read) {
1079 consumer_del_stream(local_stream[i]);
1080 num_hup++;
1081 }
1082 }
1083 local_stream[i]->data_read = 0;
1084 }
1085 }
1086 end:
1087 DBG("polling thread exiting");
1088 if (pollfd != NULL) {
1089 free(pollfd);
1090 pollfd = NULL;
1091 }
1092 if (local_stream != NULL) {
1093 free(local_stream);
1094 local_stream = NULL;
1095 }
1096 rcu_unregister_thread();
1097 return NULL;
1098 }
1099
1100 /*
1101 * This thread listens on the consumerd socket and receives the file
1102 * descriptors from the session daemon.
1103 */
1104 void *lttng_consumer_thread_receive_fds(void *data)
1105 {
1106 int sock, client_socket, ret;
1107 /*
1108 * structure to poll for incoming data on communication socket avoids
1109 * making blocking sockets.
1110 */
1111 struct pollfd consumer_sockpoll[2];
1112 struct lttng_consumer_local_data *ctx = data;
1113
1114 rcu_register_thread();
1115
1116 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
1117 unlink(ctx->consumer_command_sock_path);
1118 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
1119 if (client_socket < 0) {
1120 ERR("Cannot create command socket");
1121 goto end;
1122 }
1123
1124 ret = lttcomm_listen_unix_sock(client_socket);
1125 if (ret < 0) {
1126 goto end;
1127 }
1128
1129 DBG("Sending ready command to lttng-sessiond");
1130 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
1131 /* return < 0 on error, but == 0 is not fatal */
1132 if (ret < 0) {
1133 ERR("Error sending ready command to lttng-sessiond");
1134 goto end;
1135 }
1136
1137 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
1138 if (ret < 0) {
1139 perror("fcntl O_NONBLOCK");
1140 goto end;
1141 }
1142
1143 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1144 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1145 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1146 consumer_sockpoll[1].fd = client_socket;
1147 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1148
1149 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1150 goto end;
1151 }
1152 DBG("Connection on client_socket");
1153
1154 /* Blocking call, waiting for transmission */
1155 sock = lttcomm_accept_unix_sock(client_socket);
1156 if (sock <= 0) {
1157 WARN("On accept");
1158 goto end;
1159 }
1160 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1161 if (ret < 0) {
1162 perror("fcntl O_NONBLOCK");
1163 goto end;
1164 }
1165
1166 /* update the polling structure to poll on the established socket */
1167 consumer_sockpoll[1].fd = sock;
1168 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1169
1170 while (1) {
1171 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1172 goto end;
1173 }
1174 DBG("Incoming command on sock");
1175 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1176 if (ret == -ENOENT) {
1177 DBG("Received STOP command");
1178 goto end;
1179 }
1180 if (ret < 0) {
1181 ERR("Communication interrupted on command socket");
1182 goto end;
1183 }
1184 if (consumer_quit) {
1185 DBG("consumer_thread_receive_fds received quit from signal");
1186 goto end;
1187 }
1188 DBG("received fds on sock");
1189 }
1190 end:
1191 DBG("consumer_thread_receive_fds exiting");
1192
1193 /*
1194 * when all fds have hung up, the polling thread
1195 * can exit cleanly
1196 */
1197 consumer_quit = 1;
1198
1199 /*
1200 * 2s of grace period, if no polling events occur during
1201 * this period, the polling thread will exit even if there
1202 * are still open FDs (should not happen, but safety mechanism).
1203 */
1204 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1205
1206 /* wake up the polling thread */
1207 ret = write(ctx->consumer_poll_pipe[1], "4", 1);
1208 if (ret < 0) {
1209 perror("poll pipe write");
1210 }
1211 rcu_unregister_thread();
1212 return NULL;
1213 }
1214
1215 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
1216 struct lttng_consumer_local_data *ctx)
1217 {
1218 switch (consumer_data.type) {
1219 case LTTNG_CONSUMER_KERNEL:
1220 return lttng_kconsumer_read_subbuffer(stream, ctx);
1221 case LTTNG_CONSUMER32_UST:
1222 case LTTNG_CONSUMER64_UST:
1223 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1224 default:
1225 ERR("Unknown consumer_data type");
1226 assert(0);
1227 return -ENOSYS;
1228 }
1229 }
1230
1231 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1232 {
1233 switch (consumer_data.type) {
1234 case LTTNG_CONSUMER_KERNEL:
1235 return lttng_kconsumer_on_recv_stream(stream);
1236 case LTTNG_CONSUMER32_UST:
1237 case LTTNG_CONSUMER64_UST:
1238 return lttng_ustconsumer_on_recv_stream(stream);
1239 default:
1240 ERR("Unknown consumer_data type");
1241 assert(0);
1242 return -ENOSYS;
1243 }
1244 }
1245
/*
 * Allocate and set consumer data hash tables.
 *
 * NOTE(review): lttng_ht_new() return values are not checked here; an
 * allocation failure would surface later as a NULL hash table
 * dereference — confirm whether callers can tolerate that.
 */
void lttng_consumer_init(void)
{
	consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
	consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
1254
This page took 0.084655 seconds and 4 git commands to generate.