Fix: don't steal key when adding a metadata stream
[lttng-tools.git] / src / common / consumer.c
index 16b9eb64f88ae006a742ade5d3d05b162d5bb0ef..260779ae6d1ace6d9a186c40c2202a654c85dbea 100644 (file)
@@ -56,15 +56,12 @@ struct lttng_consumer_global_data consumer_data = {
 volatile int consumer_quit;
 
 /*
- * The following two hash tables are visible by all threads which are separated
- * in different source files.
- *
  * Global hash table containing respectively metadata and data streams. The
  * stream element in this ht should only be updated by the metadata poll thread
  * for the metadata and the data poll thread for the data.
  */
-struct lttng_ht *metadata_ht;
-struct lttng_ht *data_ht;
+static struct lttng_ht *metadata_ht;
+static struct lttng_ht *data_ht;
 
 /*
  * Notify a thread pipe to poll back again. This usually means that some global
@@ -557,6 +554,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %d", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
        /* Steal stream identifier to avoid having streams with the same key */
@@ -596,6 +594,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        consumer_data.need_update = 1;
 
        rcu_read_unlock();
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -1875,6 +1874,8 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
 
        assert(stream);
        assert(ht);
@@ -1882,6 +1883,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
 
        /*
         * From here, refcounts are updated so be _careful_ when returning an error
@@ -1889,6 +1891,15 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
         */
 
        rcu_read_lock();
+
+       /*
+        * Lookup the stream just to make sure it does not exist in our internal
+        * state. This should NEVER happen.
+        */
+       lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       assert(!node);
+
        /* Find relayd and, if one is found, increment refcount. */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
@@ -1909,9 +1920,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_dec(&stream->chan->nb_init_streams);
        }
 
-       /* Steal stream identifier to avoid having streams with the same key */
-       consumer_steal_stream_key(stream->key, ht);
-
        lttng_ht_add_unique_ulong(ht, &stream->node);
 
        /*
@@ -1923,6 +1931,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -2054,7 +2063,10 @@ restart:
                                         * since their might be data to consume.
                                         */
                                        lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
-                                       close(ctx->consumer_metadata_pipe[0]);
+                                       ret = close(ctx->consumer_metadata_pipe[0]);
+                                       if (ret < 0) {
+                                               PERROR("close metadata pipe");
+                                       }
                                        continue;
                                } else if (revents & LPOLLIN) {
                                        do {
@@ -2418,7 +2430,10 @@ end:
         * only tracked fd in the poll set. The thread will take care of closing
         * the read side.
         */
-       close(ctx->consumer_metadata_pipe[1]);
+       ret = close(ctx->consumer_metadata_pipe[1]);
+       if (ret < 0) {
+               PERROR("close data pipe");
+       }
 
        if (data_ht) {
                destroy_data_stream_ht(data_ht);
@@ -2638,7 +2653,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Close the created socket fd which is useless */
-               close(relayd->control_sock.fd);
+               ret = close(relayd->control_sock.fd);
+               if (ret < 0) {
+                       PERROR("close relayd control socket");
+               }
 
                /* Assign new file descriptor */
                relayd->control_sock.fd = fd;
@@ -2652,7 +2670,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Close the created socket fd which is useless */
-               close(relayd->data_sock.fd);
+               ret = close(relayd->data_sock.fd);
+               if (ret < 0) {
+                       PERROR("close relayd control socket");
+               }
 
                /* Assign new file descriptor */
                relayd->data_sock.fd = fd;
This page took 0.024726 seconds and 4 git commands to generate.