X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=53c618067f8d6546b91ccc8f77854d7d2b36ed72;hb=d804b36b2b3e7f6d2bcb8f2ff1ace8b04e2a3cfc;hp=8d1a34025687da37b7c3c1555d80441c93d8761d;hpb=ca22feea083301934d1c8511851c86fb008c0697;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 8d1a34025..53c618067 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -56,7 +56,7 @@ int consumer_poll_timeout = -1; * Also updated by the signal handler (consumer_should_exit()). Read by the * polling threads. */ -volatile int consumer_quit = 0; +volatile int consumer_quit; /* * The following two hash tables are visible by all threads which are separated @@ -66,8 +66,8 @@ volatile int consumer_quit = 0; * stream element in this ht should only be updated by the metadata poll thread * for the metadata and the data poll thread for the data. */ -struct lttng_ht *metadata_ht = NULL; -struct lttng_ht *data_ht = NULL; +struct lttng_ht *metadata_ht; +struct lttng_ht *data_ht; /* * Find a stream. The consumer_data.lock must be locked during this @@ -923,6 +923,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) if (ret < 0) { PERROR("write consumer quit"); } + + DBG("Consumer flag that it should quit"); } void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, @@ -1084,6 +1086,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) { int ret; + DBG("Consumer destroying it. Closing everything."); + ret = close(ctx->consumer_error_socket); if (ret) { PERROR("close"); @@ -1169,6 +1173,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1268,6 +1274,7 @@ end: if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } + pthread_mutex_unlock(&stream->lock); rcu_read_unlock(); return written; @@ -1307,6 +1314,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1449,6 +1458,7 @@ end: if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } + pthread_mutex_unlock(&stream->lock); rcu_read_unlock(); return written; @@ -1910,8 +1920,9 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* It's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { - rcu_read_unlock(); - goto end; + /* Clean up stream from consumer and free it. */ + lttng_poll_del(&events, stream->wait_fd); + consumer_del_metadata_stream(stream, metadata_ht); } else if (len > 0) { stream->data_read = 1; } @@ -2078,7 +2089,8 @@ void *consumer_thread_data_poll(void *data) len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { - goto end; + /* Clean the stream and free it. */ + consumer_del_stream(local_stream[i], data_ht); } else if (len > 0) { local_stream[i]->data_read = 1; } @@ -2101,7 +2113,8 @@ void *consumer_thread_data_poll(void *data) len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { - goto end; + /* Clean the stream and free it. */ + consumer_del_stream(local_stream[i], data_ht); } else if (len > 0) { local_stream[i]->data_read = 1; } @@ -2451,10 +2464,12 @@ int consumer_data_available(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; + struct consumer_relayd_sock_pair *relayd; int (*data_available)(struct lttng_consumer_stream *); DBG("Consumer data available command on session id %" PRIu64, id); + rcu_read_lock(); pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { @@ -2473,7 +2488,8 @@ int consumer_data_available(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct, + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct((void *)((unsigned long) id), 0x42UL), ht->match_fct, (void *)((unsigned long) id), &iter.iter, stream, node_session_id.node) { /* Check the stream for data. */ @@ -2481,9 +2497,26 @@ int consumer_data_available(uint64_t id) if (ret == 0) { goto data_not_available; } - } - /* TODO: Support to ask the relayd if the streams are remote */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + assert(relayd); + + pthread_mutex_lock(&stream->lock); + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->metadata_flag) { + ret = relayd_quiescent_control(&relayd->control_sock); + } else { + ret = relayd_data_available(&relayd->control_sock, + stream->relayd_stream_id, stream->next_net_seq_num); + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + if (ret == 0) { + goto data_not_available; + } + } + } /* * Finding _no_ node in the hash table means that the stream(s) have been @@ -2494,10 +2527,12 @@ int consumer_data_available(uint64_t id) /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); return 1; data_not_available: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); return 0; }