Fix: don't steal key when adding a metadata stream
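
The metadata stream hash table is keyed by the stream's wait_fd (the
lookup added below uses stream->wait_fd), so stealing stream->key in
consumer_add_metadata_stream() targeted the wrong key space. Drop the
steal and instead assert, under the consumer and stream locks, that
the wait_fd is not already present in the table.

Also in this patch:
  - make metadata_ht and data_ht static, since no other file uses them
  - treat EPIPE like EBADF as a relayd disconnection on splice errors
  - drop the consumer_poll_timeout grace period and block in poll()
  - check and report the return value of close()
  - hash session id lookups with lttng_ht_seed instead of 0x42UL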
diff --git a/src/common/consumer.c b/src/common/consumer.c
index ca6aeba3ffd7a8748256bd9cae356eeff3e977b6..260779ae6d1ace6d9a186c40c2202a654c85dbea 100644
@@ -47,9 +47,6 @@ struct lttng_consumer_global_data consumer_data = {
        .type = LTTNG_CONSUMER_UNKNOWN,
 };
 
-/* timeout parameter, to control the polling thread grace period. */
-int consumer_poll_timeout = -1;
-
 /*
  * Flag to inform the polling thread to quit when all fd hung up. Updated by
  * the consumer_thread_receive_fds when it notices that all fds have hung up.
@@ -59,15 +56,12 @@ int consumer_poll_timeout = -1;
 volatile int consumer_quit;
 
 /*
- * The following two hash tables are visible by all threads which are separated
- * in different source files.
- *
  * Global hash table containing respectively metadata and data streams. The
  * stream element in this ht should only be updated by the metadata poll thread
  * for the metadata and the data poll thread for the data.
  */
-struct lttng_ht *metadata_ht;
-struct lttng_ht *data_ht;
+static struct lttng_ht *metadata_ht;
+static struct lttng_ht *data_ht;
 
 /*
  * Notify a thread pipe to poll back again. This usually means that some global
@@ -560,6 +554,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %d", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
        /* Steal stream identifier to avoid having streams with the same key */
@@ -599,6 +594,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        consumer_data.need_update = 1;
 
        rcu_read_unlock();
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -1559,7 +1555,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                written = ret_splice;
                        }
                        /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EBADF) {
+                       if (errno == EBADF || errno == EPIPE) {
                                WARN("Remote relayd disconnected. Stopping");
                                relayd_hang_up = 1;
                                goto write_error;
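
A minimal standalone sketch (not part of the patch) of why EPIPE joins
EBADF here: once SIGPIPE is ignored, writing to a socket whose peer has
gone away fails with EPIPE, so it signals a disconnection just like a
bad descriptor does.

	#include <errno.h>
	#include <signal.h>
	#include <stdio.h>
	#include <string.h>
	#include <sys/socket.h>
	#include <unistd.h>

	int main(void)
	{
		int fds[2];

		signal(SIGPIPE, SIG_IGN);	/* get EPIPE instead of a fatal signal */
		if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
			perror("socketpair");
			return 1;
		}
		close(fds[1]);			/* the peer "disconnects" */
		if (write(fds[0], "x", 1) < 0) {
			/* Prints "write failed: Broken pipe" (EPIPE). */
			printf("write failed: %s\n", strerror(errno));
		}
		close(fds[0]);
		return 0;
	}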
@@ -1878,6 +1874,8 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
 
        assert(stream);
        assert(ht);
@@ -1885,6 +1883,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
 
        /*
         * From here, refcounts are updated so be _careful_ when returning an error
@@ -1892,6 +1891,15 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
         */
 
        rcu_read_lock();
+
+       /*
+        * Look up the stream just to make sure it does not already exist in our
+        * internal state. This should NEVER happen.
+        */
+       lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       assert(!node);
+
        /* Find relayd and, if one is found, increment refcount. */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
@@ -1912,9 +1920,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_dec(&stream->chan->nb_init_streams);
        }
 
-       /* Steal stream identifier to avoid having streams with the same key */
-       consumer_steal_stream_key(stream->key, ht);
-
        lttng_ht_add_unique_ulong(ht, &stream->node);
 
        /*
@@ -1926,6 +1931,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
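
Both add paths now take the per-stream lock while holding the global
consumer lock. A minimal sketch of that ordering, with illustrative
names rather than the real lttng-tools types:

	#include <pthread.h>
	#include <stdio.h>

	struct stream {
		pthread_mutex_t lock;
		int key;
	};

	static pthread_mutex_t consumer_lock = PTHREAD_MUTEX_INITIALIZER;

	static void add_stream(struct stream *stream)
	{
		pthread_mutex_lock(&consumer_lock);	/* global consumer state first */
		pthread_mutex_lock(&stream->lock);	/* then the stream itself */

		/* ... validate and insert the stream into the hash table ... */

		pthread_mutex_unlock(&stream->lock);	/* release in reverse order */
		pthread_mutex_unlock(&consumer_lock);
	}

	int main(void)
	{
		struct stream s = { .lock = PTHREAD_MUTEX_INITIALIZER, .key = 1 };

		add_stream(&s);
		printf("stream %d added\n", s.key);
		return 0;
	}

Consistently acquiring the global lock before any stream lock is what
keeps the two add paths deadlock-free.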
@@ -2057,7 +2063,10 @@ restart:
                                         * since there might be data to consume.
                                         */
                                        lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
-                                       close(ctx->consumer_metadata_pipe[0]);
+                                       ret = close(ctx->consumer_metadata_pipe[0]);
+                                       if (ret < 0) {
+                                               PERROR("close metadata pipe");
+                                       }
                                        continue;
                                } else if (revents & LPOLLIN) {
                                        do {
@@ -2250,7 +2259,7 @@ void *consumer_thread_data_poll(void *data)
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
-               num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
+               num_rdy = poll(pollfd, nb_fd + 1, -1);
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        /*
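
With the grace period gone, the thread simply blocks in poll() until an
event arrives. A hedged standalone sketch of that pattern, including
the EINTR restart the surrounding code performs (polling stdin here
purely for illustration):

	#include <errno.h>
	#include <poll.h>
	#include <stdio.h>
	#include <unistd.h>

	int main(void)
	{
		struct pollfd pfd = { .fd = STDIN_FILENO, .events = POLLIN };
		int num_rdy;

	restart:
		num_rdy = poll(&pfd, 1, -1);	/* -1: block indefinitely */
		if (num_rdy == -1) {
			if (errno == EINTR) {
				/* Interrupted by a signal: poll again. */
				goto restart;
			}
			perror("poll");
			return 1;
		}
		printf("%d fd ready\n", num_rdy);
		return 0;
	}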
@@ -2421,7 +2430,10 @@ end:
         * only tracked fd in the poll set. The thread will take care of closing
         * the read side.
         */
-       close(ctx->consumer_metadata_pipe[1]);
+       ret = close(ctx->consumer_metadata_pipe[1]);
+       if (ret < 0) {
+               PERROR("close metadata write pipe");
+       }
 
        if (data_ht) {
                destroy_data_stream_ht(data_ht);
@@ -2534,13 +2546,6 @@ end:
         */
        consumer_quit = 1;
 
-       /*
-        * 2s of grace period, if no polling events occur during
-        * this period, the polling thread will exit even if there
-        * are still open FDs (should not happen, but safety mechanism).
-        */
-       consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
-
        /*
  * Notify the data poll thread to poll back again and test the
  * consumer_quit state that we just set, so as to quit gracefully.
@@ -2648,7 +2653,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Close the created socket fd which is useless */
-               close(relayd->control_sock.fd);
+               ret = close(relayd->control_sock.fd);
+               if (ret < 0) {
+                       PERROR("close relayd control socket");
+               }
 
                /* Assign new file descriptor */
                relayd->control_sock.fd = fd;
@@ -2662,7 +2670,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Close the created socket fd which is useless */
-               close(relayd->data_sock.fd);
+               ret = close(relayd->data_sock.fd);
+               if (ret < 0) {
+                       PERROR("close relayd data socket");
+               }
 
                /* Assign new file descriptor */
                relayd->data_sock.fd = fd;
@@ -2754,7 +2765,7 @@ int consumer_data_pending(uint64_t id)
        ht = consumer_data.stream_list_ht;
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+                       ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* If this call fails, the stream is being used hence data pending. */
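
The last hunk matters because a hash lookup only finds an entry if the
key is hashed with the same seed the table used at insertion time; the
hardcoded 0x42UL would miss entries hashed with lttng_ht_seed. A toy
sketch of the failure mode, with toy_hash() standing in for the real
hash function:

	#include <stdio.h>

	static unsigned long toy_hash(unsigned long key, unsigned long seed)
	{
		return (key ^ seed) * 2654435761UL;	/* Knuth multiplicative mix */
	}

	int main(void)
	{
		unsigned long seed = 0xfeedUL;	/* stands in for lttng_ht_seed */
		unsigned long key = 42UL;

		/* Hashing the same key with a different seed selects a different
		 * bucket, so the lookup silently misses the inserted entry. */
		printf("insert bucket: %lu\n", toy_hash(key, seed) % 64);
		printf("lookup bucket: %lu\n", toy_hash(key, 0x42UL) % 64);
		return 0;
	}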