X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=831592a1e96ef4a8a22b64fdc29ffeaa75f62786;hp=88e8d9d28017ed5fb0fccbee22fad316f1ed402c;hb=f64161251bd649abe5b6a473531adfa3af9bd6b6;hpb=00e2e675d54dc726a7c8f8887c889cc8ef022003 diff --git a/src/common/consumer.c b/src/common/consumer.c index 88e8d9d28..831592a1e 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -261,16 +261,17 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) } /* Check and cleanup relayd */ + rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { - /* We are about to modify the relayd refcount */ - rcu_read_lock(); - if (!--relayd->refcount) { + uatomic_dec(&relayd->refcount); + assert(uatomic_read(&relayd->refcount) >= 0); + if (uatomic_read(&relayd->refcount) == 0) { /* Refcount of the relayd struct is 0, destroy it */ consumer_destroy_relayd(relayd); } - rcu_read_unlock(); } + rcu_read_unlock(); if (!--stream->chan->refcount) { free_chan = stream->chan; @@ -371,8 +372,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&consumer_data.lock); /* Steal stream identifier, for UST */ consumer_steal_stream_key(stream->key); - rcu_read_lock(); + rcu_read_lock(); lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) stream->key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -383,16 +384,13 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) } lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); - rcu_read_unlock(); /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { - /* We are about to modify the relayd refcount */ - rcu_read_lock(); - relayd->refcount++; - rcu_read_unlock(); + uatomic_inc(&relayd->refcount); } + rcu_read_unlock(); /* Update consumer data */ consumer_data.stream_count++; @@ -482,6 +480,8 @@ error: * Find a relayd socket pair in the global consumer data. * * Return the object if found else NULL. + * RCU read-side lock must be held across this call and while using the + * returned object. */ struct consumer_relayd_sock_pair *consumer_find_relayd(int key) { @@ -494,8 +494,6 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(int key) goto error; } - rcu_read_lock(); - lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -503,8 +501,6 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(int key) relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); } - rcu_read_unlock(); - error: return relayd; } @@ -528,6 +524,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, /* Reset data header */ memset(&data_hdr, 0, sizeof(data_hdr)); + rcu_read_lock(); /* Get relayd reference of the stream. */ relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { @@ -562,6 +559,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, } error: + rcu_read_unlock(); return outfd; } @@ -875,7 +873,9 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) { int ret; consumer_quit = 1; - ret = write(ctx->consumer_should_quit[1], "4", 1); + do { + ret = write(ctx->consumer_should_quit[1], "4", 1); + } while (ret < 0 && errno == EINTR); if (ret < 0) { perror("write consumer quit"); } @@ -1522,7 +1522,7 @@ end: */ do { ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret == -1UL && errno == EINTR); + } while (ret < 0 && errno == EINTR); rcu_unregister_thread(); return NULL; }