Fix: Delete stream on write error in consumer
authorDavid Goulet <dgoulet@efficios.com>
Wed, 24 Oct 2012 16:40:59 +0000 (12:40 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Wed, 24 Oct 2012 16:40:59 +0000 (12:40 -0400)
Whenever a write() error occured, both on network or local trace file,
the consumer thread just died considering the error a fatal one.

This commit fixes that by simply deleting the stream, removing it from
the poll set of the thread and freeing it.

Furthermore, on a write() error, a SIGPIPE is usually raised if the FD
is invalid. The consumer is catching this signal but was initiating a
should exit action thus ultimately cleaning all threads. We now still
catch SIGPIPE but don't set the should quit flag so normal cleanup can
be done by the threads.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-consumerd/lttng-consumerd.c
src/common/consumer.c

index 1cc11d2d5898a35d2278826618a20042eb818a25..4d02c842798f65c1024b9326a248493ea60204af 100644 (file)
@@ -79,6 +79,14 @@ static void sighandler(int sig)
                return;
        }
 
                return;
        }
 
+       /*
+        * Ignore SIGPIPE because it should not stop the consumer whenever a
+        * SIGPIPE is catched through a FD operation.
+        */
+       if (sig == SIGPIPE) {
+               return;
+       }
+
        lttng_consumer_should_exit(ctx);
 }
 
        lttng_consumer_should_exit(ctx);
 }
 
index 295e7be5dab1b533143e4ce2fa6a5be16ddb1c5d..53c618067f8d6546b91ccc8f77854d7d2b36ed72 100644 (file)
@@ -923,6 +923,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
        if (ret < 0) {
                PERROR("write consumer quit");
        }
        if (ret < 0) {
                PERROR("write consumer quit");
        }
+
+       DBG("Consumer flag that it should quit");
 }
 
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
 }
 
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
@@ -1084,6 +1086,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 {
        int ret;
 
 {
        int ret;
 
+       DBG("Consumer destroying it. Closing everything.");
+
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
@@ -1916,8 +1920,9 @@ restart:
                                len = ctx->on_buffer_ready(stream, ctx);
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                len = ctx->on_buffer_ready(stream, ctx);
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       rcu_read_unlock();
-                                       goto end;
+                                       /* Clean up stream from consumer and free it. */
+                                       lttng_poll_del(&events, stream->wait_fd);
+                                       consumer_del_metadata_stream(stream, metadata_ht);
                                } else if (len > 0) {
                                        stream->data_read = 1;
                                }
                                } else if (len > 0) {
                                        stream->data_read = 1;
                                }
@@ -2084,7 +2089,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2107,7 +2113,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
This page took 0.029507 seconds and 4 git commands to generate.