Remove application from socket hash table upon unregister
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
22#include <fcntl.h>
23#include <poll.h>
24#include <pthread.h>
25#include <stdlib.h>
26#include <string.h>
27#include <sys/mman.h>
28#include <sys/socket.h>
29#include <sys/types.h>
30#include <unistd.h>
31
32#include <lttng-kernel-ctl.h>
33#include <lttng-sessiond-comm.h>
34#include <lttng/lttng-consumer.h>
35#include <lttng/lttng-kconsumer.h>
36#include <lttng/lttng-ustconsumer.h>
37#include <lttngerr.h>
38
39struct lttng_consumer_global_data consumer_data = {
40 .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
41 .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
42 .stream_count = 0,
43 .need_update = 1,
44 .type = LTTNG_CONSUMER_UNKNOWN,
45};
46
47/* timeout parameter, to control the polling thread grace period. */
48int consumer_poll_timeout = -1;
49
50/*
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
54 * polling threads.
55 */
56volatile int consumer_quit = 0;
57
58/*
59 * Find a stream. The consumer_data.lock must be locked during this
60 * call.
61 */
62static struct lttng_consumer_stream *consumer_find_stream(int key)
63{
64 struct lttng_consumer_stream *iter;
65
7ad0a0cb
MD
66 /* Negative keys are lookup failures */
67 if (key < 0)
68 return NULL;
3bd1e081
MD
69 cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
70 if (iter->key == key) {
71 DBG("Found stream key %d", key);
72 return iter;
73 }
74 }
75 return NULL;
76}
77
7ad0a0cb
MD
78static void consumer_steal_stream_key(int key)
79{
80 struct lttng_consumer_stream *stream;
81
82 stream = consumer_find_stream(key);
83 if (stream)
84 stream->key = -1;
85}
86
3bd1e081
MD
87static struct lttng_consumer_channel *consumer_find_channel(int key)
88{
89 struct lttng_consumer_channel *iter;
90
7ad0a0cb
MD
91 /* Negative keys are lookup failures */
92 if (key < 0)
93 return NULL;
3bd1e081
MD
94 cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
95 if (iter->key == key) {
96 DBG("Found channel key %d", key);
97 return iter;
98 }
99 }
100 return NULL;
101}
102
7ad0a0cb
MD
103static void consumer_steal_channel_key(int key)
104{
105 struct lttng_consumer_channel *channel;
106
107 channel = consumer_find_channel(key);
108 if (channel)
109 channel->key = -1;
110}
111
3bd1e081
MD
112/*
113 * Remove a stream from the global list protected by a mutex. This
114 * function is also responsible for freeing its data structures.
115 */
116void consumer_del_stream(struct lttng_consumer_stream *stream)
117{
118 int ret;
119 struct lttng_consumer_channel *free_chan = NULL;
120
121 pthread_mutex_lock(&consumer_data.lock);
122
123 switch (consumer_data.type) {
124 case LTTNG_CONSUMER_KERNEL:
125 if (stream->mmap_base != NULL) {
126 ret = munmap(stream->mmap_base, stream->mmap_len);
127 if (ret != 0) {
128 perror("munmap");
129 }
130 }
131 break;
7753dea8
MD
132 case LTTNG_CONSUMER32_UST:
133 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
134 lttng_ustconsumer_del_stream(stream);
135 break;
136 default:
137 ERR("Unknown consumer_data type");
138 assert(0);
139 goto end;
140 }
141
142 cds_list_del(&stream->list);
143 if (consumer_data.stream_count <= 0) {
144 goto end;
145 }
146 consumer_data.stream_count--;
147 if (!stream) {
148 goto end;
149 }
150 if (stream->out_fd >= 0) {
151 close(stream->out_fd);
152 }
b5c5fc29 153 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
3bd1e081
MD
154 close(stream->wait_fd);
155 }
b5c5fc29
MD
156 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
157 && !stream->shm_fd_is_copy) {
3bd1e081
MD
158 close(stream->shm_fd);
159 }
160 if (!--stream->chan->refcount)
161 free_chan = stream->chan;
162 free(stream);
163end:
164 consumer_data.need_update = 1;
165 pthread_mutex_unlock(&consumer_data.lock);
166
167 if (free_chan)
168 consumer_del_channel(free_chan);
169}
170
171struct lttng_consumer_stream *consumer_allocate_stream(
172 int channel_key, int stream_key,
173 int shm_fd, int wait_fd,
174 enum lttng_consumer_stream_state state,
175 uint64_t mmap_len,
176 enum lttng_event_output output,
6df2e2c9
MD
177 const char *path_name,
178 uid_t uid,
179 gid_t gid)
3bd1e081
MD
180{
181 struct lttng_consumer_stream *stream;
182 int ret;
183
effcf122 184 stream = zmalloc(sizeof(*stream));
3bd1e081
MD
185 if (stream == NULL) {
186 perror("malloc struct lttng_consumer_stream");
187 goto end;
188 }
189 stream->chan = consumer_find_channel(channel_key);
190 if (!stream->chan) {
191 perror("Unable to find channel key");
192 goto end;
193 }
194 stream->chan->refcount++;
195 stream->key = stream_key;
196 stream->shm_fd = shm_fd;
197 stream->wait_fd = wait_fd;
198 stream->out_fd = -1;
199 stream->out_fd_offset = 0;
200 stream->state = state;
201 stream->mmap_len = mmap_len;
202 stream->mmap_base = NULL;
203 stream->output = output;
6df2e2c9
MD
204 stream->uid = uid;
205 stream->gid = gid;
3bd1e081
MD
206 strncpy(stream->path_name, path_name, PATH_MAX - 1);
207 stream->path_name[PATH_MAX - 1] = '\0';
208
209 switch (consumer_data.type) {
210 case LTTNG_CONSUMER_KERNEL:
211 break;
7753dea8
MD
212 case LTTNG_CONSUMER32_UST:
213 case LTTNG_CONSUMER64_UST:
5af2f756 214 stream->cpu = stream->chan->cpucount++;
3bd1e081
MD
215 ret = lttng_ustconsumer_allocate_stream(stream);
216 if (ret) {
217 free(stream);
218 return NULL;
219 }
220 break;
221 default:
222 ERR("Unknown consumer_data type");
223 assert(0);
224 goto end;
225 }
226 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
227 stream->path_name, stream->key,
228 stream->shm_fd,
229 stream->wait_fd,
230 (unsigned long long) stream->mmap_len,
231 stream->out_fd);
232end:
233 return stream;
234}
235
236/*
237 * Add a stream to the global list protected by a mutex.
238 */
239int consumer_add_stream(struct lttng_consumer_stream *stream)
240{
241 int ret = 0;
242
243 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
244 /* Steal stream identifier, for UST */
245 consumer_steal_stream_key(stream->key);
3bd1e081
MD
246 cds_list_add(&stream->list, &consumer_data.stream_list.head);
247 consumer_data.stream_count++;
248 consumer_data.need_update = 1;
249
250 switch (consumer_data.type) {
251 case LTTNG_CONSUMER_KERNEL:
252 break;
7753dea8
MD
253 case LTTNG_CONSUMER32_UST:
254 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
255 /* Streams are in CPU number order (we rely on this) */
256 stream->cpu = stream->chan->nr_streams++;
257 break;
258 default:
259 ERR("Unknown consumer_data type");
260 assert(0);
261 goto end;
262 }
263
264end:
265 pthread_mutex_unlock(&consumer_data.lock);
266 return ret;
267}
268
269/*
270 * Update a stream according to what we just received.
271 */
272void consumer_change_stream_state(int stream_key,
273 enum lttng_consumer_stream_state state)
274{
275 struct lttng_consumer_stream *stream;
276
277 pthread_mutex_lock(&consumer_data.lock);
278 stream = consumer_find_stream(stream_key);
279 if (stream) {
280 stream->state = state;
281 }
282 consumer_data.need_update = 1;
283 pthread_mutex_unlock(&consumer_data.lock);
284}
285
286/*
287 * Remove a channel from the global list protected by a mutex. This
288 * function is also responsible for freeing its data structures.
289 */
290void consumer_del_channel(struct lttng_consumer_channel *channel)
291{
292 int ret;
293
294 pthread_mutex_lock(&consumer_data.lock);
295
296 switch (consumer_data.type) {
297 case LTTNG_CONSUMER_KERNEL:
298 break;
7753dea8
MD
299 case LTTNG_CONSUMER32_UST:
300 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
301 lttng_ustconsumer_del_channel(channel);
302 break;
303 default:
304 ERR("Unknown consumer_data type");
305 assert(0);
306 goto end;
307 }
308
309 cds_list_del(&channel->list);
310 if (channel->mmap_base != NULL) {
311 ret = munmap(channel->mmap_base, channel->mmap_len);
312 if (ret != 0) {
313 perror("munmap");
314 }
315 }
b5c5fc29 316 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
3bd1e081
MD
317 close(channel->wait_fd);
318 }
b5c5fc29
MD
319 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
320 && !channel->shm_fd_is_copy) {
3bd1e081
MD
321 close(channel->shm_fd);
322 }
323 free(channel);
324end:
325 pthread_mutex_unlock(&consumer_data.lock);
326}
327
328struct lttng_consumer_channel *consumer_allocate_channel(
329 int channel_key,
330 int shm_fd, int wait_fd,
331 uint64_t mmap_len,
332 uint64_t max_sb_size)
333{
334 struct lttng_consumer_channel *channel;
335 int ret;
336
276b26d1 337 channel = zmalloc(sizeof(*channel));
3bd1e081
MD
338 if (channel == NULL) {
339 perror("malloc struct lttng_consumer_channel");
340 goto end;
341 }
342 channel->key = channel_key;
343 channel->shm_fd = shm_fd;
344 channel->wait_fd = wait_fd;
345 channel->mmap_len = mmap_len;
346 channel->max_sb_size = max_sb_size;
347 channel->refcount = 0;
348 channel->nr_streams = 0;
349
350 switch (consumer_data.type) {
351 case LTTNG_CONSUMER_KERNEL:
352 channel->mmap_base = NULL;
353 channel->mmap_len = 0;
354 break;
7753dea8
MD
355 case LTTNG_CONSUMER32_UST:
356 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
357 ret = lttng_ustconsumer_allocate_channel(channel);
358 if (ret) {
359 free(channel);
360 return NULL;
361 }
362 break;
363 default:
364 ERR("Unknown consumer_data type");
365 assert(0);
366 goto end;
367 }
368 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
369 channel->key,
370 channel->shm_fd,
371 channel->wait_fd,
372 (unsigned long long) channel->mmap_len,
373 (unsigned long long) channel->max_sb_size);
374end:
375 return channel;
376}
377
378/*
379 * Add a channel to the global list protected by a mutex.
380 */
381int consumer_add_channel(struct lttng_consumer_channel *channel)
382{
3bd1e081 383 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
384 /* Steal channel identifier, for UST */
385 consumer_steal_channel_key(channel->key);
3bd1e081 386 cds_list_add(&channel->list, &consumer_data.channel_list.head);
3bd1e081 387 pthread_mutex_unlock(&consumer_data.lock);
7ad0a0cb 388 return 0;
3bd1e081
MD
389}
390
391/*
392 * Allocate the pollfd structure and the local view of the out fds to avoid
393 * doing a lookup in the linked list and concurrency issues when writing is
394 * needed. Called with consumer_data.lock held.
395 *
396 * Returns the number of fds in the structures.
397 */
398int consumer_update_poll_array(
399 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
400 struct lttng_consumer_stream **local_stream)
401{
402 struct lttng_consumer_stream *iter;
403 int i = 0;
404
405 DBG("Updating poll fd array");
406 cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
407 if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
408 continue;
409 }
410 DBG("Active FD %d", iter->wait_fd);
411 (*pollfd)[i].fd = iter->wait_fd;
412 (*pollfd)[i].events = POLLIN | POLLPRI;
413 local_stream[i] = iter;
414 i++;
415 }
416
417 /*
418 * Insert the consumer_poll_pipe at the end of the array and don't
419 * increment i so nb_fd is the number of real FD.
420 */
421 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
422 (*pollfd)[i].events = POLLIN;
423 return i;
424}
425
426/*
427 * Poll on the should_quit pipe and the command socket return -1 on error and
428 * should exit, 0 if data is available on the command socket
429 */
430int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
431{
432 int num_rdy;
433
434 num_rdy = poll(consumer_sockpoll, 2, -1);
435 if (num_rdy == -1) {
436 perror("Poll error");
437 goto exit;
438 }
439 if (consumer_sockpoll[0].revents == POLLIN) {
440 DBG("consumer_should_quit wake up");
441 goto exit;
442 }
443 return 0;
444
445exit:
446 return -1;
447}
448
449/*
450 * Set the error socket.
451 */
452void lttng_consumer_set_error_sock(
453 struct lttng_consumer_local_data *ctx, int sock)
454{
455 ctx->consumer_error_socket = sock;
456}
457
458/*
459 * Set the command socket path.
460 */
461
462void lttng_consumer_set_command_sock_path(
463 struct lttng_consumer_local_data *ctx, char *sock)
464{
465 ctx->consumer_command_sock_path = sock;
466}
467
468/*
469 * Send return code to the session daemon.
470 * If the socket is not defined, we return 0, it is not a fatal error
471 */
472int lttng_consumer_send_error(
473 struct lttng_consumer_local_data *ctx, int cmd)
474{
475 if (ctx->consumer_error_socket > 0) {
476 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
477 sizeof(enum lttcomm_sessiond_command));
478 }
479
480 return 0;
481}
482
483/*
484 * Close all the tracefiles and stream fds, should be called when all instances
485 * are destroyed.
486 */
487void lttng_consumer_cleanup(void)
488{
489 struct lttng_consumer_stream *iter, *tmp;
490 struct lttng_consumer_channel *citer, *ctmp;
491
492 /*
493 * close all outfd. Called when there are no more threads
494 * running (after joining on the threads), no need to protect
495 * list iteration with mutex.
496 */
497 cds_list_for_each_entry_safe(iter, tmp,
498 &consumer_data.stream_list.head, list) {
499 consumer_del_stream(iter);
500 }
501 cds_list_for_each_entry_safe(citer, ctmp,
502 &consumer_data.channel_list.head, list) {
503 consumer_del_channel(citer);
504 }
505}
506
507/*
508 * Called from signal handler.
509 */
510void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
511{
512 int ret;
513 consumer_quit = 1;
514 ret = write(ctx->consumer_should_quit[1], "4", 1);
515 if (ret < 0) {
516 perror("write consumer quit");
517 }
518}
519
520void lttng_consumer_sync_trace_file(
521 struct lttng_consumer_stream *stream, off_t orig_offset)
522{
523 int outfd = stream->out_fd;
524
525 /*
526 * This does a blocking write-and-wait on any page that belongs to the
527 * subbuffer prior to the one we just wrote.
528 * Don't care about error values, as these are just hints and ways to
529 * limit the amount of page cache used.
530 */
531 if (orig_offset < stream->chan->max_sb_size) {
532 return;
533 }
534 sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
535 stream->chan->max_sb_size,
536 SYNC_FILE_RANGE_WAIT_BEFORE
537 | SYNC_FILE_RANGE_WRITE
538 | SYNC_FILE_RANGE_WAIT_AFTER);
539 /*
540 * Give hints to the kernel about how we access the file:
541 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
542 * we write it.
543 *
544 * We need to call fadvise again after the file grows because the
545 * kernel does not seem to apply fadvise to non-existing parts of the
546 * file.
547 *
548 * Call fadvise _after_ having waited for the page writeback to
549 * complete because the dirty page writeback semantic is not well
550 * defined. So it can be expected to lead to lower throughput in
551 * streaming.
552 */
553 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
554 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
555}
556
557/*
558 * Initialise the necessary environnement :
559 * - create a new context
560 * - create the poll_pipe
561 * - create the should_quit pipe (for signal handler)
562 * - create the thread pipe (for splice)
563 *
564 * Takes a function pointer as argument, this function is called when data is
565 * available on a buffer. This function is responsible to do the
566 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
567 * buffer configuration and then kernctl_put_next_subbuf at the end.
568 *
569 * Returns a pointer to the new context or NULL on error.
570 */
571struct lttng_consumer_local_data *lttng_consumer_create(
572 enum lttng_consumer_type type,
d41f73b7
MD
573 int (*buffer_ready)(struct lttng_consumer_stream *stream,
574 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
575 int (*recv_channel)(struct lttng_consumer_channel *channel),
576 int (*recv_stream)(struct lttng_consumer_stream *stream),
577 int (*update_stream)(int stream_key, uint32_t state))
578{
579 int ret, i;
580 struct lttng_consumer_local_data *ctx;
581
582 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
583 consumer_data.type == type);
584 consumer_data.type = type;
585
effcf122 586 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081
MD
587 if (ctx == NULL) {
588 perror("allocating context");
589 goto error;
590 }
591
592 ctx->consumer_error_socket = -1;
593 /* assign the callbacks */
594 ctx->on_buffer_ready = buffer_ready;
595 ctx->on_recv_channel = recv_channel;
596 ctx->on_recv_stream = recv_stream;
597 ctx->on_update_stream = update_stream;
598
599 ret = pipe(ctx->consumer_poll_pipe);
600 if (ret < 0) {
601 perror("Error creating poll pipe");
602 goto error_poll_pipe;
603 }
604
605 ret = pipe(ctx->consumer_should_quit);
606 if (ret < 0) {
607 perror("Error creating recv pipe");
608 goto error_quit_pipe;
609 }
610
611 ret = pipe(ctx->consumer_thread_pipe);
612 if (ret < 0) {
613 perror("Error creating thread pipe");
614 goto error_thread_pipe;
615 }
616
617 return ctx;
618
619
620error_thread_pipe:
621 for (i = 0; i < 2; i++) {
622 int err;
623
624 err = close(ctx->consumer_should_quit[i]);
625 assert(!err);
626 }
627error_quit_pipe:
628 for (i = 0; i < 2; i++) {
629 int err;
630
631 err = close(ctx->consumer_poll_pipe[i]);
632 assert(!err);
633 }
634error_poll_pipe:
635 free(ctx);
636error:
637 return NULL;
638}
639
640/*
641 * Close all fds associated with the instance and free the context.
642 */
643void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
644{
645 close(ctx->consumer_error_socket);
646 close(ctx->consumer_thread_pipe[0]);
647 close(ctx->consumer_thread_pipe[1]);
648 close(ctx->consumer_poll_pipe[0]);
649 close(ctx->consumer_poll_pipe[1]);
650 close(ctx->consumer_should_quit[0]);
651 close(ctx->consumer_should_quit[1]);
652 unlink(ctx->consumer_command_sock_path);
653 free(ctx);
654}
655
656/*
657 * Mmap the ring buffer, read it and write the data to the tracefile.
658 *
659 * Returns the number of bytes written
660 */
661int lttng_consumer_on_read_subbuffer_mmap(
662 struct lttng_consumer_local_data *ctx,
663 struct lttng_consumer_stream *stream, unsigned long len)
664{
665 switch (consumer_data.type) {
666 case LTTNG_CONSUMER_KERNEL:
667 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
7753dea8
MD
668 case LTTNG_CONSUMER32_UST:
669 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
670 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
671 default:
672 ERR("Unknown consumer_data type");
673 assert(0);
674 }
675}
676
677/*
678 * Splice the data from the ring buffer to the tracefile.
679 *
680 * Returns the number of bytes spliced.
681 */
682int lttng_consumer_on_read_subbuffer_splice(
683 struct lttng_consumer_local_data *ctx,
684 struct lttng_consumer_stream *stream, unsigned long len)
685{
686 switch (consumer_data.type) {
687 case LTTNG_CONSUMER_KERNEL:
688 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
7753dea8
MD
689 case LTTNG_CONSUMER32_UST:
690 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
691 return -ENOSYS;
692 default:
693 ERR("Unknown consumer_data type");
694 assert(0);
695 return -ENOSYS;
696 }
697
698}
699
700/*
701 * Take a snapshot for a specific fd
702 *
703 * Returns 0 on success, < 0 on error
704 */
705int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
706 struct lttng_consumer_stream *stream)
707{
708 switch (consumer_data.type) {
709 case LTTNG_CONSUMER_KERNEL:
710 return lttng_kconsumer_take_snapshot(ctx, stream);
7753dea8
MD
711 case LTTNG_CONSUMER32_UST:
712 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
713 return lttng_ustconsumer_take_snapshot(ctx, stream);
714 default:
715 ERR("Unknown consumer_data type");
716 assert(0);
717 return -ENOSYS;
718 }
719
720}
721
722/*
723 * Get the produced position
724 *
725 * Returns 0 on success, < 0 on error
726 */
727int lttng_consumer_get_produced_snapshot(
728 struct lttng_consumer_local_data *ctx,
729 struct lttng_consumer_stream *stream,
730 unsigned long *pos)
731{
732 switch (consumer_data.type) {
733 case LTTNG_CONSUMER_KERNEL:
734 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
7753dea8
MD
735 case LTTNG_CONSUMER32_UST:
736 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
737 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
738 default:
739 ERR("Unknown consumer_data type");
740 assert(0);
741 return -ENOSYS;
742 }
743}
744
745int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
746 int sock, struct pollfd *consumer_sockpoll)
747{
748 switch (consumer_data.type) {
749 case LTTNG_CONSUMER_KERNEL:
750 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
751 case LTTNG_CONSUMER32_UST:
752 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
753 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
754 default:
755 ERR("Unknown consumer_data type");
756 assert(0);
757 return -ENOSYS;
758 }
759}
760
761/*
762 * This thread polls the fds in the ltt_fd_list to consume the data and write
763 * it to tracefile if necessary.
764 */
765void *lttng_consumer_thread_poll_fds(void *data)
766{
767 int num_rdy, num_hup, high_prio, ret, i;
768 struct pollfd *pollfd = NULL;
769 /* local view of the streams */
770 struct lttng_consumer_stream **local_stream = NULL;
771 /* local view of consumer_data.fds_count */
772 int nb_fd = 0;
773 char tmp;
774 int tmp2;
775 struct lttng_consumer_local_data *ctx = data;
776
effcf122 777 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
778
779 while (1) {
780 high_prio = 0;
781 num_hup = 0;
782
783 /*
784 * the ltt_fd_list has been updated, we need to update our
785 * local array as well
786 */
787 pthread_mutex_lock(&consumer_data.lock);
788 if (consumer_data.need_update) {
789 if (pollfd != NULL) {
790 free(pollfd);
791 pollfd = NULL;
792 }
793 if (local_stream != NULL) {
794 free(local_stream);
795 local_stream = NULL;
796 }
797
798 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 799 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081
MD
800 if (pollfd == NULL) {
801 perror("pollfd malloc");
802 pthread_mutex_unlock(&consumer_data.lock);
803 goto end;
804 }
805
806 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 807 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
808 sizeof(struct lttng_consumer_stream));
809 if (local_stream == NULL) {
810 perror("local_stream malloc");
811 pthread_mutex_unlock(&consumer_data.lock);
812 goto end;
813 }
814 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
815 if (ret < 0) {
816 ERR("Error in allocating pollfd or local_outfds");
817 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
818 pthread_mutex_unlock(&consumer_data.lock);
819 goto end;
820 }
821 nb_fd = ret;
822 consumer_data.need_update = 0;
823 }
824 pthread_mutex_unlock(&consumer_data.lock);
825
826 /* poll on the array of fds */
827 DBG("polling on %d fd", nb_fd + 1);
828 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
829 DBG("poll num_rdy : %d", num_rdy);
830 if (num_rdy == -1) {
831 perror("Poll error");
832 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
833 goto end;
834 } else if (num_rdy == 0) {
835 DBG("Polling thread timed out");
836 goto end;
837 }
838
d41f73b7 839 /* No FDs and consumer_quit, consumer_cleanup the thread */
3bd1e081
MD
840 if (nb_fd == 0 && consumer_quit == 1) {
841 goto end;
842 }
843
844 /*
845 * If the consumer_poll_pipe triggered poll go
846 * directly to the beginning of the loop to update the
847 * array. We want to prioritize array update over
848 * low-priority reads.
849 */
d41f73b7 850 if (pollfd[nb_fd].revents & POLLIN) {
3bd1e081
MD
851 DBG("consumer_poll_pipe wake up");
852 tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
853 if (tmp2 < 0) {
d41f73b7 854 perror("read consumer poll");
3bd1e081
MD
855 }
856 continue;
857 }
858
859 /* Take care of high priority channels first. */
860 for (i = 0; i < nb_fd; i++) {
d41f73b7
MD
861 if (pollfd[i].revents & POLLPRI) {
862 DBG("Urgent read on fd %d", pollfd[i].fd);
863 high_prio = 1;
864 ret = ctx->on_buffer_ready(local_stream[i], ctx);
865 /* it's ok to have an unavailable sub-buffer */
866 if (ret == EAGAIN) {
867 ret = 0;
868 }
869 } else if (pollfd[i].revents & POLLERR) {
3bd1e081
MD
870 ERR("Error returned in polling fd %d.", pollfd[i].fd);
871 consumer_del_stream(local_stream[i]);
872 num_hup++;
d41f73b7
MD
873 } else if (pollfd[i].revents & POLLNVAL) {
874 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
3bd1e081
MD
875 consumer_del_stream(local_stream[i]);
876 num_hup++;
1c3c14ac
MD
877 } else if ((pollfd[i].revents & POLLHUP) &&
878 !(pollfd[i].revents & POLLIN)) {
7753dea8
MD
879 if (consumer_data.type == LTTNG_CONSUMER32_UST
880 || consumer_data.type == LTTNG_CONSUMER64_UST) {
d056b477
MD
881 DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
882 pollfd[i].fd);
883 if (!local_stream[i]->hangup_flush_done) {
884 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
effcf122
MD
885 /* read after flush */
886 do {
887 ret = ctx->on_buffer_ready(local_stream[i], ctx);
888 } while (ret == EAGAIN);
d056b477
MD
889 }
890 } else {
891 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
892 }
3bd1e081
MD
893 consumer_del_stream(local_stream[i]);
894 num_hup++;
3bd1e081
MD
895 }
896 }
897
898 /* If every buffer FD has hung up, we end the read loop here */
899 if (nb_fd > 0 && num_hup == nb_fd) {
900 DBG("every buffer FD has hung up\n");
901 if (consumer_quit == 1) {
902 goto end;
903 }
904 continue;
905 }
906
907 /* Take care of low priority channels. */
908 if (high_prio == 0) {
909 for (i = 0; i < nb_fd; i++) {
d41f73b7 910 if (pollfd[i].revents & POLLIN) {
3bd1e081 911 DBG("Normal read on fd %d", pollfd[i].fd);
d41f73b7 912 ret = ctx->on_buffer_ready(local_stream[i], ctx);
3bd1e081
MD
913 /* it's ok to have an unavailable subbuffer */
914 if (ret == EAGAIN) {
915 ret = 0;
916 }
917 }
918 }
919 }
920 }
921end:
922 DBG("polling thread exiting");
923 if (pollfd != NULL) {
924 free(pollfd);
925 pollfd = NULL;
926 }
927 if (local_stream != NULL) {
928 free(local_stream);
929 local_stream = NULL;
930 }
931 return NULL;
932}
933
934/*
935 * This thread listens on the consumerd socket and receives the file
936 * descriptors from the session daemon.
937 */
938void *lttng_consumer_thread_receive_fds(void *data)
939{
940 int sock, client_socket, ret;
941 /*
942 * structure to poll for incoming data on communication socket avoids
943 * making blocking sockets.
944 */
945 struct pollfd consumer_sockpoll[2];
946 struct lttng_consumer_local_data *ctx = data;
947
948 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
949 unlink(ctx->consumer_command_sock_path);
950 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
951 if (client_socket < 0) {
952 ERR("Cannot create command socket");
953 goto end;
954 }
955
956 ret = lttcomm_listen_unix_sock(client_socket);
957 if (ret < 0) {
958 goto end;
959 }
960
32258573 961 DBG("Sending ready command to lttng-sessiond");
3bd1e081
MD
962 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
963 /* return < 0 on error, but == 0 is not fatal */
964 if (ret < 0) {
32258573 965 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
966 goto end;
967 }
968
969 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
970 if (ret < 0) {
971 perror("fcntl O_NONBLOCK");
972 goto end;
973 }
974
975 /* prepare the FDs to poll : to client socket and the should_quit pipe */
976 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
977 consumer_sockpoll[0].events = POLLIN | POLLPRI;
978 consumer_sockpoll[1].fd = client_socket;
979 consumer_sockpoll[1].events = POLLIN | POLLPRI;
980
981 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
982 goto end;
983 }
984 DBG("Connection on client_socket");
985
986 /* Blocking call, waiting for transmission */
987 sock = lttcomm_accept_unix_sock(client_socket);
988 if (sock <= 0) {
989 WARN("On accept");
990 goto end;
991 }
992 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
993 if (ret < 0) {
994 perror("fcntl O_NONBLOCK");
995 goto end;
996 }
997
998 /* update the polling structure to poll on the established socket */
999 consumer_sockpoll[1].fd = sock;
1000 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1001
1002 while (1) {
1003 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1004 goto end;
1005 }
1006 DBG("Incoming command on sock");
1007 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1008 if (ret == -ENOENT) {
1009 DBG("Received STOP command");
1010 goto end;
1011 }
1012 if (ret < 0) {
1013 ERR("Communication interrupted on command socket");
1014 goto end;
1015 }
1016 if (consumer_quit) {
1017 DBG("consumer_thread_receive_fds received quit from signal");
1018 goto end;
1019 }
1020 DBG("received fds on sock");
1021 }
1022end:
1023 DBG("consumer_thread_receive_fds exiting");
1024
1025 /*
1026 * when all fds have hung up, the polling thread
1027 * can exit cleanly
1028 */
1029 consumer_quit = 1;
1030
1031 /*
1032 * 2s of grace period, if no polling events occur during
1033 * this period, the polling thread will exit even if there
1034 * are still open FDs (should not happen, but safety mechanism).
1035 */
1036 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1037
1038 /* wake up the polling thread */
1039 ret = write(ctx->consumer_poll_pipe[1], "4", 1);
1040 if (ret < 0) {
1041 perror("poll pipe write");
1042 }
1043 return NULL;
1044}
d41f73b7
MD
1045
1046int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
1047 struct lttng_consumer_local_data *ctx)
1048{
1049 switch (consumer_data.type) {
1050 case LTTNG_CONSUMER_KERNEL:
1051 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
1052 case LTTNG_CONSUMER32_UST:
1053 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1054 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1055 default:
1056 ERR("Unknown consumer_data type");
1057 assert(0);
1058 return -ENOSYS;
1059 }
1060}
1061
1062int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1063{
1064 switch (consumer_data.type) {
1065 case LTTNG_CONSUMER_KERNEL:
1066 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
1067 case LTTNG_CONSUMER32_UST:
1068 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1069 return lttng_ustconsumer_on_recv_stream(stream);
1070 default:
1071 ERR("Unknown consumer_data type");
1072 assert(0);
1073 return -ENOSYS;
1074 }
1075}
This page took 0.06396 seconds and 4 git commands to generate.