Fix: Delete stream on write error in consumer
[lttng-tools.git] / src / common / consumer.c
index 8d1a34025687da37b7c3c1555d80441c93d8761d..53c618067f8d6546b91ccc8f77854d7d2b36ed72 100644 (file)
@@ -56,7 +56,7 @@ int consumer_poll_timeout = -1;
  * Also updated by the signal handler (consumer_should_exit()). Read by the
  * polling threads.
  */
-volatile int consumer_quit = 0;
+volatile int consumer_quit;
 
 /*
  * The following two hash tables are visible by all threads which are separated
@@ -66,8 +66,8 @@ volatile int consumer_quit = 0;
  * stream element in this ht should only be updated by the metadata poll thread
  * for the metadata and the data poll thread for the data.
  */
-struct lttng_ht *metadata_ht = NULL;
-struct lttng_ht *data_ht = NULL;
+struct lttng_ht *metadata_ht;
+struct lttng_ht *data_ht;
 
 /*
  * Find a stream. The consumer_data.lock must be locked during this
@@ -923,6 +923,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
        if (ret < 0) {
                PERROR("write consumer quit");
        }
+
+       DBG("Consumer flag that it should quit");
 }
 
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
@@ -1084,6 +1086,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 {
        int ret;
 
+       DBG("Consumer destroying it. Closing everything.");
+
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
@@ -1169,6 +1173,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1268,6 +1274,7 @@ end:
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
+       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1307,6 +1314,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1449,6 +1458,7 @@ end:
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
+       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1910,8 +1920,9 @@ restart:
                                len = ctx->on_buffer_ready(stream, ctx);
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       rcu_read_unlock();
-                                       goto end;
+                                       /* Clean up stream from consumer and free it. */
+                                       lttng_poll_del(&events, stream->wait_fd);
+                                       consumer_del_metadata_stream(stream, metadata_ht);
                                } else if (len > 0) {
                                        stream->data_read = 1;
                                }
@@ -2078,7 +2089,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2101,7 +2113,8 @@ void *consumer_thread_data_poll(void *data)
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       goto end;
+                                       /* Clean the stream and free it. */
+                                       consumer_del_stream(local_stream[i], data_ht);
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
                                }
@@ -2451,10 +2464,12 @@ int consumer_data_available(uint64_t id)
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
+       struct consumer_relayd_sock_pair *relayd;
        int (*data_available)(struct lttng_consumer_stream *);
 
        DBG("Consumer data available command on session id %" PRIu64, id);
 
+       rcu_read_lock();
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
@@ -2473,7 +2488,8 @@ int consumer_data_available(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct,
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* Check the stream for data. */
@@ -2481,9 +2497,26 @@ int consumer_data_available(uint64_t id)
                if (ret == 0) {
                        goto data_not_available;
                }
-       }
 
-       /* TODO: Support to ask the relayd if the streams are remote */
+               if (stream->net_seq_idx != -1) {
+                       relayd = consumer_find_relayd(stream->net_seq_idx);
+                       assert(relayd);
+
+                       pthread_mutex_lock(&stream->lock);
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       if (stream->metadata_flag) {
+                               ret = relayd_quiescent_control(&relayd->control_sock);
+                       } else {
+                               ret = relayd_data_available(&relayd->control_sock,
+                                               stream->relayd_stream_id, stream->next_net_seq_num);
+                       }
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       pthread_mutex_unlock(&stream->lock);
+                       if (ret == 0) {
+                               goto data_not_available;
+                       }
+               }
+       }
 
        /*
         * Finding _no_ node in the hash table means that the stream(s) have been
@@ -2494,10 +2527,12 @@ int consumer_data_available(uint64_t id)
 
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
        return 1;
 
 data_not_available:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
        return 0;
 }
This page took 0.025236 seconds and 4 git commands to generate.