Fix: Delete stream on write error in consumer
[lttng-tools.git] / src / common / consumer.c
index 8d897e5df1e20e20c67ddbc6ed8f99af7e7de2aa..53c618067f8d6546b91ccc8f77854d7d2b36ed72 100644 (file)
@@ -56,7 +56,7 @@ int consumer_poll_timeout = -1;
  * Also updated by the signal handler (consumer_should_exit()). Read by the
  * polling threads.
  */
-volatile int consumer_quit = 0;
+volatile int consumer_quit;
 
 /*
  * The following two hash tables are visible by all threads which are separated
@@ -66,8 +66,8 @@ volatile int consumer_quit = 0;
  * stream element in this ht should only be updated by the metadata poll thread
  * for the metadata and the data poll thread for the data.
  */
-struct lttng_ht *metadata_ht = NULL;
-struct lttng_ht *data_ht = NULL;
+struct lttng_ht *metadata_ht;
+struct lttng_ht *data_ht;
 
 /*
  * Find a stream. The consumer_data.lock must be locked during this
@@ -923,6 +923,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
        if (ret < 0) {
                PERROR("write consumer quit");
        }
+
+       DBG("Consumer flag that it should quit");
 }
 
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
@@ -1084,6 +1086,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 {
        int ret;
 
+       DBG("Consumer destroying it. Closing everything.");
+
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
@@ -1266,11 +1270,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
 end:
-       pthread_mutex_unlock(&stream->lock);
        /* Unlock only if ctrl socket used */
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
+       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1451,10 +1455,10 @@ splice_error:
        }
 
 end:
-       pthread_mutex_unlock(&stream->lock);
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
+       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1916,8 +1920,9 @@ restart:
                                len = ctx->on_buffer_ready(stream, ctx);
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       rcu_read_unlock();
-                                       goto end;
+                                       /* Clean up stream from consumer and free it. */
+                                       lttng_poll_del(&events, stream->wait_fd);
+                                       consumer_del_metadata_stream(stream, metadata_ht);
                                } else if (len > 0) {
                                        stream->data_read = 1;
                                }
@@ -2084,7 +2089,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2107,7 +2113,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2462,6 +2469,7 @@ int consumer_data_available(uint64_t id)
 
        DBG("Consumer data available command on session id %" PRIu64, id);
 
+       rcu_read_lock();
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
@@ -2477,8 +2485,6 @@ int consumer_data_available(uint64_t id)
                assert(0);
        }
 
-       rcu_read_lock();
-
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
This page took 0.025329 seconds and 4 git commands to generate.