From d41f73b7cb50d57974bd1fad22342abaf9718566 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Sat, 5 Nov 2011 13:45:35 -0400 Subject: [PATCH] UST consumer: implement missing ust-specific callbacks + fixes. Also fix consumer handling of poll fd: a mask should be used. Also fix UST stream output passed to the consumer (the fix is a hack currently, left a FIXME). Signed-off-by: Mathieu Desnoyers --- include/lttng-share.h | 2 +- include/lttng/lttng-consumer.h | 10 +- include/lttng/lttng-kconsumer.h | 5 + include/lttng/lttng-ustconsumer.h | 17 +++ liblttng-consumer/lttng-consumer.c | 74 ++++++++---- liblttng-kconsumer/lttng-kconsumer.c | 138 ++++++++++++++++++++++ liblttng-ustconsumer/lttng-ustconsumer.c | 96 ++++++++++++++++ lttng-consumerd/lttng-consumerd.c | 139 +---------------------- lttng-sessiond/trace-ust.c | 2 +- lttng-sessiond/ust-consumer.c | 6 +- 10 files changed, 323 insertions(+), 166 deletions(-) diff --git a/include/lttng-share.h b/include/lttng-share.h index cac819d63..708a1f009 100644 --- a/include/lttng-share.h +++ b/include/lttng-share.h @@ -56,7 +56,7 @@ /* Must be a power of 2 */ #define DEFAULT_UST_CHANNEL_SUBBUF_NUM 4 /* See lttng-ust.h enum lttng_ust_output */ -#define DEFAULT_UST_CHANNEL_OUTPUT LTTNG_UST_MMAP +#define DEFAULT_UST_CHANNEL_OUTPUT LTTNG_EVENT_MMAP /* * Default timeout value for the sem_timedwait() call. Blocking forever is not diff --git a/include/lttng/lttng-consumer.h b/include/lttng/lttng-consumer.h index 1a6afc22a..256293b81 100644 --- a/include/lttng/lttng-consumer.h +++ b/include/lttng/lttng-consumer.h @@ -126,7 +126,8 @@ struct lttng_consumer_stream { */ struct lttng_consumer_local_data { /* function to call when data is available on a buffer */ - int (*on_buffer_ready)(struct lttng_consumer_stream *stream); + int (*on_buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -277,7 +278,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel); extern struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, - int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int sessiond_key, uint32_t state)); @@ -299,4 +301,8 @@ extern void *lttng_consumer_thread_receive_fds(void *data); extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); +int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); +int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); + #endif /* _LTTNG_CONSUMER_H */ diff --git a/include/lttng/lttng-kconsumer.h b/include/lttng/lttng-kconsumer.h index 764a3ef43..4753b5c59 100644 --- a/include/lttng/lttng-kconsumer.h +++ b/include/lttng/lttng-kconsumer.h @@ -61,4 +61,9 @@ int lttng_kconsumer_get_produced_snapshot( int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); + +int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); +int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream); + #endif /* _LTTNG_KCONSUMER_H */ diff --git a/include/lttng/lttng-ustconsumer.h b/include/lttng/lttng-ustconsumer.h index eacff654f..90d6588b9 100644 --- a/include/lttng/lttng-ustconsumer.h +++ b/include/lttng/lttng-ustconsumer.h @@ -66,6 +66,10 @@ extern void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan); extern int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream); extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream); +int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); +int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream); + #else /* HAVE_LIBLTTNG_UST_CTL */ static inline @@ -129,6 +133,19 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) { } +static inline +int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + return -ENOSYS; +} + +static inline +int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + return -ENOSYS; +} + #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTTNG_USTCONSUMER_H */ diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index 2fcb39a27..7644eac2c 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -546,7 +546,8 @@ void lttng_consumer_sync_trace_file( */ struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, - int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int stream_key, uint32_t state)) @@ -806,7 +807,7 @@ void *lttng_consumer_thread_poll_fds(void *data) goto end; } - /* No FDs and consumer_quit, kconsumer_cleanup the thread */ + /* No FDs and consumer_quit, consumer_cleanup the thread */ if (nb_fd == 0 && consumer_quit == 1) { goto end; } @@ -817,42 +818,38 @@ void *lttng_consumer_thread_poll_fds(void *data) * array. We want to prioritize array update over * low-priority reads. */ - if (pollfd[nb_fd].revents == POLLIN) { + if (pollfd[nb_fd].revents & POLLIN) { DBG("consumer_poll_pipe wake up"); tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1); if (tmp2 < 0) { - perror("read kconsumer poll"); + perror("read consumer poll"); } continue; } /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { - switch(pollfd[i].revents) { - case POLLERR: + if (pollfd[i].revents & POLLPRI) { + DBG("Urgent read on fd %d", pollfd[i].fd); + high_prio = 1; + ret = ctx->on_buffer_ready(local_stream[i], ctx); + /* it's ok to have an unavailable sub-buffer */ + if (ret == EAGAIN) { + ret = 0; + } + } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); consumer_del_stream(local_stream[i]); num_hup++; - break; - case POLLHUP: - DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); + } else if (pollfd[i].revents & POLLNVAL) { + ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); consumer_del_stream(local_stream[i]); num_hup++; - break; - case POLLNVAL: - ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); + } else if ((pollfd[i].revents & POLLHUP && + !(pollfd[i].revents & POLLIN))) { + DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); consumer_del_stream(local_stream[i]); num_hup++; - break; - case POLLPRI: - DBG("Urgent read on fd %d", pollfd[i].fd); - high_prio = 1; - ret = ctx->on_buffer_ready(local_stream[i]); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; - } - break; } } @@ -868,9 +865,9 @@ void *lttng_consumer_thread_poll_fds(void *data) /* Take care of low priority channels. */ if (high_prio == 0) { for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents == POLLIN) { + if (pollfd[i].revents & POLLIN) { DBG("Normal read on fd %d", pollfd[i].fd); - ret = ctx->on_buffer_ready(local_stream[i]); + ret = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable subbuffer */ if (ret == EAGAIN) { ret = 0; @@ -1003,3 +1000,32 @@ end: } return NULL; } + +int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_read_subbuffer(stream, ctx); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_read_subbuffer(stream, ctx); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + +int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_on_recv_stream(stream); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_on_recv_stream(stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} diff --git a/liblttng-kconsumer/lttng-kconsumer.c b/liblttng-kconsumer/lttng-kconsumer.c index 778dd1631..e9861f20f 100644 --- a/liblttng-kconsumer/lttng-kconsumer.c +++ b/liblttng-kconsumer/lttng-kconsumer.c @@ -300,3 +300,141 @@ end: end_nosignal: return 0; } + +/* + * Consume data on a file descriptor and write it on a trace file. + */ +int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + int infd = stream->wait_fd; + + DBG("In read_subbuffer (infd : %d)", infd); + /* Get the next subbuffer */ + err = kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + /* + * This is a debug message even for single-threaded consumer, + * because poll() have more relaxed criterions than get subbuf, + * so get_subbuf may fail for short race windows where poll() + * would issue wakeups. + */ + DBG("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + + switch (stream->output) { + case LTTNG_EVENT_SPLICE: + /* read the whole subbuffer */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + + /* splice the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile"); + } + break; + case LTTNG_EVENT_MMAP: + /* read the used subbuffer size */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + break; + default: + ERR("Unknown output method"); + ret = -1; + } + + err = kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } + +end: + return ret; +} + +int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (stream->path_name != NULL) { + ret = open(stream->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", stream->path_name); + perror("open"); + goto error; + } + stream->out_fd = ret; + } + + if (stream->output == LTTNG_EVENT_MMAP) { + /* get the len of the mmap region */ + unsigned long mmap_len; + + ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_len"); + goto error_close_fd; + } + stream->mmap_len = (size_t) mmap_len; + + stream->mmap_base = mmap(NULL, stream->mmap_len, + PROT_READ, MAP_PRIVATE, stream->wait_fd, 0); + if (stream->mmap_base == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto error_close_fd; + } + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error_close_fd: + { + int err; + + err = close(stream->out_fd); + assert(!err); + } +error: + return ret; +} + diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c index db0ba05bd..29f735249 100644 --- a/liblttng-ustconsumer/lttng-ustconsumer.c +++ b/liblttng-ustconsumer/lttng-ustconsumer.c @@ -208,6 +208,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, fds[0], fds[1]); + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fds[0], fds[1], @@ -317,3 +318,98 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) { ustctl_close_stream_read(stream->chan->handle, stream->buf); } + + +int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + struct lttng_ust_shm_handle *handle; + struct lttng_ust_lib_ring_buffer *buf; + char dummy; + ssize_t readlen; + + DBG("In read_subbuffer (wait_fd: %d, stream key: %d)", + stream->wait_fd, stream->key); + + /* We can consume the 1 byte written into the wait_fd by UST */ + do { + readlen = read(stream->wait_fd, &dummy, 1); + } while (readlen == -1 && errno == -EINTR); + if (readlen == -1) { + ret = readlen; + goto end; + } + + buf = stream->buf; + handle = stream->chan->handle; + /* Get the next subbuffer */ + err = ustctl_get_next_subbuf(handle, buf); + if (err != 0) { + ret = errno; + /* + * This is a debug message even for single-threaded consumer, + * because poll() have more relaxed criterions than get subbuf, + * so get_subbuf may fail for short race windows where poll() + * would issue wakeups. + */ + DBG("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + assert(stream->output == LTTNG_EVENT_MMAP); + /* read the used subbuffer size */ + err = ustctl_get_padded_subbuf_size(handle, buf, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error writing to tracefile"); + } + err = ustctl_put_next_subbuf(handle, buf); + if (err != 0) { + ret = errno; + if (errno == EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if (errno == EIO) { + /* Should never happen with newer LTTng versions */ + perror("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + goto end; + } +end: + return ret; +} + +int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + /* Opening the tracefile in write mode */ + if (stream->path_name != NULL) { + ret = open(stream->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", stream->path_name); + perror("open"); + goto error; + } + stream->out_fd = ret; + } + + /* we return 0 to let the library handle the FD internally */ + return 0; + +error: + return ret; +} diff --git a/lttng-consumerd/lttng-consumerd.c b/lttng-consumerd/lttng-consumerd.c index c85914e7f..9bc86b0b3 100644 --- a/lttng-consumerd/lttng-consumerd.c +++ b/lttng-consumerd/lttng-consumerd.c @@ -219,142 +219,6 @@ static void parse_args(int argc, char **argv) } } -/* - * Consume data on a file descriptor and write it on a trace file. - */ -static int read_subbuffer(struct lttng_consumer_stream *stream) -{ - unsigned long len; - int err; - long ret = 0; - int infd = stream->wait_fd; - - DBG("In read_subbuffer (infd : %d)", infd); - /* Get the next subbuffer */ - err = kernctl_get_next_subbuf(infd); - if (err != 0) { - ret = errno; - /* - * This is a debug message even for single-threaded consumer, - * because poll() have more relaxed criterions than get subbuf, - * so get_subbuf may fail for short race windows where poll() - * would issue wakeups. - */ - DBG("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency)"); - goto end; - } - - switch (stream->output) { - case LTTNG_EVENT_SPLICE: - /* read the whole subbuffer */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } - - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); - if (ret < 0) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile"); - } - break; - case LTTNG_EVENT_MMAP: - /* read the used subbuffer size */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret < 0) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error writing to tracefile"); - } - break; - default: - ERR("Unknown output method"); - ret = -1; - } - - err = kernctl_put_next_subbuf(infd); - if (err != 0) { - ret = errno; - if (errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if (errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - goto end; - } - -end: - return ret; -} - -static int on_recv_stream(struct lttng_consumer_stream *stream) -{ - int ret; - - /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { - ret = open(stream->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); - if (ret < 0) { - ERR("Opening %s", stream->path_name); - perror("open"); - goto error; - } - stream->out_fd = ret; - } - - if (stream->output == LTTNG_EVENT_MMAP) { - /* get the len of the mmap region */ - unsigned long mmap_len; - - ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_len"); - goto error_close_fd; - } - stream->mmap_len = (size_t) mmap_len; - - stream->mmap_base = mmap(NULL, stream->mmap_len, - PROT_READ, MAP_PRIVATE, stream->wait_fd, 0); - if (stream->mmap_base == MAP_FAILED) { - perror("Error mmaping"); - ret = -1; - goto error_close_fd; - } - } - - /* we return 0 to let the library handle the FD internally */ - return 0; - -error_close_fd: - { - int err; - - err = close(stream->out_fd); - assert(!err); - } -error: - return ret; -} - /* * main */ @@ -384,7 +248,8 @@ int main(int argc, char **argv) USTCONSUMERD_CMD_SOCK_PATH); } /* create the consumer instance with and assign the callbacks */ - ctx = lttng_consumer_create(opt_type, read_subbuffer, NULL, on_recv_stream, NULL); + ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer, + NULL, lttng_consumer_on_recv_stream, NULL); if (ctx == NULL) { goto error; } diff --git a/lttng-sessiond/trace-ust.c b/lttng-sessiond/trace-ust.c index e912c53c7..3e1055a8b 100644 --- a/lttng-sessiond/trace-ust.c +++ b/lttng-sessiond/trace-ust.c @@ -250,7 +250,7 @@ struct ltt_ust_metadata *trace_ust_create_metadata(char *path) lum->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM; lum->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER; lum->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER; - lum->attr.output = DEFAULT_UST_CHANNEL_OUTPUT; + lum->attr.output = LTTNG_UST_MMAP; lum->handle = -1; /* Set metadata trace path */ diff --git a/lttng-sessiond/ust-consumer.c b/lttng-sessiond/ust-consumer.c index b83fcc435..ca008d4ad 100644 --- a/lttng-sessiond/ust-consumer.c +++ b/lttng-sessiond/ust-consumer.c @@ -75,7 +75,11 @@ static int send_channel_streams(int sock, lum.u.stream.channel_key = uchan->obj->shm_fd; lum.u.stream.stream_key = stream->obj->shm_fd; lum.u.stream.state = LTTNG_CONSUMER_ACTIVE_STREAM; - lum.u.stream.output = uchan->attr.output; + /* + * FIXME Hack alert! we force MMAP for now. Mixup + * between EVENT and UST enums elsewhere. + */ + lum.u.stream.output = DEFAULT_UST_CHANNEL_OUTPUT; lum.u.stream.mmap_len = stream->obj->memory_map_size; strncpy(lum.u.stream.path_name, stream->pathname, PATH_MAX - 1); lum.u.stream.path_name[PATH_MAX - 1] = '\0'; -- 2.34.1