From 6065ceec9574bf18eb79ae707f627322f2713d18 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Wed, 11 Jan 2012 14:51:04 -0500 Subject: [PATCH] RCU support for consumer's hash tables Signed-off-by: David Goulet --- include/lttng/lttng-consumer.h | 7 +-- liblttng-consumer/lttng-consumer.c | 76 ++++++++++++++++++++++++------ 2 files changed, 66 insertions(+), 17 deletions(-) diff --git a/include/lttng/lttng-consumer.h b/include/lttng/lttng-consumer.h index bba72ee69..f5ad3e6f2 100644 --- a/include/lttng/lttng-consumer.h +++ b/include/lttng/lttng-consumer.h @@ -184,9 +184,10 @@ struct lttng_consumer_global_data { /* * At this time, this lock is used to ensure coherence between the count * and number of element in the hash table. It's also a protection for - * concurrent read/write between threads. Although hash table used are - * lockless data structure, appropriate RCU lock mechanism are not yet - * implemented in the consumer. + * concurrent read/write between threads. + * + * XXX: We need to see if this lock is still needed with the lockless RCU + * hash tables. */ pthread_mutex_t lock; diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index f4af47404..617282d9c 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -67,6 +67,8 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) if (key < 0) return NULL; + rcu_read_lock(); + lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -74,6 +76,8 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) stream = caa_container_of(node, struct lttng_consumer_stream, node); } + rcu_read_unlock(); + return stream; } @@ -96,6 +100,8 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) if (key < 0) return NULL; + rcu_read_lock(); + lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -103,6 +109,8 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) channel = caa_container_of(node, struct lttng_consumer_channel, node); } + rcu_read_unlock(); + return channel; } @@ -146,6 +154,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) goto end; } + rcu_read_lock(); + /* Get stream node from hash table */ lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) stream->key), &iter); @@ -153,6 +163,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) ret = lttng_ht_del(consumer_data.stream_ht, &iter); assert(!ret); + rcu_read_unlock(); + if (consumer_data.stream_count <= 0) { goto end; } @@ -181,6 +193,16 @@ end: consumer_del_channel(free_chan); } +static void consumer_del_stream_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_consumer_stream *stream = + caa_container_of(node, struct lttng_consumer_stream, node); + + consumer_del_stream(stream); +} + struct lttng_consumer_stream *consumer_allocate_stream( int channel_key, int stream_key, int shm_fd, int wait_fd, @@ -257,7 +279,9 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&consumer_data.lock); /* Steal stream identifier, for UST */ consumer_steal_stream_key(stream->key); + rcu_read_lock(); lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + rcu_read_unlock(); consumer_data.stream_count++; consumer_data.need_update = 1; @@ -321,11 +345,15 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } + rcu_read_lock(); + lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) channel->key), &iter); ret = lttng_ht_del(consumer_data.channel_ht, &iter); assert(!ret); + rcu_read_unlock(); + if (channel->mmap_base != NULL) { ret = munmap(channel->mmap_base, channel->mmap_len); if (ret != 0) { @@ -344,6 +372,16 @@ end: pthread_mutex_unlock(&consumer_data.lock); } +static void consumer_del_channel_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_consumer_channel *channel= + caa_container_of(node, struct lttng_consumer_channel, node); + + consumer_del_channel(channel); +} + struct lttng_consumer_channel *consumer_allocate_channel( int channel_key, int shm_fd, int wait_fd, @@ -403,7 +441,9 @@ int consumer_add_channel(struct lttng_consumer_channel *channel) pthread_mutex_lock(&consumer_data.lock); /* Steal channel identifier, for UST */ consumer_steal_channel_key(channel->key); + rcu_read_lock(); lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node); + rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); return 0; } @@ -510,27 +550,29 @@ void lttng_consumer_cleanup(void) { int ret; struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - struct lttng_consumer_channel *channel; + struct lttng_ht_node_ulong *node; + + rcu_read_lock(); /* - * close all outfd. Called when there are no more threads - * running (after joining on the threads), no need to protect - * list iteration with mutex. + * close all outfd. Called when there are no more threads running (after + * joining on the threads), no need to protect list iteration with mutex. */ - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, - node.node) { + cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node, + node) { ret = lttng_ht_del(consumer_data.stream_ht, &iter); assert(!ret); - consumer_del_stream(stream); + call_rcu(&node->head, consumer_del_stream_rcu); } - cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel, - node.node) { + cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node, + node) { ret = lttng_ht_del(consumer_data.channel_ht, &iter); assert(!ret); - consumer_del_channel(channel); + call_rcu(&node->head, consumer_del_channel_rcu); } + + rcu_read_unlock(); } /* @@ -897,11 +939,15 @@ void *lttng_consumer_thread_poll_fds(void *data) } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } else if ((pollfd[i].revents & POLLHUP) && !(pollfd[i].revents & POLLIN)) { @@ -919,7 +965,9 @@ void *lttng_consumer_thread_poll_fds(void *data) } else { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); } - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } } -- 2.34.1