From: Mathieu Desnoyers Date: Wed, 16 Nov 2011 02:01:47 +0000 (-0500) Subject: UST consumer: fix read on hangup, and UST get subbuf error handling X-Git-Tag: v2.0-pre15~107 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=effcf122563da70fc22a5dcadf4fce3476035c54 UST consumer: fix read on hangup, and UST get subbuf error handling This fixes multiple consecutive runs of the "demo" program, where sometimes one channel would not be saved. Signed-off-by: Mathieu Desnoyers --- diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index 293d5f808..0d8dd0014 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -178,7 +178,7 @@ struct lttng_consumer_stream *consumer_allocate_stream( struct lttng_consumer_stream *stream; int ret; - stream = malloc(sizeof(*stream)); + stream = zmalloc(sizeof(*stream)); if (stream == NULL) { perror("malloc struct lttng_consumer_stream"); goto end; @@ -574,7 +574,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( consumer_data.type == type); consumer_data.type = type; - ctx = malloc(sizeof(struct lttng_consumer_local_data)); + ctx = zmalloc(sizeof(struct lttng_consumer_local_data)); if (ctx == NULL) { perror("allocating context"); goto error; @@ -760,7 +760,7 @@ void *lttng_consumer_thread_poll_fds(void *data) int tmp2; struct lttng_consumer_local_data *ctx = data; - local_stream = malloc(sizeof(struct lttng_consumer_stream)); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { high_prio = 0; @@ -782,7 +782,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { perror("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); @@ -790,7 +790,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - local_stream = malloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 1) * sizeof(struct lttng_consumer_stream)); if (local_stream == NULL) { perror("local_stream malloc"); @@ -867,12 +867,10 @@ void *lttng_consumer_thread_poll_fds(void *data) pollfd[i].fd); if (!local_stream[i]->hangup_flush_done) { lttng_ustconsumer_on_stream_hangup(local_stream[i]); - /* try reading after flush */ - ret = ctx->on_buffer_ready(local_stream[i], ctx); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; - } + /* read after flush */ + do { + ret = ctx->on_buffer_ready(local_stream[i], ctx); + } while (ret == EAGAIN); } } else { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); diff --git a/liblttng-ustconsumer/lttng-ustconsumer.c b/liblttng-ustconsumer/lttng-ustconsumer.c index f938d3548..89dbefa36 100644 --- a/liblttng-ustconsumer/lttng-ustconsumer.c +++ b/liblttng-ustconsumer/lttng-ustconsumer.c @@ -287,6 +287,7 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) { ustctl_flush_buffer(stream->chan->handle, stream->buf, 0); + stream->hangup_flush_done = 1; } void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) @@ -343,12 +344,14 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, stream->wait_fd, stream->key); /* We can consume the 1 byte written into the wait_fd by UST */ - do { - readlen = read(stream->wait_fd, &dummy, 1); - } while (readlen == -1 && errno == -EINTR); - if (readlen == -1) { - ret = readlen; - goto end; + if (!stream->hangup_flush_done) { + do { + readlen = read(stream->wait_fd, &dummy, 1); + } while (readlen == -1 && errno == -EINTR); + if (readlen == -1) { + ret = readlen; + goto end; + } } buf = stream->buf; @@ -356,7 +359,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = ustctl_get_next_subbuf(handle, buf); if (err != 0) { - ret = errno; + ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -370,11 +373,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, assert(stream->output == LTTNG_EVENT_MMAP); /* read the used subbuffer size */ err = ustctl_get_padded_subbuf_size(handle, buf, &len); - if (err != 0) { - ret = errno; - perror("Getting sub-buffer len failed."); - goto end; - } + assert(err == 0); /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); if (ret < 0) { @@ -385,16 +384,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ERR("Error writing to tracefile"); } err = ustctl_put_next_subbuf(handle, buf); - if (err != 0) { - ret = errno; - if (errno == EFAULT) { - perror("Error in unreserving sub buffer\n"); - } else if (errno == EIO) { - /* Should never happen with newer LTTng versions */ - perror("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - goto end; - } + assert(err == 0); end: return ret; }