X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-consumer%2Flttng-consumer.c;h=0263aa1d83134bb5edd2937effce7d9cec1e4320;hp=f4af47404c6053af72f90107a5ea6acf9c3650c2;hb=008dd0fe872d71d154867d25579b58a0a204d93d;hpb=e4421fecbda445e77b4604d2332014960bfbf695 diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index f4af47404..0263aa1d8 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -67,6 +67,8 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) if (key < 0) return NULL; + rcu_read_lock(); + lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -74,6 +76,8 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) stream = caa_container_of(node, struct lttng_consumer_stream, node); } + rcu_read_unlock(); + return stream; } @@ -96,6 +100,8 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) if (key < 0) return NULL; + rcu_read_lock(); + lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -103,6 +109,8 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) channel = caa_container_of(node, struct lttng_consumer_channel, node); } + rcu_read_unlock(); + return channel; } @@ -146,6 +154,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) goto end; } + rcu_read_lock(); + /* Get stream node from hash table */ lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) stream->key), &iter); @@ -153,6 +163,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) ret = lttng_ht_del(consumer_data.stream_ht, &iter); assert(!ret); + rcu_read_unlock(); + if (consumer_data.stream_count <= 0) { goto end; } @@ -166,8 +178,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) { close(stream->wait_fd); } - if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd - && !stream->shm_fd_is_copy) { + if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) { close(stream->shm_fd); } if (!--stream->chan->refcount) @@ -181,6 +192,16 @@ end: consumer_del_channel(free_chan); } +static void consumer_del_stream_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_consumer_stream *stream = + caa_container_of(node, struct lttng_consumer_stream, node); + + consumer_del_stream(stream); +} + struct lttng_consumer_stream *consumer_allocate_stream( int channel_key, int stream_key, int shm_fd, int wait_fd, @@ -257,7 +278,9 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&consumer_data.lock); /* Steal stream identifier, for UST */ consumer_steal_stream_key(stream->key); + rcu_read_lock(); lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + rcu_read_unlock(); consumer_data.stream_count++; consumer_data.need_update = 1; @@ -321,11 +344,15 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } + rcu_read_lock(); + lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) channel->key), &iter); ret = lttng_ht_del(consumer_data.channel_ht, &iter); assert(!ret); + rcu_read_unlock(); + if (channel->mmap_base != NULL) { ret = munmap(channel->mmap_base, channel->mmap_len); if (ret != 0) { @@ -335,8 +362,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) { close(channel->wait_fd); } - if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd - && !channel->shm_fd_is_copy) { + if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) { close(channel->shm_fd); } free(channel); @@ -344,6 +370,16 @@ end: pthread_mutex_unlock(&consumer_data.lock); } +static void consumer_del_channel_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_consumer_channel *channel= + caa_container_of(node, struct lttng_consumer_channel, node); + + consumer_del_channel(channel); +} + struct lttng_consumer_channel *consumer_allocate_channel( int channel_key, int shm_fd, int wait_fd, @@ -403,7 +439,9 @@ int consumer_add_channel(struct lttng_consumer_channel *channel) pthread_mutex_lock(&consumer_data.lock); /* Steal channel identifier, for UST */ consumer_steal_channel_key(channel->key); + rcu_read_lock(); lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node); + rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); return 0; } @@ -510,27 +548,29 @@ void lttng_consumer_cleanup(void) { int ret; struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - struct lttng_consumer_channel *channel; + struct lttng_ht_node_ulong *node; + + rcu_read_lock(); /* - * close all outfd. Called when there are no more threads - * running (after joining on the threads), no need to protect - * list iteration with mutex. + * close all outfd. Called when there are no more threads running (after + * joining on the threads), no need to protect list iteration with mutex. */ - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, - node.node) { + cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node, + node) { ret = lttng_ht_del(consumer_data.stream_ht, &iter); assert(!ret); - consumer_del_stream(stream); + call_rcu(&node->head, consumer_del_stream_rcu); } - cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel, - node.node) { + cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node, + node) { ret = lttng_ht_del(consumer_data.channel_ht, &iter); assert(!ret); - consumer_del_channel(channel); + call_rcu(&node->head, consumer_del_channel_rcu); } + + rcu_read_unlock(); } /* @@ -803,6 +843,8 @@ void *lttng_consumer_thread_poll_fds(void *data) int tmp2; struct lttng_consumer_local_data *ctx = data; + rcu_register_thread(); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { @@ -897,11 +939,15 @@ void *lttng_consumer_thread_poll_fds(void *data) } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } else if ((pollfd[i].revents & POLLHUP) && !(pollfd[i].revents & POLLIN)) { @@ -919,7 +965,9 @@ void *lttng_consumer_thread_poll_fds(void *data) } else { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); } - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } } @@ -957,6 +1005,7 @@ end: free(local_stream); local_stream = NULL; } + rcu_unregister_thread(); return NULL; } @@ -974,6 +1023,8 @@ void *lttng_consumer_thread_receive_fds(void *data) struct pollfd consumer_sockpoll[2]; struct lttng_consumer_local_data *ctx = data; + rcu_register_thread(); + DBG("Creating command socket %s", ctx->consumer_command_sock_path); unlink(ctx->consumer_command_sock_path); client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path); @@ -1069,6 +1120,7 @@ end: if (ret < 0) { perror("poll pipe write"); } + rcu_unregister_thread(); return NULL; }