Fix: consumer relay sender RCU usage
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 25 Jul 2012 20:31:02 +0000 (16:31 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 25 Jul 2012 20:33:48 +0000 (16:33 -0400)
* RCU read-side lock needs to be held across entire usage of the
  returned structure pointer, not just the lookup.
* Moving refcount inc/dec to uatomic ops. Theoretically not 100%
  required for now, but won't hurt when we move to multithread.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer.c
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 974c65f550c94a6be3fb94c18227f86ad133076e..831592a1e96ef4a8a22b64fdc29ffeaa75f62786 100644 (file)
@@ -261,16 +261,17 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
        }
 
        /* Check and cleanup relayd */
+       rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
-               /* We are about to modify the relayd refcount */
-               rcu_read_lock();
-               if (!--relayd->refcount) {
+               uatomic_dec(&relayd->refcount);
+               assert(uatomic_read(&relayd->refcount) >= 0);
+               if (uatomic_read(&relayd->refcount) == 0) {
                        /* Refcount of the relayd struct is 0, destroy it */
                        consumer_destroy_relayd(relayd);
                }
-               rcu_read_unlock();
        }
+       rcu_read_unlock();
 
        if (!--stream->chan->refcount) {
                free_chan = stream->chan;
@@ -371,8 +372,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal stream identifier, for UST */
        consumer_steal_stream_key(stream->key);
-       rcu_read_lock();
 
+       rcu_read_lock();
        lttng_ht_lookup(consumer_data.stream_ht,
                        (void *)((unsigned long) stream->key), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -383,16 +384,13 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        }
 
        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
-       rcu_read_unlock();
 
        /* Check and cleanup relayd */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
-               /* We are about to modify the relayd refcount */
-               rcu_read_lock();
-               relayd->refcount++;
-               rcu_read_unlock();
+               uatomic_inc(&relayd->refcount);
        }
+       rcu_read_unlock();
 
        /* Update consumer data */
        consumer_data.stream_count++;
@@ -482,6 +480,8 @@ error:
  * Find a relayd socket pair in the global consumer data.
  *
  * Return the object if found else NULL.
+ * RCU read-side lock must be held across this call and while using the
+ * returned object.
  */
 struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
 {
@@ -494,8 +494,6 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
                goto error;
        }
 
-       rcu_read_lock();
-
        lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
                        &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -503,8 +501,6 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
                relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
        }
 
-       rcu_read_unlock();
-
 error:
        return relayd;
 }
@@ -528,6 +524,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
        /* Reset data header */
        memset(&data_hdr, 0, sizeof(data_hdr));
 
+       rcu_read_lock();
        /* Get relayd reference of the stream. */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd == NULL) {
@@ -562,6 +559,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
        }
 
 error:
+       rcu_read_unlock();
        return outfd;
 }
 
index 00bb7f7212cac7f0ccf4f4a6c91ef80dbb4b58ef..551d8579a21ad4c6e86bed6904df6290c4304d24 100644 (file)
@@ -59,6 +59,9 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
        uint64_t metadata_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
+       /* RCU lock for the relayd pointer */
+       rcu_read_lock();
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -76,9 +79,6 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
                goto end;
        }
 
-       /* RCU lock for the relayd pointer */
-       rcu_read_lock();
-
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
                /*
@@ -178,6 +178,9 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice(
        uint64_t metadata_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
+       /* RCU lock for the relayd pointer */
+       rcu_read_lock();
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -186,9 +189,6 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice(
                }
        }
 
-       /* RCU lock for the relayd pointer */
-       rcu_read_lock();
-
        /* Write metadata stream id before payload */
        if (stream->metadata_flag && relayd) {
                /*
@@ -374,6 +374,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
+       /* relayd needs RCU read-side protection */
+       rcu_read_lock();
+
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
@@ -580,6 +583,7 @@ end:
                ret = write(ctx->consumer_poll_pipe[1], "", 1);
        } while (ret < 0 && errno == EINTR);
 end_nosignal:
+       rcu_read_unlock();
        return 0;
 }
 
index 1cd39ef34b7e957bf9cfa95ac93e7fb40374fb9c..4cd6972125dccba26fcfadee133b7438d91c5eeb 100644 (file)
@@ -56,10 +56,14 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
        uint64_t metadata_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
+       /* RCU lock for the relayd pointer */
+       rcu_read_lock();
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ERR("Cannot find relay for network stream\n");
                        goto end;
                }
        }
@@ -139,6 +143,7 @@ end:
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
+       rcu_read_unlock();
        return written;
 }
 
@@ -210,6 +215,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
+       /* relayd need RCU read-side lock */
+       rcu_read_lock();
+
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
@@ -433,6 +441,7 @@ end:
                ret = write(ctx->consumer_poll_pipe[1], "", 1);
        } while (ret < 0 && errno == EINTR);
 end_nosignal:
+       rcu_read_unlock();
        return 0;
 }
 
This page took 0.029564 seconds and 4 git commands to generate.