Add lttng hash table support to liblttng-consumer
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <fcntl.h>
23 #include <poll.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/mman.h>
28 #include <sys/socket.h>
29 #include <sys/types.h>
30 #include <unistd.h>
31
32 #include <lttng-kernel-ctl.h>
33 #include <lttng-sessiond-comm.h>
34 #include <lttng/lttng-consumer.h>
35 #include <lttng/lttng-kconsumer.h>
36 #include <lttng/lttng-ustconsumer.h>
37 #include <lttngerr.h>
38
39 struct lttng_consumer_global_data consumer_data = {
40 .stream_count = 0,
41 .need_update = 1,
42 .type = LTTNG_CONSUMER_UNKNOWN,
43 };
44
45 /* timeout parameter, to control the polling thread grace period. */
46 int consumer_poll_timeout = -1;
47
48 /*
49 * Flag to inform the polling thread to quit when all fd hung up. Updated by
50 * the consumer_thread_receive_fds when it notices that all fds has hung up.
51 * Also updated by the signal handler (consumer_should_exit()). Read by the
52 * polling threads.
53 */
54 volatile int consumer_quit = 0;
55
56 /*
57 * Find a stream. The consumer_data.lock must be locked during this
58 * call.
59 */
60 static struct lttng_consumer_stream *consumer_find_stream(int key)
61 {
62 struct lttng_ht_iter iter;
63 struct lttng_ht_node_ulong *node;
64 struct lttng_consumer_stream *stream = NULL;
65
66 /* Negative keys are lookup failures */
67 if (key < 0)
68 return NULL;
69
70 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
71 &iter);
72 node = lttng_ht_iter_get_node_ulong(&iter);
73 if (node != NULL) {
74 stream = caa_container_of(node, struct lttng_consumer_stream, node);
75 }
76
77 return stream;
78 }
79
80 static void consumer_steal_stream_key(int key)
81 {
82 struct lttng_consumer_stream *stream;
83
84 stream = consumer_find_stream(key);
85 if (stream)
86 stream->key = -1;
87 }
88
89 static struct lttng_consumer_channel *consumer_find_channel(int key)
90 {
91 struct lttng_ht_iter iter;
92 struct lttng_ht_node_ulong *node;
93 struct lttng_consumer_channel *channel = NULL;
94
95 /* Negative keys are lookup failures */
96 if (key < 0)
97 return NULL;
98
99 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
100 &iter);
101 node = lttng_ht_iter_get_node_ulong(&iter);
102 if (node != NULL) {
103 channel = caa_container_of(node, struct lttng_consumer_channel, node);
104 }
105
106 return channel;
107 }
108
109 static void consumer_steal_channel_key(int key)
110 {
111 struct lttng_consumer_channel *channel;
112
113 channel = consumer_find_channel(key);
114 if (channel)
115 channel->key = -1;
116 }
117
118 /*
119 * Remove a stream from the global list protected by a mutex. This
120 * function is also responsible for freeing its data structures.
121 */
122 void consumer_del_stream(struct lttng_consumer_stream *stream)
123 {
124 int ret;
125 struct lttng_ht_iter iter;
126 struct lttng_consumer_channel *free_chan = NULL;
127
128 pthread_mutex_lock(&consumer_data.lock);
129
130 switch (consumer_data.type) {
131 case LTTNG_CONSUMER_KERNEL:
132 if (stream->mmap_base != NULL) {
133 ret = munmap(stream->mmap_base, stream->mmap_len);
134 if (ret != 0) {
135 perror("munmap");
136 }
137 }
138 break;
139 case LTTNG_CONSUMER32_UST:
140 case LTTNG_CONSUMER64_UST:
141 lttng_ustconsumer_del_stream(stream);
142 break;
143 default:
144 ERR("Unknown consumer_data type");
145 assert(0);
146 goto end;
147 }
148
149 /* Get stream node from hash table */
150 lttng_ht_lookup(consumer_data.stream_ht,
151 (void *)((unsigned long) stream->key), &iter);
152 /* Remove stream node from hash table */
153 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
154 assert(!ret);
155
156 if (consumer_data.stream_count <= 0) {
157 goto end;
158 }
159 consumer_data.stream_count--;
160 if (!stream) {
161 goto end;
162 }
163 if (stream->out_fd >= 0) {
164 close(stream->out_fd);
165 }
166 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
167 close(stream->wait_fd);
168 }
169 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
170 && !stream->shm_fd_is_copy) {
171 close(stream->shm_fd);
172 }
173 if (!--stream->chan->refcount)
174 free_chan = stream->chan;
175 free(stream);
176 end:
177 consumer_data.need_update = 1;
178 pthread_mutex_unlock(&consumer_data.lock);
179
180 if (free_chan)
181 consumer_del_channel(free_chan);
182 }
183
184 struct lttng_consumer_stream *consumer_allocate_stream(
185 int channel_key, int stream_key,
186 int shm_fd, int wait_fd,
187 enum lttng_consumer_stream_state state,
188 uint64_t mmap_len,
189 enum lttng_event_output output,
190 const char *path_name,
191 uid_t uid,
192 gid_t gid)
193 {
194 struct lttng_consumer_stream *stream;
195 int ret;
196
197 stream = zmalloc(sizeof(*stream));
198 if (stream == NULL) {
199 perror("malloc struct lttng_consumer_stream");
200 goto end;
201 }
202 stream->chan = consumer_find_channel(channel_key);
203 if (!stream->chan) {
204 perror("Unable to find channel key");
205 goto end;
206 }
207 stream->chan->refcount++;
208 stream->key = stream_key;
209 stream->shm_fd = shm_fd;
210 stream->wait_fd = wait_fd;
211 stream->out_fd = -1;
212 stream->out_fd_offset = 0;
213 stream->state = state;
214 stream->mmap_len = mmap_len;
215 stream->mmap_base = NULL;
216 stream->output = output;
217 stream->uid = uid;
218 stream->gid = gid;
219 strncpy(stream->path_name, path_name, PATH_MAX - 1);
220 stream->path_name[PATH_MAX - 1] = '\0';
221 lttng_ht_node_init_ulong(&stream->node, stream->key);
222
223 switch (consumer_data.type) {
224 case LTTNG_CONSUMER_KERNEL:
225 break;
226 case LTTNG_CONSUMER32_UST:
227 case LTTNG_CONSUMER64_UST:
228 stream->cpu = stream->chan->cpucount++;
229 ret = lttng_ustconsumer_allocate_stream(stream);
230 if (ret) {
231 free(stream);
232 return NULL;
233 }
234 break;
235 default:
236 ERR("Unknown consumer_data type");
237 assert(0);
238 goto end;
239 }
240 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
241 stream->path_name, stream->key,
242 stream->shm_fd,
243 stream->wait_fd,
244 (unsigned long long) stream->mmap_len,
245 stream->out_fd);
246 end:
247 return stream;
248 }
249
250 /*
251 * Add a stream to the global list protected by a mutex.
252 */
253 int consumer_add_stream(struct lttng_consumer_stream *stream)
254 {
255 int ret = 0;
256
257 pthread_mutex_lock(&consumer_data.lock);
258 /* Steal stream identifier, for UST */
259 consumer_steal_stream_key(stream->key);
260 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
261 consumer_data.stream_count++;
262 consumer_data.need_update = 1;
263
264 switch (consumer_data.type) {
265 case LTTNG_CONSUMER_KERNEL:
266 break;
267 case LTTNG_CONSUMER32_UST:
268 case LTTNG_CONSUMER64_UST:
269 /* Streams are in CPU number order (we rely on this) */
270 stream->cpu = stream->chan->nr_streams++;
271 break;
272 default:
273 ERR("Unknown consumer_data type");
274 assert(0);
275 goto end;
276 }
277
278 end:
279 pthread_mutex_unlock(&consumer_data.lock);
280 return ret;
281 }
282
283 /*
284 * Update a stream according to what we just received.
285 */
286 void consumer_change_stream_state(int stream_key,
287 enum lttng_consumer_stream_state state)
288 {
289 struct lttng_consumer_stream *stream;
290
291 pthread_mutex_lock(&consumer_data.lock);
292 stream = consumer_find_stream(stream_key);
293 if (stream) {
294 stream->state = state;
295 }
296 consumer_data.need_update = 1;
297 pthread_mutex_unlock(&consumer_data.lock);
298 }
299
300 /*
301 * Remove a channel from the global list protected by a mutex. This
302 * function is also responsible for freeing its data structures.
303 */
304 void consumer_del_channel(struct lttng_consumer_channel *channel)
305 {
306 int ret;
307 struct lttng_ht_iter iter;
308
309 pthread_mutex_lock(&consumer_data.lock);
310
311 switch (consumer_data.type) {
312 case LTTNG_CONSUMER_KERNEL:
313 break;
314 case LTTNG_CONSUMER32_UST:
315 case LTTNG_CONSUMER64_UST:
316 lttng_ustconsumer_del_channel(channel);
317 break;
318 default:
319 ERR("Unknown consumer_data type");
320 assert(0);
321 goto end;
322 }
323
324 lttng_ht_lookup(consumer_data.channel_ht,
325 (void *)((unsigned long) channel->key), &iter);
326 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
327 assert(!ret);
328
329 if (channel->mmap_base != NULL) {
330 ret = munmap(channel->mmap_base, channel->mmap_len);
331 if (ret != 0) {
332 perror("munmap");
333 }
334 }
335 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
336 close(channel->wait_fd);
337 }
338 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
339 && !channel->shm_fd_is_copy) {
340 close(channel->shm_fd);
341 }
342 free(channel);
343 end:
344 pthread_mutex_unlock(&consumer_data.lock);
345 }
346
347 struct lttng_consumer_channel *consumer_allocate_channel(
348 int channel_key,
349 int shm_fd, int wait_fd,
350 uint64_t mmap_len,
351 uint64_t max_sb_size)
352 {
353 struct lttng_consumer_channel *channel;
354 int ret;
355
356 channel = zmalloc(sizeof(*channel));
357 if (channel == NULL) {
358 perror("malloc struct lttng_consumer_channel");
359 goto end;
360 }
361 channel->key = channel_key;
362 channel->shm_fd = shm_fd;
363 channel->wait_fd = wait_fd;
364 channel->mmap_len = mmap_len;
365 channel->max_sb_size = max_sb_size;
366 channel->refcount = 0;
367 channel->nr_streams = 0;
368 lttng_ht_node_init_ulong(&channel->node, channel->key);
369
370 switch (consumer_data.type) {
371 case LTTNG_CONSUMER_KERNEL:
372 channel->mmap_base = NULL;
373 channel->mmap_len = 0;
374 break;
375 case LTTNG_CONSUMER32_UST:
376 case LTTNG_CONSUMER64_UST:
377 ret = lttng_ustconsumer_allocate_channel(channel);
378 if (ret) {
379 free(channel);
380 return NULL;
381 }
382 break;
383 default:
384 ERR("Unknown consumer_data type");
385 assert(0);
386 goto end;
387 }
388 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
389 channel->key,
390 channel->shm_fd,
391 channel->wait_fd,
392 (unsigned long long) channel->mmap_len,
393 (unsigned long long) channel->max_sb_size);
394 end:
395 return channel;
396 }
397
398 /*
399 * Add a channel to the global list protected by a mutex.
400 */
401 int consumer_add_channel(struct lttng_consumer_channel *channel)
402 {
403 pthread_mutex_lock(&consumer_data.lock);
404 /* Steal channel identifier, for UST */
405 consumer_steal_channel_key(channel->key);
406 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
407 pthread_mutex_unlock(&consumer_data.lock);
408 return 0;
409 }
410
411 /*
412 * Allocate the pollfd structure and the local view of the out fds to avoid
413 * doing a lookup in the linked list and concurrency issues when writing is
414 * needed. Called with consumer_data.lock held.
415 *
416 * Returns the number of fds in the structures.
417 */
418 int consumer_update_poll_array(
419 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
420 struct lttng_consumer_stream **local_stream)
421 {
422 int i = 0;
423 struct lttng_ht_iter iter;
424 struct lttng_consumer_stream *stream;
425
426 DBG("Updating poll fd array");
427 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
428 node.node) {
429 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
430 continue;
431 }
432 DBG("Active FD %d", stream->wait_fd);
433 (*pollfd)[i].fd = stream->wait_fd;
434 (*pollfd)[i].events = POLLIN | POLLPRI;
435 local_stream[i] = stream;
436 i++;
437 }
438
439 /*
440 * Insert the consumer_poll_pipe at the end of the array and don't
441 * increment i so nb_fd is the number of real FD.
442 */
443 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
444 (*pollfd)[i].events = POLLIN;
445 return i;
446 }
447
/*
 * Block in poll() on the two descriptors of consumer_sockpoll: entry 0 is the
 * should_quit pipe, entry 1 the command socket.
 *
 * Returns -1 when poll() fails or when the should_quit pipe is readable (the
 * caller should exit), 0 otherwise, i.e. when the wake-up came from the
 * command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	/*
	 * revents is a bit mask: test the POLLIN bit rather than comparing for
	 * equality, otherwise a quit request is missed whenever another flag
	 * (e.g. POLLHUP) is reported together with POLLIN.
	 */
	if (consumer_sockpoll[0].revents & POLLIN) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
470
471 /*
472 * Set the error socket.
473 */
474 void lttng_consumer_set_error_sock(
475 struct lttng_consumer_local_data *ctx, int sock)
476 {
477 ctx->consumer_error_socket = sock;
478 }
479
480 /*
481 * Set the command socket path.
482 */
483
484 void lttng_consumer_set_command_sock_path(
485 struct lttng_consumer_local_data *ctx, char *sock)
486 {
487 ctx->consumer_command_sock_path = sock;
488 }
489
490 /*
491 * Send return code to the session daemon.
492 * If the socket is not defined, we return 0, it is not a fatal error
493 */
494 int lttng_consumer_send_error(
495 struct lttng_consumer_local_data *ctx, int cmd)
496 {
497 if (ctx->consumer_error_socket > 0) {
498 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
499 sizeof(enum lttcomm_sessiond_command));
500 }
501
502 return 0;
503 }
504
505 /*
506 * Close all the tracefiles and stream fds, should be called when all instances
507 * are destroyed.
508 */
509 void lttng_consumer_cleanup(void)
510 {
511 int ret;
512 struct lttng_ht_iter iter;
513 struct lttng_consumer_stream *stream;
514 struct lttng_consumer_channel *channel;
515
516 /*
517 * close all outfd. Called when there are no more threads
518 * running (after joining on the threads), no need to protect
519 * list iteration with mutex.
520 */
521 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
522 node.node) {
523 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
524 assert(!ret);
525 consumer_del_stream(stream);
526 }
527
528 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
529 node.node) {
530 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
531 assert(!ret);
532 consumer_del_channel(channel);
533 }
534 }
535
536 /*
537 * Called from signal handler.
538 */
539 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
540 {
541 int ret;
542 consumer_quit = 1;
543 ret = write(ctx->consumer_should_quit[1], "4", 1);
544 if (ret < 0) {
545 perror("write consumer quit");
546 }
547 }
548
549 void lttng_consumer_sync_trace_file(
550 struct lttng_consumer_stream *stream, off_t orig_offset)
551 {
552 int outfd = stream->out_fd;
553
554 /*
555 * This does a blocking write-and-wait on any page that belongs to the
556 * subbuffer prior to the one we just wrote.
557 * Don't care about error values, as these are just hints and ways to
558 * limit the amount of page cache used.
559 */
560 if (orig_offset < stream->chan->max_sb_size) {
561 return;
562 }
563 sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
564 stream->chan->max_sb_size,
565 SYNC_FILE_RANGE_WAIT_BEFORE
566 | SYNC_FILE_RANGE_WRITE
567 | SYNC_FILE_RANGE_WAIT_AFTER);
568 /*
569 * Give hints to the kernel about how we access the file:
570 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
571 * we write it.
572 *
573 * We need to call fadvise again after the file grows because the
574 * kernel does not seem to apply fadvise to non-existing parts of the
575 * file.
576 *
577 * Call fadvise _after_ having waited for the page writeback to
578 * complete because the dirty page writeback semantic is not well
579 * defined. So it can be expected to lead to lower throughput in
580 * streaming.
581 */
582 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
583 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
584 }
585
586 /*
587 * Initialise the necessary environnement :
588 * - create a new context
589 * - create the poll_pipe
590 * - create the should_quit pipe (for signal handler)
591 * - create the thread pipe (for splice)
592 *
593 * Takes a function pointer as argument, this function is called when data is
594 * available on a buffer. This function is responsible to do the
595 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
596 * buffer configuration and then kernctl_put_next_subbuf at the end.
597 *
598 * Returns a pointer to the new context or NULL on error.
599 */
600 struct lttng_consumer_local_data *lttng_consumer_create(
601 enum lttng_consumer_type type,
602 int (*buffer_ready)(struct lttng_consumer_stream *stream,
603 struct lttng_consumer_local_data *ctx),
604 int (*recv_channel)(struct lttng_consumer_channel *channel),
605 int (*recv_stream)(struct lttng_consumer_stream *stream),
606 int (*update_stream)(int stream_key, uint32_t state))
607 {
608 int ret, i;
609 struct lttng_consumer_local_data *ctx;
610
611 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
612 consumer_data.type == type);
613 consumer_data.type = type;
614
615 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
616 if (ctx == NULL) {
617 perror("allocating context");
618 goto error;
619 }
620
621 ctx->consumer_error_socket = -1;
622 /* assign the callbacks */
623 ctx->on_buffer_ready = buffer_ready;
624 ctx->on_recv_channel = recv_channel;
625 ctx->on_recv_stream = recv_stream;
626 ctx->on_update_stream = update_stream;
627
628 ret = pipe(ctx->consumer_poll_pipe);
629 if (ret < 0) {
630 perror("Error creating poll pipe");
631 goto error_poll_pipe;
632 }
633
634 ret = pipe(ctx->consumer_should_quit);
635 if (ret < 0) {
636 perror("Error creating recv pipe");
637 goto error_quit_pipe;
638 }
639
640 ret = pipe(ctx->consumer_thread_pipe);
641 if (ret < 0) {
642 perror("Error creating thread pipe");
643 goto error_thread_pipe;
644 }
645
646 return ctx;
647
648
649 error_thread_pipe:
650 for (i = 0; i < 2; i++) {
651 int err;
652
653 err = close(ctx->consumer_should_quit[i]);
654 assert(!err);
655 }
656 error_quit_pipe:
657 for (i = 0; i < 2; i++) {
658 int err;
659
660 err = close(ctx->consumer_poll_pipe[i]);
661 assert(!err);
662 }
663 error_poll_pipe:
664 free(ctx);
665 error:
666 return NULL;
667 }
668
669 /*
670 * Close all fds associated with the instance and free the context.
671 */
672 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
673 {
674 close(ctx->consumer_error_socket);
675 close(ctx->consumer_thread_pipe[0]);
676 close(ctx->consumer_thread_pipe[1]);
677 close(ctx->consumer_poll_pipe[0]);
678 close(ctx->consumer_poll_pipe[1]);
679 close(ctx->consumer_should_quit[0]);
680 close(ctx->consumer_should_quit[1]);
681 unlink(ctx->consumer_command_sock_path);
682 free(ctx);
683 }
684
685 /*
686 * Mmap the ring buffer, read it and write the data to the tracefile.
687 *
688 * Returns the number of bytes written
689 */
690 int lttng_consumer_on_read_subbuffer_mmap(
691 struct lttng_consumer_local_data *ctx,
692 struct lttng_consumer_stream *stream, unsigned long len)
693 {
694 switch (consumer_data.type) {
695 case LTTNG_CONSUMER_KERNEL:
696 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
697 case LTTNG_CONSUMER32_UST:
698 case LTTNG_CONSUMER64_UST:
699 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
700 default:
701 ERR("Unknown consumer_data type");
702 assert(0);
703 }
704 }
705
706 /*
707 * Splice the data from the ring buffer to the tracefile.
708 *
709 * Returns the number of bytes spliced.
710 */
711 int lttng_consumer_on_read_subbuffer_splice(
712 struct lttng_consumer_local_data *ctx,
713 struct lttng_consumer_stream *stream, unsigned long len)
714 {
715 switch (consumer_data.type) {
716 case LTTNG_CONSUMER_KERNEL:
717 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
718 case LTTNG_CONSUMER32_UST:
719 case LTTNG_CONSUMER64_UST:
720 return -ENOSYS;
721 default:
722 ERR("Unknown consumer_data type");
723 assert(0);
724 return -ENOSYS;
725 }
726
727 }
728
729 /*
730 * Take a snapshot for a specific fd
731 *
732 * Returns 0 on success, < 0 on error
733 */
734 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
735 struct lttng_consumer_stream *stream)
736 {
737 switch (consumer_data.type) {
738 case LTTNG_CONSUMER_KERNEL:
739 return lttng_kconsumer_take_snapshot(ctx, stream);
740 case LTTNG_CONSUMER32_UST:
741 case LTTNG_CONSUMER64_UST:
742 return lttng_ustconsumer_take_snapshot(ctx, stream);
743 default:
744 ERR("Unknown consumer_data type");
745 assert(0);
746 return -ENOSYS;
747 }
748
749 }
750
751 /*
752 * Get the produced position
753 *
754 * Returns 0 on success, < 0 on error
755 */
756 int lttng_consumer_get_produced_snapshot(
757 struct lttng_consumer_local_data *ctx,
758 struct lttng_consumer_stream *stream,
759 unsigned long *pos)
760 {
761 switch (consumer_data.type) {
762 case LTTNG_CONSUMER_KERNEL:
763 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
764 case LTTNG_CONSUMER32_UST:
765 case LTTNG_CONSUMER64_UST:
766 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
767 default:
768 ERR("Unknown consumer_data type");
769 assert(0);
770 return -ENOSYS;
771 }
772 }
773
774 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
775 int sock, struct pollfd *consumer_sockpoll)
776 {
777 switch (consumer_data.type) {
778 case LTTNG_CONSUMER_KERNEL:
779 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
780 case LTTNG_CONSUMER32_UST:
781 case LTTNG_CONSUMER64_UST:
782 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
783 default:
784 ERR("Unknown consumer_data type");
785 assert(0);
786 return -ENOSYS;
787 }
788 }
789
790 /*
791 * This thread polls the fds in the set to consume the data and write
792 * it to tracefile if necessary.
793 */
794 void *lttng_consumer_thread_poll_fds(void *data)
795 {
796 int num_rdy, num_hup, high_prio, ret, i;
797 struct pollfd *pollfd = NULL;
798 /* local view of the streams */
799 struct lttng_consumer_stream **local_stream = NULL;
800 /* local view of consumer_data.fds_count */
801 int nb_fd = 0;
802 char tmp;
803 int tmp2;
804 struct lttng_consumer_local_data *ctx = data;
805
806 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
807
808 while (1) {
809 high_prio = 0;
810 num_hup = 0;
811
812 /*
813 * the fds set has been updated, we need to update our
814 * local array as well
815 */
816 pthread_mutex_lock(&consumer_data.lock);
817 if (consumer_data.need_update) {
818 if (pollfd != NULL) {
819 free(pollfd);
820 pollfd = NULL;
821 }
822 if (local_stream != NULL) {
823 free(local_stream);
824 local_stream = NULL;
825 }
826
827 /* allocate for all fds + 1 for the consumer_poll_pipe */
828 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
829 if (pollfd == NULL) {
830 perror("pollfd malloc");
831 pthread_mutex_unlock(&consumer_data.lock);
832 goto end;
833 }
834
835 /* allocate for all fds + 1 for the consumer_poll_pipe */
836 local_stream = zmalloc((consumer_data.stream_count + 1) *
837 sizeof(struct lttng_consumer_stream));
838 if (local_stream == NULL) {
839 perror("local_stream malloc");
840 pthread_mutex_unlock(&consumer_data.lock);
841 goto end;
842 }
843 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
844 if (ret < 0) {
845 ERR("Error in allocating pollfd or local_outfds");
846 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
847 pthread_mutex_unlock(&consumer_data.lock);
848 goto end;
849 }
850 nb_fd = ret;
851 consumer_data.need_update = 0;
852 }
853 pthread_mutex_unlock(&consumer_data.lock);
854
855 /* poll on the array of fds */
856 DBG("polling on %d fd", nb_fd + 1);
857 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
858 DBG("poll num_rdy : %d", num_rdy);
859 if (num_rdy == -1) {
860 perror("Poll error");
861 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
862 goto end;
863 } else if (num_rdy == 0) {
864 DBG("Polling thread timed out");
865 goto end;
866 }
867
868 /* No FDs and consumer_quit, consumer_cleanup the thread */
869 if (nb_fd == 0 && consumer_quit == 1) {
870 goto end;
871 }
872
873 /*
874 * If the consumer_poll_pipe triggered poll go
875 * directly to the beginning of the loop to update the
876 * array. We want to prioritize array update over
877 * low-priority reads.
878 */
879 if (pollfd[nb_fd].revents & POLLIN) {
880 DBG("consumer_poll_pipe wake up");
881 tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
882 if (tmp2 < 0) {
883 perror("read consumer poll");
884 }
885 continue;
886 }
887
888 /* Take care of high priority channels first. */
889 for (i = 0; i < nb_fd; i++) {
890 if (pollfd[i].revents & POLLPRI) {
891 DBG("Urgent read on fd %d", pollfd[i].fd);
892 high_prio = 1;
893 ret = ctx->on_buffer_ready(local_stream[i], ctx);
894 /* it's ok to have an unavailable sub-buffer */
895 if (ret == EAGAIN) {
896 ret = 0;
897 }
898 } else if (pollfd[i].revents & POLLERR) {
899 ERR("Error returned in polling fd %d.", pollfd[i].fd);
900 consumer_del_stream(local_stream[i]);
901 num_hup++;
902 } else if (pollfd[i].revents & POLLNVAL) {
903 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
904 consumer_del_stream(local_stream[i]);
905 num_hup++;
906 } else if ((pollfd[i].revents & POLLHUP) &&
907 !(pollfd[i].revents & POLLIN)) {
908 if (consumer_data.type == LTTNG_CONSUMER32_UST
909 || consumer_data.type == LTTNG_CONSUMER64_UST) {
910 DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
911 pollfd[i].fd);
912 if (!local_stream[i]->hangup_flush_done) {
913 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
914 /* read after flush */
915 do {
916 ret = ctx->on_buffer_ready(local_stream[i], ctx);
917 } while (ret == EAGAIN);
918 }
919 } else {
920 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
921 }
922 consumer_del_stream(local_stream[i]);
923 num_hup++;
924 }
925 }
926
927 /* If every buffer FD has hung up, we end the read loop here */
928 if (nb_fd > 0 && num_hup == nb_fd) {
929 DBG("every buffer FD has hung up\n");
930 if (consumer_quit == 1) {
931 goto end;
932 }
933 continue;
934 }
935
936 /* Take care of low priority channels. */
937 if (high_prio == 0) {
938 for (i = 0; i < nb_fd; i++) {
939 if (pollfd[i].revents & POLLIN) {
940 DBG("Normal read on fd %d", pollfd[i].fd);
941 ret = ctx->on_buffer_ready(local_stream[i], ctx);
942 /* it's ok to have an unavailable subbuffer */
943 if (ret == EAGAIN) {
944 ret = 0;
945 }
946 }
947 }
948 }
949 }
950 end:
951 DBG("polling thread exiting");
952 if (pollfd != NULL) {
953 free(pollfd);
954 pollfd = NULL;
955 }
956 if (local_stream != NULL) {
957 free(local_stream);
958 local_stream = NULL;
959 }
960 return NULL;
961 }
962
963 /*
964 * This thread listens on the consumerd socket and receives the file
965 * descriptors from the session daemon.
966 */
967 void *lttng_consumer_thread_receive_fds(void *data)
968 {
969 int sock, client_socket, ret;
970 /*
971 * structure to poll for incoming data on communication socket avoids
972 * making blocking sockets.
973 */
974 struct pollfd consumer_sockpoll[2];
975 struct lttng_consumer_local_data *ctx = data;
976
977 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
978 unlink(ctx->consumer_command_sock_path);
979 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
980 if (client_socket < 0) {
981 ERR("Cannot create command socket");
982 goto end;
983 }
984
985 ret = lttcomm_listen_unix_sock(client_socket);
986 if (ret < 0) {
987 goto end;
988 }
989
990 DBG("Sending ready command to lttng-sessiond");
991 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
992 /* return < 0 on error, but == 0 is not fatal */
993 if (ret < 0) {
994 ERR("Error sending ready command to lttng-sessiond");
995 goto end;
996 }
997
998 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
999 if (ret < 0) {
1000 perror("fcntl O_NONBLOCK");
1001 goto end;
1002 }
1003
1004 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1005 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1006 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1007 consumer_sockpoll[1].fd = client_socket;
1008 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1009
1010 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1011 goto end;
1012 }
1013 DBG("Connection on client_socket");
1014
1015 /* Blocking call, waiting for transmission */
1016 sock = lttcomm_accept_unix_sock(client_socket);
1017 if (sock <= 0) {
1018 WARN("On accept");
1019 goto end;
1020 }
1021 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1022 if (ret < 0) {
1023 perror("fcntl O_NONBLOCK");
1024 goto end;
1025 }
1026
1027 /* update the polling structure to poll on the established socket */
1028 consumer_sockpoll[1].fd = sock;
1029 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1030
1031 while (1) {
1032 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1033 goto end;
1034 }
1035 DBG("Incoming command on sock");
1036 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1037 if (ret == -ENOENT) {
1038 DBG("Received STOP command");
1039 goto end;
1040 }
1041 if (ret < 0) {
1042 ERR("Communication interrupted on command socket");
1043 goto end;
1044 }
1045 if (consumer_quit) {
1046 DBG("consumer_thread_receive_fds received quit from signal");
1047 goto end;
1048 }
1049 DBG("received fds on sock");
1050 }
1051 end:
1052 DBG("consumer_thread_receive_fds exiting");
1053
1054 /*
1055 * when all fds have hung up, the polling thread
1056 * can exit cleanly
1057 */
1058 consumer_quit = 1;
1059
1060 /*
1061 * 2s of grace period, if no polling events occur during
1062 * this period, the polling thread will exit even if there
1063 * are still open FDs (should not happen, but safety mechanism).
1064 */
1065 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1066
1067 /* wake up the polling thread */
1068 ret = write(ctx->consumer_poll_pipe[1], "4", 1);
1069 if (ret < 0) {
1070 perror("poll pipe write");
1071 }
1072 return NULL;
1073 }
1074
1075 int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
1076 struct lttng_consumer_local_data *ctx)
1077 {
1078 switch (consumer_data.type) {
1079 case LTTNG_CONSUMER_KERNEL:
1080 return lttng_kconsumer_read_subbuffer(stream, ctx);
1081 case LTTNG_CONSUMER32_UST:
1082 case LTTNG_CONSUMER64_UST:
1083 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1084 default:
1085 ERR("Unknown consumer_data type");
1086 assert(0);
1087 return -ENOSYS;
1088 }
1089 }
1090
1091 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1092 {
1093 switch (consumer_data.type) {
1094 case LTTNG_CONSUMER_KERNEL:
1095 return lttng_kconsumer_on_recv_stream(stream);
1096 case LTTNG_CONSUMER32_UST:
1097 case LTTNG_CONSUMER64_UST:
1098 return lttng_ustconsumer_on_recv_stream(stream);
1099 default:
1100 ERR("Unknown consumer_data type");
1101 assert(0);
1102 return -ENOSYS;
1103 }
1104 }
1105
1106 /*
1107 * Allocate and set consumer data hash tables.
1108 */
1109 void lttng_consumer_init(void)
1110 {
1111 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1112 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1113 }
1114
This page took 0.085052 seconds and 4 git commands to generate.