Add lttng hash table support to liblttng-consumer
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; only version 2
8 * of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 */
19
20#define _GNU_SOURCE
21#include <assert.h>
22#include <fcntl.h>
23#include <poll.h>
24#include <pthread.h>
25#include <stdlib.h>
26#include <string.h>
27#include <sys/mman.h>
28#include <sys/socket.h>
29#include <sys/types.h>
30#include <unistd.h>
31
32#include <lttng-kernel-ctl.h>
33#include <lttng-sessiond-comm.h>
34#include <lttng/lttng-consumer.h>
35#include <lttng/lttng-kconsumer.h>
36#include <lttng/lttng-ustconsumer.h>
37#include <lttngerr.h>
38
39struct lttng_consumer_global_data consumer_data = {
3bd1e081
MD
40 .stream_count = 0,
41 .need_update = 1,
42 .type = LTTNG_CONSUMER_UNKNOWN,
43};
44
45/* timeout parameter, to control the polling thread grace period. */
46int consumer_poll_timeout = -1;
47
48/*
49 * Flag to inform the polling thread to quit when all fd hung up. Updated by
50 * the consumer_thread_receive_fds when it notices that all fds has hung up.
51 * Also updated by the signal handler (consumer_should_exit()). Read by the
52 * polling threads.
53 */
54volatile int consumer_quit = 0;
55
56/*
57 * Find a stream. The consumer_data.lock must be locked during this
58 * call.
59 */
60static struct lttng_consumer_stream *consumer_find_stream(int key)
61{
e4421fec
DG
62 struct lttng_ht_iter iter;
63 struct lttng_ht_node_ulong *node;
64 struct lttng_consumer_stream *stream = NULL;
3bd1e081 65
7ad0a0cb
MD
66 /* Negative keys are lookup failures */
67 if (key < 0)
68 return NULL;
e4421fec
DG
69
70 lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
71 &iter);
72 node = lttng_ht_iter_get_node_ulong(&iter);
73 if (node != NULL) {
74 stream = caa_container_of(node, struct lttng_consumer_stream, node);
3bd1e081 75 }
e4421fec
DG
76
77 return stream;
3bd1e081
MD
78}
79
7ad0a0cb
MD
80static void consumer_steal_stream_key(int key)
81{
82 struct lttng_consumer_stream *stream;
83
84 stream = consumer_find_stream(key);
85 if (stream)
86 stream->key = -1;
87}
88
3bd1e081
MD
89static struct lttng_consumer_channel *consumer_find_channel(int key)
90{
e4421fec
DG
91 struct lttng_ht_iter iter;
92 struct lttng_ht_node_ulong *node;
93 struct lttng_consumer_channel *channel = NULL;
3bd1e081 94
7ad0a0cb
MD
95 /* Negative keys are lookup failures */
96 if (key < 0)
97 return NULL;
e4421fec
DG
98
99 lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
100 &iter);
101 node = lttng_ht_iter_get_node_ulong(&iter);
102 if (node != NULL) {
103 channel = caa_container_of(node, struct lttng_consumer_channel, node);
3bd1e081 104 }
e4421fec
DG
105
106 return channel;
3bd1e081
MD
107}
108
7ad0a0cb
MD
109static void consumer_steal_channel_key(int key)
110{
111 struct lttng_consumer_channel *channel;
112
113 channel = consumer_find_channel(key);
114 if (channel)
115 channel->key = -1;
116}
117
3bd1e081
MD
118/*
119 * Remove a stream from the global list protected by a mutex. This
120 * function is also responsible for freeing its data structures.
121 */
122void consumer_del_stream(struct lttng_consumer_stream *stream)
123{
124 int ret;
e4421fec 125 struct lttng_ht_iter iter;
3bd1e081
MD
126 struct lttng_consumer_channel *free_chan = NULL;
127
128 pthread_mutex_lock(&consumer_data.lock);
129
130 switch (consumer_data.type) {
131 case LTTNG_CONSUMER_KERNEL:
132 if (stream->mmap_base != NULL) {
133 ret = munmap(stream->mmap_base, stream->mmap_len);
134 if (ret != 0) {
135 perror("munmap");
136 }
137 }
138 break;
7753dea8
MD
139 case LTTNG_CONSUMER32_UST:
140 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
141 lttng_ustconsumer_del_stream(stream);
142 break;
143 default:
144 ERR("Unknown consumer_data type");
145 assert(0);
146 goto end;
147 }
148
e4421fec
DG
149 /* Get stream node from hash table */
150 lttng_ht_lookup(consumer_data.stream_ht,
151 (void *)((unsigned long) stream->key), &iter);
152 /* Remove stream node from hash table */
153 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
154 assert(!ret);
155
3bd1e081
MD
156 if (consumer_data.stream_count <= 0) {
157 goto end;
158 }
159 consumer_data.stream_count--;
160 if (!stream) {
161 goto end;
162 }
163 if (stream->out_fd >= 0) {
164 close(stream->out_fd);
165 }
b5c5fc29 166 if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
3bd1e081
MD
167 close(stream->wait_fd);
168 }
b5c5fc29
MD
169 if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
170 && !stream->shm_fd_is_copy) {
3bd1e081
MD
171 close(stream->shm_fd);
172 }
173 if (!--stream->chan->refcount)
174 free_chan = stream->chan;
175 free(stream);
176end:
177 consumer_data.need_update = 1;
178 pthread_mutex_unlock(&consumer_data.lock);
179
180 if (free_chan)
181 consumer_del_channel(free_chan);
182}
183
184struct lttng_consumer_stream *consumer_allocate_stream(
185 int channel_key, int stream_key,
186 int shm_fd, int wait_fd,
187 enum lttng_consumer_stream_state state,
188 uint64_t mmap_len,
189 enum lttng_event_output output,
6df2e2c9
MD
190 const char *path_name,
191 uid_t uid,
192 gid_t gid)
3bd1e081
MD
193{
194 struct lttng_consumer_stream *stream;
195 int ret;
196
effcf122 197 stream = zmalloc(sizeof(*stream));
3bd1e081
MD
198 if (stream == NULL) {
199 perror("malloc struct lttng_consumer_stream");
200 goto end;
201 }
202 stream->chan = consumer_find_channel(channel_key);
203 if (!stream->chan) {
204 perror("Unable to find channel key");
205 goto end;
206 }
207 stream->chan->refcount++;
208 stream->key = stream_key;
209 stream->shm_fd = shm_fd;
210 stream->wait_fd = wait_fd;
211 stream->out_fd = -1;
212 stream->out_fd_offset = 0;
213 stream->state = state;
214 stream->mmap_len = mmap_len;
215 stream->mmap_base = NULL;
216 stream->output = output;
6df2e2c9
MD
217 stream->uid = uid;
218 stream->gid = gid;
3bd1e081
MD
219 strncpy(stream->path_name, path_name, PATH_MAX - 1);
220 stream->path_name[PATH_MAX - 1] = '\0';
e4421fec 221 lttng_ht_node_init_ulong(&stream->node, stream->key);
3bd1e081
MD
222
223 switch (consumer_data.type) {
224 case LTTNG_CONSUMER_KERNEL:
225 break;
7753dea8
MD
226 case LTTNG_CONSUMER32_UST:
227 case LTTNG_CONSUMER64_UST:
5af2f756 228 stream->cpu = stream->chan->cpucount++;
3bd1e081
MD
229 ret = lttng_ustconsumer_allocate_stream(stream);
230 if (ret) {
231 free(stream);
232 return NULL;
233 }
234 break;
235 default:
236 ERR("Unknown consumer_data type");
237 assert(0);
238 goto end;
239 }
240 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
241 stream->path_name, stream->key,
242 stream->shm_fd,
243 stream->wait_fd,
244 (unsigned long long) stream->mmap_len,
245 stream->out_fd);
246end:
247 return stream;
248}
249
250/*
251 * Add a stream to the global list protected by a mutex.
252 */
253int consumer_add_stream(struct lttng_consumer_stream *stream)
254{
255 int ret = 0;
256
257 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
258 /* Steal stream identifier, for UST */
259 consumer_steal_stream_key(stream->key);
e4421fec 260 lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
3bd1e081
MD
261 consumer_data.stream_count++;
262 consumer_data.need_update = 1;
263
264 switch (consumer_data.type) {
265 case LTTNG_CONSUMER_KERNEL:
266 break;
7753dea8
MD
267 case LTTNG_CONSUMER32_UST:
268 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
269 /* Streams are in CPU number order (we rely on this) */
270 stream->cpu = stream->chan->nr_streams++;
271 break;
272 default:
273 ERR("Unknown consumer_data type");
274 assert(0);
275 goto end;
276 }
277
278end:
279 pthread_mutex_unlock(&consumer_data.lock);
280 return ret;
281}
282
283/*
284 * Update a stream according to what we just received.
285 */
286void consumer_change_stream_state(int stream_key,
287 enum lttng_consumer_stream_state state)
288{
289 struct lttng_consumer_stream *stream;
290
291 pthread_mutex_lock(&consumer_data.lock);
292 stream = consumer_find_stream(stream_key);
293 if (stream) {
294 stream->state = state;
295 }
296 consumer_data.need_update = 1;
297 pthread_mutex_unlock(&consumer_data.lock);
298}
299
300/*
301 * Remove a channel from the global list protected by a mutex. This
302 * function is also responsible for freeing its data structures.
303 */
304void consumer_del_channel(struct lttng_consumer_channel *channel)
305{
306 int ret;
e4421fec 307 struct lttng_ht_iter iter;
3bd1e081
MD
308
309 pthread_mutex_lock(&consumer_data.lock);
310
311 switch (consumer_data.type) {
312 case LTTNG_CONSUMER_KERNEL:
313 break;
7753dea8
MD
314 case LTTNG_CONSUMER32_UST:
315 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
316 lttng_ustconsumer_del_channel(channel);
317 break;
318 default:
319 ERR("Unknown consumer_data type");
320 assert(0);
321 goto end;
322 }
323
e4421fec
DG
324 lttng_ht_lookup(consumer_data.channel_ht,
325 (void *)((unsigned long) channel->key), &iter);
326 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
327 assert(!ret);
328
3bd1e081
MD
329 if (channel->mmap_base != NULL) {
330 ret = munmap(channel->mmap_base, channel->mmap_len);
331 if (ret != 0) {
332 perror("munmap");
333 }
334 }
b5c5fc29 335 if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
3bd1e081
MD
336 close(channel->wait_fd);
337 }
b5c5fc29
MD
338 if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
339 && !channel->shm_fd_is_copy) {
3bd1e081
MD
340 close(channel->shm_fd);
341 }
342 free(channel);
343end:
344 pthread_mutex_unlock(&consumer_data.lock);
345}
346
347struct lttng_consumer_channel *consumer_allocate_channel(
348 int channel_key,
349 int shm_fd, int wait_fd,
350 uint64_t mmap_len,
351 uint64_t max_sb_size)
352{
353 struct lttng_consumer_channel *channel;
354 int ret;
355
276b26d1 356 channel = zmalloc(sizeof(*channel));
3bd1e081
MD
357 if (channel == NULL) {
358 perror("malloc struct lttng_consumer_channel");
359 goto end;
360 }
361 channel->key = channel_key;
362 channel->shm_fd = shm_fd;
363 channel->wait_fd = wait_fd;
364 channel->mmap_len = mmap_len;
365 channel->max_sb_size = max_sb_size;
366 channel->refcount = 0;
367 channel->nr_streams = 0;
e4421fec 368 lttng_ht_node_init_ulong(&channel->node, channel->key);
3bd1e081
MD
369
370 switch (consumer_data.type) {
371 case LTTNG_CONSUMER_KERNEL:
372 channel->mmap_base = NULL;
373 channel->mmap_len = 0;
374 break;
7753dea8
MD
375 case LTTNG_CONSUMER32_UST:
376 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
377 ret = lttng_ustconsumer_allocate_channel(channel);
378 if (ret) {
379 free(channel);
380 return NULL;
381 }
382 break;
383 default:
384 ERR("Unknown consumer_data type");
385 assert(0);
386 goto end;
387 }
388 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
389 channel->key,
390 channel->shm_fd,
391 channel->wait_fd,
392 (unsigned long long) channel->mmap_len,
393 (unsigned long long) channel->max_sb_size);
394end:
395 return channel;
396}
397
398/*
399 * Add a channel to the global list protected by a mutex.
400 */
401int consumer_add_channel(struct lttng_consumer_channel *channel)
402{
3bd1e081 403 pthread_mutex_lock(&consumer_data.lock);
7ad0a0cb
MD
404 /* Steal channel identifier, for UST */
405 consumer_steal_channel_key(channel->key);
e4421fec 406 lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
3bd1e081 407 pthread_mutex_unlock(&consumer_data.lock);
7ad0a0cb 408 return 0;
3bd1e081
MD
409}
410
411/*
412 * Allocate the pollfd structure and the local view of the out fds to avoid
413 * doing a lookup in the linked list and concurrency issues when writing is
414 * needed. Called with consumer_data.lock held.
415 *
416 * Returns the number of fds in the structures.
417 */
418int consumer_update_poll_array(
419 struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
420 struct lttng_consumer_stream **local_stream)
421{
3bd1e081 422 int i = 0;
e4421fec
DG
423 struct lttng_ht_iter iter;
424 struct lttng_consumer_stream *stream;
3bd1e081
MD
425
426 DBG("Updating poll fd array");
e4421fec
DG
427 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
428 node.node) {
429 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
3bd1e081
MD
430 continue;
431 }
e4421fec
DG
432 DBG("Active FD %d", stream->wait_fd);
433 (*pollfd)[i].fd = stream->wait_fd;
3bd1e081 434 (*pollfd)[i].events = POLLIN | POLLPRI;
e4421fec 435 local_stream[i] = stream;
3bd1e081
MD
436 i++;
437 }
438
439 /*
440 * Insert the consumer_poll_pipe at the end of the array and don't
441 * increment i so nb_fd is the number of real FD.
442 */
443 (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
444 (*pollfd)[i].events = POLLIN;
445 return i;
446}
447
/*
 * Block in poll() on the two entries of consumer_sockpoll: entry 0 is the
 * should_quit pipe, entry 1 the command socket.
 *
 * Returns -1 when poll() fails or when the should_quit pipe is readable
 * (the caller should exit), 0 otherwise.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	/*
	 * Test the POLLIN bit rather than comparing the whole mask: poll()
	 * may set other bits (POLLHUP for instance) together with POLLIN,
	 * and an equality test would then miss the quit notification.
	 */
	if (consumer_sockpoll[0].revents & POLLIN) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
470
471/*
472 * Set the error socket.
473 */
474void lttng_consumer_set_error_sock(
475 struct lttng_consumer_local_data *ctx, int sock)
476{
477 ctx->consumer_error_socket = sock;
478}
479
480/*
481 * Set the command socket path.
482 */
483
484void lttng_consumer_set_command_sock_path(
485 struct lttng_consumer_local_data *ctx, char *sock)
486{
487 ctx->consumer_command_sock_path = sock;
488}
489
490/*
491 * Send return code to the session daemon.
492 * If the socket is not defined, we return 0, it is not a fatal error
493 */
494int lttng_consumer_send_error(
495 struct lttng_consumer_local_data *ctx, int cmd)
496{
497 if (ctx->consumer_error_socket > 0) {
498 return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
499 sizeof(enum lttcomm_sessiond_command));
500 }
501
502 return 0;
503}
504
505/*
506 * Close all the tracefiles and stream fds, should be called when all instances
507 * are destroyed.
508 */
509void lttng_consumer_cleanup(void)
510{
e4421fec
DG
511 int ret;
512 struct lttng_ht_iter iter;
513 struct lttng_consumer_stream *stream;
514 struct lttng_consumer_channel *channel;
3bd1e081
MD
515
516 /*
517 * close all outfd. Called when there are no more threads
518 * running (after joining on the threads), no need to protect
519 * list iteration with mutex.
520 */
e4421fec
DG
521 cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
522 node.node) {
523 ret = lttng_ht_del(consumer_data.stream_ht, &iter);
524 assert(!ret);
525 consumer_del_stream(stream);
3bd1e081 526 }
e4421fec
DG
527
528 cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
529 node.node) {
530 ret = lttng_ht_del(consumer_data.channel_ht, &iter);
531 assert(!ret);
532 consumer_del_channel(channel);
3bd1e081
MD
533 }
534}
535
536/*
537 * Called from signal handler.
538 */
539void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
540{
541 int ret;
542 consumer_quit = 1;
543 ret = write(ctx->consumer_should_quit[1], "4", 1);
544 if (ret < 0) {
545 perror("write consumer quit");
546 }
547}
548
549void lttng_consumer_sync_trace_file(
550 struct lttng_consumer_stream *stream, off_t orig_offset)
551{
552 int outfd = stream->out_fd;
553
554 /*
555 * This does a blocking write-and-wait on any page that belongs to the
556 * subbuffer prior to the one we just wrote.
557 * Don't care about error values, as these are just hints and ways to
558 * limit the amount of page cache used.
559 */
560 if (orig_offset < stream->chan->max_sb_size) {
561 return;
562 }
563 sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
564 stream->chan->max_sb_size,
565 SYNC_FILE_RANGE_WAIT_BEFORE
566 | SYNC_FILE_RANGE_WRITE
567 | SYNC_FILE_RANGE_WAIT_AFTER);
568 /*
569 * Give hints to the kernel about how we access the file:
570 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
571 * we write it.
572 *
573 * We need to call fadvise again after the file grows because the
574 * kernel does not seem to apply fadvise to non-existing parts of the
575 * file.
576 *
577 * Call fadvise _after_ having waited for the page writeback to
578 * complete because the dirty page writeback semantic is not well
579 * defined. So it can be expected to lead to lower throughput in
580 * streaming.
581 */
582 posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
583 stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
584}
585
586/*
587 * Initialise the necessary environnement :
588 * - create a new context
589 * - create the poll_pipe
590 * - create the should_quit pipe (for signal handler)
591 * - create the thread pipe (for splice)
592 *
593 * Takes a function pointer as argument, this function is called when data is
594 * available on a buffer. This function is responsible to do the
595 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
596 * buffer configuration and then kernctl_put_next_subbuf at the end.
597 *
598 * Returns a pointer to the new context or NULL on error.
599 */
600struct lttng_consumer_local_data *lttng_consumer_create(
601 enum lttng_consumer_type type,
d41f73b7
MD
602 int (*buffer_ready)(struct lttng_consumer_stream *stream,
603 struct lttng_consumer_local_data *ctx),
3bd1e081
MD
604 int (*recv_channel)(struct lttng_consumer_channel *channel),
605 int (*recv_stream)(struct lttng_consumer_stream *stream),
606 int (*update_stream)(int stream_key, uint32_t state))
607{
608 int ret, i;
609 struct lttng_consumer_local_data *ctx;
610
611 assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
612 consumer_data.type == type);
613 consumer_data.type = type;
614
effcf122 615 ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
3bd1e081
MD
616 if (ctx == NULL) {
617 perror("allocating context");
618 goto error;
619 }
620
621 ctx->consumer_error_socket = -1;
622 /* assign the callbacks */
623 ctx->on_buffer_ready = buffer_ready;
624 ctx->on_recv_channel = recv_channel;
625 ctx->on_recv_stream = recv_stream;
626 ctx->on_update_stream = update_stream;
627
628 ret = pipe(ctx->consumer_poll_pipe);
629 if (ret < 0) {
630 perror("Error creating poll pipe");
631 goto error_poll_pipe;
632 }
633
634 ret = pipe(ctx->consumer_should_quit);
635 if (ret < 0) {
636 perror("Error creating recv pipe");
637 goto error_quit_pipe;
638 }
639
640 ret = pipe(ctx->consumer_thread_pipe);
641 if (ret < 0) {
642 perror("Error creating thread pipe");
643 goto error_thread_pipe;
644 }
645
646 return ctx;
647
648
649error_thread_pipe:
650 for (i = 0; i < 2; i++) {
651 int err;
652
653 err = close(ctx->consumer_should_quit[i]);
654 assert(!err);
655 }
656error_quit_pipe:
657 for (i = 0; i < 2; i++) {
658 int err;
659
660 err = close(ctx->consumer_poll_pipe[i]);
661 assert(!err);
662 }
663error_poll_pipe:
664 free(ctx);
665error:
666 return NULL;
667}
668
669/*
670 * Close all fds associated with the instance and free the context.
671 */
672void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
673{
674 close(ctx->consumer_error_socket);
675 close(ctx->consumer_thread_pipe[0]);
676 close(ctx->consumer_thread_pipe[1]);
677 close(ctx->consumer_poll_pipe[0]);
678 close(ctx->consumer_poll_pipe[1]);
679 close(ctx->consumer_should_quit[0]);
680 close(ctx->consumer_should_quit[1]);
681 unlink(ctx->consumer_command_sock_path);
682 free(ctx);
683}
684
685/*
686 * Mmap the ring buffer, read it and write the data to the tracefile.
687 *
688 * Returns the number of bytes written
689 */
690int lttng_consumer_on_read_subbuffer_mmap(
691 struct lttng_consumer_local_data *ctx,
692 struct lttng_consumer_stream *stream, unsigned long len)
693{
694 switch (consumer_data.type) {
695 case LTTNG_CONSUMER_KERNEL:
696 return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
7753dea8
MD
697 case LTTNG_CONSUMER32_UST:
698 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
699 return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
700 default:
701 ERR("Unknown consumer_data type");
702 assert(0);
703 }
704}
705
706/*
707 * Splice the data from the ring buffer to the tracefile.
708 *
709 * Returns the number of bytes spliced.
710 */
711int lttng_consumer_on_read_subbuffer_splice(
712 struct lttng_consumer_local_data *ctx,
713 struct lttng_consumer_stream *stream, unsigned long len)
714{
715 switch (consumer_data.type) {
716 case LTTNG_CONSUMER_KERNEL:
717 return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
7753dea8
MD
718 case LTTNG_CONSUMER32_UST:
719 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
720 return -ENOSYS;
721 default:
722 ERR("Unknown consumer_data type");
723 assert(0);
724 return -ENOSYS;
725 }
726
727}
728
729/*
730 * Take a snapshot for a specific fd
731 *
732 * Returns 0 on success, < 0 on error
733 */
734int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
735 struct lttng_consumer_stream *stream)
736{
737 switch (consumer_data.type) {
738 case LTTNG_CONSUMER_KERNEL:
739 return lttng_kconsumer_take_snapshot(ctx, stream);
7753dea8
MD
740 case LTTNG_CONSUMER32_UST:
741 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
742 return lttng_ustconsumer_take_snapshot(ctx, stream);
743 default:
744 ERR("Unknown consumer_data type");
745 assert(0);
746 return -ENOSYS;
747 }
748
749}
750
751/*
752 * Get the produced position
753 *
754 * Returns 0 on success, < 0 on error
755 */
756int lttng_consumer_get_produced_snapshot(
757 struct lttng_consumer_local_data *ctx,
758 struct lttng_consumer_stream *stream,
759 unsigned long *pos)
760{
761 switch (consumer_data.type) {
762 case LTTNG_CONSUMER_KERNEL:
763 return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
7753dea8
MD
764 case LTTNG_CONSUMER32_UST:
765 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
766 return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
767 default:
768 ERR("Unknown consumer_data type");
769 assert(0);
770 return -ENOSYS;
771 }
772}
773
774int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
775 int sock, struct pollfd *consumer_sockpoll)
776{
777 switch (consumer_data.type) {
778 case LTTNG_CONSUMER_KERNEL:
779 return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
7753dea8
MD
780 case LTTNG_CONSUMER32_UST:
781 case LTTNG_CONSUMER64_UST:
3bd1e081
MD
782 return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
783 default:
784 ERR("Unknown consumer_data type");
785 assert(0);
786 return -ENOSYS;
787 }
788}
789
790/*
e4421fec 791 * This thread polls the fds in the set to consume the data and write
3bd1e081
MD
792 * it to tracefile if necessary.
793 */
794void *lttng_consumer_thread_poll_fds(void *data)
795{
796 int num_rdy, num_hup, high_prio, ret, i;
797 struct pollfd *pollfd = NULL;
798 /* local view of the streams */
799 struct lttng_consumer_stream **local_stream = NULL;
800 /* local view of consumer_data.fds_count */
801 int nb_fd = 0;
802 char tmp;
803 int tmp2;
804 struct lttng_consumer_local_data *ctx = data;
805
effcf122 806 local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
3bd1e081
MD
807
808 while (1) {
809 high_prio = 0;
810 num_hup = 0;
811
812 /*
e4421fec 813 * the fds set has been updated, we need to update our
3bd1e081
MD
814 * local array as well
815 */
816 pthread_mutex_lock(&consumer_data.lock);
817 if (consumer_data.need_update) {
818 if (pollfd != NULL) {
819 free(pollfd);
820 pollfd = NULL;
821 }
822 if (local_stream != NULL) {
823 free(local_stream);
824 local_stream = NULL;
825 }
826
827 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 828 pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
3bd1e081
MD
829 if (pollfd == NULL) {
830 perror("pollfd malloc");
831 pthread_mutex_unlock(&consumer_data.lock);
832 goto end;
833 }
834
835 /* allocate for all fds + 1 for the consumer_poll_pipe */
effcf122 836 local_stream = zmalloc((consumer_data.stream_count + 1) *
3bd1e081
MD
837 sizeof(struct lttng_consumer_stream));
838 if (local_stream == NULL) {
839 perror("local_stream malloc");
840 pthread_mutex_unlock(&consumer_data.lock);
841 goto end;
842 }
843 ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
844 if (ret < 0) {
845 ERR("Error in allocating pollfd or local_outfds");
846 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
847 pthread_mutex_unlock(&consumer_data.lock);
848 goto end;
849 }
850 nb_fd = ret;
851 consumer_data.need_update = 0;
852 }
853 pthread_mutex_unlock(&consumer_data.lock);
854
855 /* poll on the array of fds */
856 DBG("polling on %d fd", nb_fd + 1);
857 num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
858 DBG("poll num_rdy : %d", num_rdy);
859 if (num_rdy == -1) {
860 perror("Poll error");
861 lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
862 goto end;
863 } else if (num_rdy == 0) {
864 DBG("Polling thread timed out");
865 goto end;
866 }
867
d41f73b7 868 /* No FDs and consumer_quit, consumer_cleanup the thread */
3bd1e081
MD
869 if (nb_fd == 0 && consumer_quit == 1) {
870 goto end;
871 }
872
873 /*
874 * If the consumer_poll_pipe triggered poll go
875 * directly to the beginning of the loop to update the
876 * array. We want to prioritize array update over
877 * low-priority reads.
878 */
d41f73b7 879 if (pollfd[nb_fd].revents & POLLIN) {
3bd1e081
MD
880 DBG("consumer_poll_pipe wake up");
881 tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
882 if (tmp2 < 0) {
d41f73b7 883 perror("read consumer poll");
3bd1e081
MD
884 }
885 continue;
886 }
887
888 /* Take care of high priority channels first. */
889 for (i = 0; i < nb_fd; i++) {
d41f73b7
MD
890 if (pollfd[i].revents & POLLPRI) {
891 DBG("Urgent read on fd %d", pollfd[i].fd);
892 high_prio = 1;
893 ret = ctx->on_buffer_ready(local_stream[i], ctx);
894 /* it's ok to have an unavailable sub-buffer */
895 if (ret == EAGAIN) {
896 ret = 0;
897 }
898 } else if (pollfd[i].revents & POLLERR) {
3bd1e081
MD
899 ERR("Error returned in polling fd %d.", pollfd[i].fd);
900 consumer_del_stream(local_stream[i]);
901 num_hup++;
d41f73b7
MD
902 } else if (pollfd[i].revents & POLLNVAL) {
903 ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
3bd1e081
MD
904 consumer_del_stream(local_stream[i]);
905 num_hup++;
1c3c14ac
MD
906 } else if ((pollfd[i].revents & POLLHUP) &&
907 !(pollfd[i].revents & POLLIN)) {
7753dea8
MD
908 if (consumer_data.type == LTTNG_CONSUMER32_UST
909 || consumer_data.type == LTTNG_CONSUMER64_UST) {
d056b477
MD
910 DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
911 pollfd[i].fd);
912 if (!local_stream[i]->hangup_flush_done) {
913 lttng_ustconsumer_on_stream_hangup(local_stream[i]);
effcf122
MD
914 /* read after flush */
915 do {
916 ret = ctx->on_buffer_ready(local_stream[i], ctx);
917 } while (ret == EAGAIN);
d056b477
MD
918 }
919 } else {
920 DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
921 }
3bd1e081
MD
922 consumer_del_stream(local_stream[i]);
923 num_hup++;
3bd1e081
MD
924 }
925 }
926
927 /* If every buffer FD has hung up, we end the read loop here */
928 if (nb_fd > 0 && num_hup == nb_fd) {
929 DBG("every buffer FD has hung up\n");
930 if (consumer_quit == 1) {
931 goto end;
932 }
933 continue;
934 }
935
936 /* Take care of low priority channels. */
937 if (high_prio == 0) {
938 for (i = 0; i < nb_fd; i++) {
d41f73b7 939 if (pollfd[i].revents & POLLIN) {
3bd1e081 940 DBG("Normal read on fd %d", pollfd[i].fd);
d41f73b7 941 ret = ctx->on_buffer_ready(local_stream[i], ctx);
3bd1e081
MD
942 /* it's ok to have an unavailable subbuffer */
943 if (ret == EAGAIN) {
944 ret = 0;
945 }
946 }
947 }
948 }
949 }
950end:
951 DBG("polling thread exiting");
952 if (pollfd != NULL) {
953 free(pollfd);
954 pollfd = NULL;
955 }
956 if (local_stream != NULL) {
957 free(local_stream);
958 local_stream = NULL;
959 }
960 return NULL;
961}
962
963/*
964 * This thread listens on the consumerd socket and receives the file
965 * descriptors from the session daemon.
966 */
967void *lttng_consumer_thread_receive_fds(void *data)
968{
969 int sock, client_socket, ret;
970 /*
971 * structure to poll for incoming data on communication socket avoids
972 * making blocking sockets.
973 */
974 struct pollfd consumer_sockpoll[2];
975 struct lttng_consumer_local_data *ctx = data;
976
977 DBG("Creating command socket %s", ctx->consumer_command_sock_path);
978 unlink(ctx->consumer_command_sock_path);
979 client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
980 if (client_socket < 0) {
981 ERR("Cannot create command socket");
982 goto end;
983 }
984
985 ret = lttcomm_listen_unix_sock(client_socket);
986 if (ret < 0) {
987 goto end;
988 }
989
32258573 990 DBG("Sending ready command to lttng-sessiond");
3bd1e081
MD
991 ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
992 /* return < 0 on error, but == 0 is not fatal */
993 if (ret < 0) {
32258573 994 ERR("Error sending ready command to lttng-sessiond");
3bd1e081
MD
995 goto end;
996 }
997
998 ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
999 if (ret < 0) {
1000 perror("fcntl O_NONBLOCK");
1001 goto end;
1002 }
1003
1004 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1005 consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
1006 consumer_sockpoll[0].events = POLLIN | POLLPRI;
1007 consumer_sockpoll[1].fd = client_socket;
1008 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1009
1010 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1011 goto end;
1012 }
1013 DBG("Connection on client_socket");
1014
1015 /* Blocking call, waiting for transmission */
1016 sock = lttcomm_accept_unix_sock(client_socket);
1017 if (sock <= 0) {
1018 WARN("On accept");
1019 goto end;
1020 }
1021 ret = fcntl(sock, F_SETFL, O_NONBLOCK);
1022 if (ret < 0) {
1023 perror("fcntl O_NONBLOCK");
1024 goto end;
1025 }
1026
1027 /* update the polling structure to poll on the established socket */
1028 consumer_sockpoll[1].fd = sock;
1029 consumer_sockpoll[1].events = POLLIN | POLLPRI;
1030
1031 while (1) {
1032 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
1033 goto end;
1034 }
1035 DBG("Incoming command on sock");
1036 ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
1037 if (ret == -ENOENT) {
1038 DBG("Received STOP command");
1039 goto end;
1040 }
1041 if (ret < 0) {
1042 ERR("Communication interrupted on command socket");
1043 goto end;
1044 }
1045 if (consumer_quit) {
1046 DBG("consumer_thread_receive_fds received quit from signal");
1047 goto end;
1048 }
1049 DBG("received fds on sock");
1050 }
1051end:
1052 DBG("consumer_thread_receive_fds exiting");
1053
1054 /*
1055 * when all fds have hung up, the polling thread
1056 * can exit cleanly
1057 */
1058 consumer_quit = 1;
1059
1060 /*
1061 * 2s of grace period, if no polling events occur during
1062 * this period, the polling thread will exit even if there
1063 * are still open FDs (should not happen, but safety mechanism).
1064 */
1065 consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
1066
1067 /* wake up the polling thread */
1068 ret = write(ctx->consumer_poll_pipe[1], "4", 1);
1069 if (ret < 0) {
1070 perror("poll pipe write");
1071 }
1072 return NULL;
1073}
d41f73b7
MD
1074
1075int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
1076 struct lttng_consumer_local_data *ctx)
1077{
1078 switch (consumer_data.type) {
1079 case LTTNG_CONSUMER_KERNEL:
1080 return lttng_kconsumer_read_subbuffer(stream, ctx);
7753dea8
MD
1081 case LTTNG_CONSUMER32_UST:
1082 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1083 return lttng_ustconsumer_read_subbuffer(stream, ctx);
1084 default:
1085 ERR("Unknown consumer_data type");
1086 assert(0);
1087 return -ENOSYS;
1088 }
1089}
1090
1091int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
1092{
1093 switch (consumer_data.type) {
1094 case LTTNG_CONSUMER_KERNEL:
1095 return lttng_kconsumer_on_recv_stream(stream);
7753dea8
MD
1096 case LTTNG_CONSUMER32_UST:
1097 case LTTNG_CONSUMER64_UST:
d41f73b7
MD
1098 return lttng_ustconsumer_on_recv_stream(stream);
1099 default:
1100 ERR("Unknown consumer_data type");
1101 assert(0);
1102 return -ENOSYS;
1103 }
1104}
e4421fec
DG
1105
1106/*
1107 * Allocate and set consumer data hash tables.
1108 */
1109void lttng_consumer_init(void)
1110{
1111 consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1112 consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
1113}
1114
This page took 0.067217 seconds and 4 git commands to generate.