UST consumer: fix read on hangup, and UST get subbuf error handling
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 16 Nov 2011 02:01:47 +0000 (21:01 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 16 Nov 2011 02:01:47 +0000 (21:01 -0500)
This fixes multiple consecutive runs of the "demo" program, where
sometimes one channel would not be saved.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
liblttng-consumer/lttng-consumer.c
liblttng-ustconsumer/lttng-ustconsumer.c

index 293d5f808ea0d2f68d441554563b239911fd02b8..0d8dd00143fb6d75b52b0e53c81088e853abaccc 100644 (file)
@@ -178,7 +178,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        struct lttng_consumer_stream *stream;
        int ret;
 
        struct lttng_consumer_stream *stream;
        int ret;
 
-       stream = malloc(sizeof(*stream));
+       stream = zmalloc(sizeof(*stream));
        if (stream == NULL) {
                perror("malloc struct lttng_consumer_stream");
                goto end;
        if (stream == NULL) {
                perror("malloc struct lttng_consumer_stream");
                goto end;
@@ -574,7 +574,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                consumer_data.type == type);
        consumer_data.type = type;
 
                consumer_data.type == type);
        consumer_data.type = type;
 
-       ctx = malloc(sizeof(struct lttng_consumer_local_data));
+       ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
        if (ctx == NULL) {
                perror("allocating context");
                goto error;
        if (ctx == NULL) {
                perror("allocating context");
                goto error;
@@ -760,7 +760,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
        int tmp2;
        struct lttng_consumer_local_data *ctx = data;
 
        int tmp2;
        struct lttng_consumer_local_data *ctx = data;
 
-       local_stream = malloc(sizeof(struct lttng_consumer_stream));
+       local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
 
        while (1) {
                high_prio = 0;
 
        while (1) {
                high_prio = 0;
@@ -782,7 +782,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
-                       pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                perror("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                        if (pollfd == NULL) {
                                perror("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
@@ -790,7 +790,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
-                       local_stream = malloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 1) *
                                        sizeof(struct lttng_consumer_stream));
                        if (local_stream == NULL) {
                                perror("local_stream malloc");
                                        sizeof(struct lttng_consumer_stream));
                        if (local_stream == NULL) {
                                perror("local_stream malloc");
@@ -867,12 +867,10 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                                pollfd[i].fd);
                                        if (!local_stream[i]->hangup_flush_done) {
                                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
                                                pollfd[i].fd);
                                        if (!local_stream[i]->hangup_flush_done) {
                                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
-                                               /* try reading after flush */
-                                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
-                                               /* it's ok to have an unavailable sub-buffer */
-                                               if (ret == EAGAIN) {
-                                                       ret = 0;
-                                               }
+                                               /* read after flush */
+                                               do {
+                                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                                               } while (ret == EAGAIN);
                                        }
                                } else {
                                        DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                        }
                                } else {
                                        DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
index f938d3548d9e8ed4ef049333efe1fc1aa16d1a0c..89dbefa361380ad2c317bf161a6d65cc7baae0e5 100644 (file)
@@ -287,6 +287,7 @@ int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
        ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
        ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+       stream->hangup_flush_done = 1;
 }
 
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
 }
 
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
@@ -343,12 +344,14 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                stream->wait_fd, stream->key);
 
        /* We can consume the 1 byte written into the wait_fd by UST */
                stream->wait_fd, stream->key);
 
        /* We can consume the 1 byte written into the wait_fd by UST */
-       do {
-               readlen = read(stream->wait_fd, &dummy, 1);
-       } while (readlen == -1 && errno == -EINTR);
-       if (readlen == -1) {
-               ret = readlen;
-               goto end;
+       if (!stream->hangup_flush_done) {
+               do {
+                       readlen = read(stream->wait_fd, &dummy, 1);
+               } while (readlen == -1 && errno == -EINTR);
+               if (readlen == -1) {
+                       ret = readlen;
+                       goto end;
+               }
        }
 
        buf = stream->buf;
        }
 
        buf = stream->buf;
@@ -356,7 +359,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        /* Get the next subbuffer */
        err = ustctl_get_next_subbuf(handle, buf);
        if (err != 0) {
        /* Get the next subbuffer */
        err = ustctl_get_next_subbuf(handle, buf);
        if (err != 0) {
-               ret = errno;
+               ret = -ret;     /* ustctl_get_next_subbuf returns negative, caller expect positive. */
                /*
                 * This is a debug message even for single-threaded consumer,
                 * because poll() have more relaxed criterions than get subbuf,
                /*
                 * This is a debug message even for single-threaded consumer,
                 * because poll() have more relaxed criterions than get subbuf,
@@ -370,11 +373,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        assert(stream->output == LTTNG_EVENT_MMAP);
        /* read the used subbuffer size */
        err = ustctl_get_padded_subbuf_size(handle, buf, &len);
        assert(stream->output == LTTNG_EVENT_MMAP);
        /* read the used subbuffer size */
        err = ustctl_get_padded_subbuf_size(handle, buf, &len);
-       if (err != 0) {
-               ret = errno;
-               perror("Getting sub-buffer len failed.");
-               goto end;
-       }
+       assert(err == 0);
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
        if (ret < 0) {
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
        if (ret < 0) {
@@ -385,16 +384,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                ERR("Error writing to tracefile");
        }
        err = ustctl_put_next_subbuf(handle, buf);
                ERR("Error writing to tracefile");
        }
        err = ustctl_put_next_subbuf(handle, buf);
-       if (err != 0) {
-               ret = errno;
-               if (errno == EFAULT) {
-                       perror("Error in unreserving sub buffer\n");
-               } else if (errno == EIO) {
-                       /* Should never happen with newer LTTng versions */
-                       perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
-               }
-               goto end;
-       }
+       assert(err == 0);
 end:
        return ret;
 }
 end:
        return ret;
 }
This page took 0.027994 seconds and 4 git commands to generate.