From 43c34bc328e6970b298c9f5cd661e2ca648ebf16 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Thu, 11 Oct 2012 16:48:57 -0400 Subject: [PATCH] Make stream hash tables global to the consumer The data stream hash table is now global to the consumer and used in the data thread. The consumer_data stream_ht is no longer used to track the data streams but instead will be used (and possibly renamed) by the session daemon poll thread to keep track of streams on a per session id basis for the upcoming feature that check traced data availability. For now, in order to avoid mind bugging problems to access the streams, both hash table are now defined globally (metadata and data). However, stream update are still done in a single thread. Don't count on this to be guaranteed in the next commits. Signed-off-by: David Goulet --- src/common/consumer.c | 91 +++++++++++++++++++++++++++++++++---------- src/common/consumer.h | 9 ++--- 2 files changed, 75 insertions(+), 25 deletions(-) diff --git a/src/common/consumer.c b/src/common/consumer.c index be78e256f..0b2f07391 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -58,6 +58,17 @@ int consumer_poll_timeout = -1; */ volatile int consumer_quit = 0; +/* + * The following two hash tables are visible by all threads which are separated + * in different source files. + * + * Global hash table containing respectively metadata and data streams. The + * stream element in this ht should only be updated by the metadata poll thread + * for the metadata and the data poll thread for the data. + */ +struct lttng_ht *metadata_ht = NULL; +struct lttng_ht *data_ht = NULL; + /* * Find a stream. The consumer_data.lock must be locked during this * call. @@ -433,19 +444,24 @@ end: /* * Add a stream to the global list protected by a mutex. */ -int consumer_add_stream(struct lttng_consumer_stream *stream) +static int consumer_add_stream(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) { int ret = 0; struct consumer_relayd_sock_pair *relayd; assert(stream); + assert(ht); DBG3("Adding consumer stream %d", stream->key); pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); + + lttng_ht_add_unique_ulong(ht, &stream->node); /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); @@ -783,9 +799,9 @@ end: * * Returns the number of fds in the structures. */ -int consumer_update_poll_array( +static int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_stream) + struct lttng_consumer_stream **local_stream, struct lttng_ht *ht) { int i = 0; struct lttng_ht_iter iter; @@ -793,8 +809,7 @@ int consumer_update_poll_array( DBG("Updating poll fd array"); rcu_read_lock(); - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, - node.node) { + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) { continue; } @@ -1518,6 +1533,33 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } +/* + * Iterate over all streams of the hashtable and free them properly. + * + * WARNING: *MUST* be used with data stream only. + */ +static void destroy_data_stream_ht(struct lttng_ht *ht) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + ret = lttng_ht_del(ht, &iter); + assert(!ret); + + call_rcu(&stream->node.head, consumer_free_stream); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + /* * Iterate over all streams of the hashtable and free them properly. * @@ -1708,6 +1750,9 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, uatomic_dec(&stream->chan->nb_init_streams); } + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); + lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); rcu_read_unlock(); @@ -1726,7 +1771,6 @@ void *consumer_thread_metadata_poll(void *data) struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; - struct lttng_ht *metadata_ht = NULL; struct lttng_poll_event events; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -1735,11 +1779,6 @@ void *consumer_thread_metadata_poll(void *data) DBG("Thread metadata poll started"); - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - if (metadata_ht == NULL) { - goto end; - } - /* Size is set to 1 for the consumer_metadata pipe */ ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); if (ret < 0) { @@ -1918,6 +1957,11 @@ void *consumer_thread_data_poll(void *data) rcu_register_thread(); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (data_ht == NULL) { + goto end; + } + local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { @@ -1955,7 +1999,8 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); goto end; } - ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + ret = consumer_update_poll_array(ctx, &pollfd, local_stream, + data_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2015,7 +2060,7 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = consumer_add_stream(new_stream); + ret = consumer_add_stream(new_stream, data_ht); if (ret) { ERR("Consumer add stream %d failed. Continuing", new_stream->key); @@ -2088,22 +2133,19 @@ void *consumer_thread_data_poll(void *data) if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } @@ -2131,6 +2173,10 @@ end: */ close(ctx->consumer_metadata_pipe[1]); + if (data_ht) { + destroy_data_stream_ht(data_ht); + } + rcu_unregister_thread(); return NULL; } @@ -2299,6 +2345,11 @@ void lttng_consumer_init(void) consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(metadata_ht); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(data_ht); } /* diff --git a/src/common/consumer.h b/src/common/consumer.h index 8e5891aef..6bce96d96 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -275,6 +275,10 @@ struct lttng_consumer_global_data { struct lttng_ht *relayd_ht; }; +/* Defined in consumer.c and coupled with explanations */ +extern struct lttng_ht *metadata_ht; +extern struct lttng_ht *data_ht; + /* * Init consumer data structures. */ @@ -324,10 +328,6 @@ extern void lttng_consumer_sync_trace_file( */ extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); -extern int consumer_update_poll_array( - struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_consumer_streams); - extern struct lttng_consumer_stream *consumer_allocate_stream( int channel_key, int stream_key, int shm_fd, int wait_fd, @@ -340,7 +340,6 @@ extern struct lttng_consumer_stream *consumer_allocate_stream( int net_index, int metadata_flag, int *alloc_ret); -extern int consumer_add_stream(struct lttng_consumer_stream *stream); extern void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, -- 2.34.1