Fix: FD leak on consumer add relayd socket error
[lttng-tools.git] / src / common / consumer.c
index acbc6783b8f20cc4796526d1719d92e7a918cbc1..5766860a89823bf40c4ebd98448fc233e54c06f2 100644 (file)
@@ -47,9 +47,6 @@ struct lttng_consumer_global_data consumer_data = {
        .type = LTTNG_CONSUMER_UNKNOWN,
 };
 
-/* timeout parameter, to control the polling thread grace period. */
-int consumer_poll_timeout = -1;
-
 /*
  * Flag to inform the polling thread to quit when all fd hung up. Updated by
  * the consumer_thread_receive_fds when it notices that all fds has hung up.
@@ -59,15 +56,12 @@ int consumer_poll_timeout = -1;
 volatile int consumer_quit;
 
 /*
- * The following two hash tables are visible by all threads which are separated
- * in different source files.
- *
  * Global hash table containing respectively metadata and data streams. The
  * stream element in this ht should only be updated by the metadata poll thread
  * for the metadata and the data poll thread for the data.
  */
-struct lttng_ht *metadata_ht;
-struct lttng_ht *data_ht;
+static struct lttng_ht *metadata_ht;
+static struct lttng_ht *data_ht;
 
 /*
  * Notify a thread pipe to poll back again. This usually means that some global
@@ -133,6 +127,12 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht)
        rcu_read_unlock();
 }
 
+/*
+ * Return a channel object for the given key.
+ *
+ * RCU read side lock MUST be acquired before calling this function and
+ * protects the channel ptr.
+ */
 static struct lttng_consumer_channel *consumer_find_channel(int key)
 {
        struct lttng_ht_iter iter;
@@ -144,8 +144,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
                return NULL;
        }
 
-       rcu_read_lock();
-
        lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
                        &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -153,8 +151,6 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
                channel = caa_container_of(node, struct lttng_consumer_channel, node);
        }
 
-       rcu_read_unlock();
-
        return channel;
 }
 
@@ -354,8 +350,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                goto free_stream;
        }
 
-       pthread_mutex_lock(&stream->lock);
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -447,8 +443,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 
 end:
        consumer_data.need_update = 1;
-       pthread_mutex_unlock(&consumer_data.lock);
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
                consumer_del_channel(free_chan);
@@ -481,6 +477,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                goto end;
        }
 
+       rcu_read_lock();
+
        /*
         * Get stream's channel reference. Needed when adding the stream to the
         * global hash table.
@@ -537,9 +535,12 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                        stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
                        (unsigned long long) stream->mmap_len, stream->out_fd,
                        stream->net_seq_idx, stream->session_id);
+
+       rcu_read_unlock();
        return stream;
 
 error:
+       rcu_read_unlock();
        free(stream);
 end:
        return NULL;
@@ -560,6 +561,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %d", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
        /* Steal stream identifier to avoid having streams with the same key */
@@ -599,6 +601,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        consumer_data.need_update = 1;
 
        rcu_read_unlock();
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -914,7 +917,7 @@ static int consumer_update_poll_array(
                 * changed where this function will be called back again.
                 */
                if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
-                               stream->endpoint_status) {
+                               stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
                        continue;
                }
                DBG("Active FD %d", stream->wait_fd);
@@ -1264,6 +1267,8 @@ end:
  * core function for writing trace buffers to either the local filesystem or
  * the network.
  *
+ * It must be called with the stream lock held.
+ *
  * Careful review MUST be put if any changes occur!
  *
  * Returns the number of bytes written
@@ -1284,8 +1289,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
-       pthread_mutex_lock(&stream->lock);
-
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1410,7 +1413,6 @@ end:
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
-       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1419,6 +1421,8 @@ end:
 /*
  * Splice the data from the ring buffer to the tracefile.
  *
+ * It must be called with the stream lock held.
+ *
  * Returns the number of bytes spliced.
  */
 ssize_t lttng_consumer_on_read_subbuffer_splice(
@@ -1451,8 +1455,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
-       pthread_mutex_lock(&stream->lock);
-
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1620,7 +1622,6 @@ end:
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
-       pthread_mutex_unlock(&stream->lock);
 
        rcu_read_unlock();
        return written;
@@ -1766,9 +1767,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                goto free_stream;
        }
 
+       pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->lock);
 
-       pthread_mutex_lock(&consumer_data.lock);
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                if (stream->mmap_base != NULL) {
@@ -1858,8 +1859,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
 end:
-       pthread_mutex_unlock(&consumer_data.lock);
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
                consumer_del_channel(free_chan);
@@ -1878,6 +1879,8 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
 
        assert(stream);
        assert(ht);
@@ -1885,6 +1888,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->lock);
 
        /*
         * From here, refcounts are updated so be _careful_ when returning an error
@@ -1892,6 +1896,15 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
         */
 
        rcu_read_lock();
+
+       /*
+        * Lookup the stream just to make sure it does not exist in our internal
+        * state. This should NEVER happen.
+        */
+       lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       assert(!node);
+
        /* Find relayd and, if one is found, increment refcount. */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
@@ -1912,9 +1925,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_dec(&stream->chan->nb_init_streams);
        }
 
-       /* Steal stream identifier to avoid having streams with the same key */
-       consumer_steal_stream_key(stream->key, ht);
-
        lttng_ht_add_unique_ulong(ht, &stream->node);
 
        /*
@@ -1926,6 +1936,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
+       pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -1943,7 +1954,7 @@ static void validate_endpoint_status_data_stream(void)
        rcu_read_lock();
        cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
                /* Validate delete flag of the stream */
-               if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
                        continue;
                }
                /* Delete it right now */
@@ -1968,7 +1979,7 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_lock();
        cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
                /* Validate delete flag of the stream */
-               if (!stream->endpoint_status) {
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
                        continue;
                }
                /*
@@ -2057,7 +2068,10 @@ restart:
                                         * since their might be data to consume.
                                         */
                                        lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
-                                       close(ctx->consumer_metadata_pipe[0]);
+                                       ret = close(ctx->consumer_metadata_pipe[0]);
+                                       if (ret < 0) {
+                                               PERROR("close metadata pipe");
+                                       }
                                        continue;
                                } else if (revents & LPOLLIN) {
                                        do {
@@ -2250,7 +2264,7 @@ void *consumer_thread_data_poll(void *data)
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
-               num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
+               num_rdy = poll(pollfd, nb_fd + 1, -1);
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        /*
@@ -2421,7 +2435,10 @@ end:
         * only tracked fd in the poll set. The thread will take care of closing
         * the read side.
         */
-       close(ctx->consumer_metadata_pipe[1]);
+       ret = close(ctx->consumer_metadata_pipe[1]);
+       if (ret < 0) {
+               PERROR("close data pipe");
+       }
 
        if (data_ht) {
                destroy_data_stream_ht(data_ht);
@@ -2437,7 +2454,7 @@ end:
  */
 void *consumer_thread_sessiond_poll(void *data)
 {
-       int sock, client_socket, ret;
+       int sock = -1, client_socket, ret;
        /*
         * structure to poll for incoming data on communication socket avoids
         * making blocking sockets.
@@ -2497,6 +2514,13 @@ void *consumer_thread_sessiond_poll(void *data)
                goto end;
        }
 
+       /* This socket is not useful anymore. */
+       ret = close(client_socket);
+       if (ret < 0) {
+               PERROR("close client_socket");
+       }
+       client_socket = -1;
+
        /* update the polling structure to poll on the established socket */
        consumer_sockpoll[1].fd = sock;
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
@@ -2534,19 +2558,26 @@ end:
         */
        consumer_quit = 1;
 
-       /*
-        * 2s of grace period, if no polling events occur during
-        * this period, the polling thread will exit even if there
-        * are still open FDs (should not happen, but safety mechanism).
-        */
-       consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
-
        /*
         * Notify the data poll thread to poll back again and test the
         * consumer_quit state that we just set so to quit gracefully.
         */
        notify_thread_pipe(ctx->consumer_data_pipe[1]);
 
+       /* Cleaning up possibly open sockets. */
+       if (sock >= 0) {
+               ret = close(sock);
+               if (ret < 0) {
+                       PERROR("close sock sessiond poll");
+               }
+       }
+       if (client_socket >= 0) {
+               ret = close(sock);
+               if (ret < 0) {
+                       PERROR("close client_socket sessiond poll");
+               }
+       }
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2554,17 +2585,27 @@ end:
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
+       ssize_t ret;
+
+       pthread_mutex_lock(&stream->lock);
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_read_subbuffer(stream, ctx);
+               ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+               break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               return lttng_ustconsumer_read_subbuffer(stream, ctx);
+               ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+               break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
-               return -ENOSYS;
+               ret = -ENOSYS;
+               break;
        }
+
+       pthread_mutex_unlock(&stream->lock);
+       return ret;
 }
 
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
@@ -2607,7 +2648,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
 {
-       int fd, ret = -1;
+       int fd = -1, ret = -1;
        struct consumer_relayd_sock_pair *relayd;
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
@@ -2634,6 +2675,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        if (ret != sizeof(fd)) {
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
                ret = -1;
+               fd = -1;        /* Just in case it gets set with an invalid value. */
                goto error;
        }
 
@@ -2643,13 +2685,17 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
                ret = lttcomm_create_sock(&relayd->control_sock);
+               /* Immediately try to close the created socket if valid. */
+               if (relayd->control_sock.fd >= 0) {
+                       if (close(relayd->control_sock.fd)) {
+                               PERROR("close relayd control socket");
+                       }
+               }
+               /* Handle create_sock error. */
                if (ret < 0) {
                        goto error;
                }
 
-               /* Close the created socket fd which is useless */
-               close(relayd->control_sock.fd);
-
                /* Assign new file descriptor */
                relayd->control_sock.fd = fd;
                break;
@@ -2657,13 +2703,17 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
                ret = lttcomm_create_sock(&relayd->data_sock);
+               /* Immediately try to close the created socket if valid. */
+               if (relayd->data_sock.fd >= 0) {
+                       if (close(relayd->data_sock.fd)) {
+                               PERROR("close relayd data socket");
+                       }
+               }
+               /* Handle create_sock error. */
                if (ret < 0) {
                        goto error;
                }
 
-               /* Close the created socket fd which is useless */
-               close(relayd->data_sock.fd);
-
                /* Assign new file descriptor */
                relayd->data_sock.fd = fd;
                break;
@@ -2683,9 +2733,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        add_relayd(relayd);
 
        /* All good! */
-       ret = 0;
+       return 0;
 
 error:
+       /* Close received socket if valid. */
+       if (fd >= 0) {
+               if (close(fd)) {
+                       PERROR("close received socket");
+               }
+       }
        return ret;
 }
 
@@ -2754,7 +2810,7 @@ int consumer_data_pending(uint64_t id)
        ht = consumer_data.stream_list_ht;
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+                       ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* If this call fails, the stream is being used hence data pending. */
This page took 0.028834 seconds and 4 git commands to generate.