Fix: FD leak on consumer add relayd socket error
[lttng-tools.git] / src / common / consumer.c
index 51f861e23695c642a56a943aa272fb24c89c7a5d..5766860a89823bf40c4ebd98448fc233e54c06f2 100644 (file)
@@ -917,7 +917,7 @@ static int consumer_update_poll_array(
                 * changed where this function will be called back again.
                 */
                if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
-                               stream->endpoint_status) {
+                               stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
                        continue;
                }
                DBG("Active FD %d", stream->wait_fd);
@@ -1267,6 +1267,8 @@ end:
  * core function for writing trace buffers to either the local filesystem or
  * the network.
  *
+ * It must be called with the stream lock held.
+ *
  * Careful review MUST be put if any changes occur!
  *
  * Returns the number of bytes written
@@ -1419,6 +1421,8 @@ end:
 /*
  * Splice the data from the ring buffer to the tracefile.
  *
+ * It must be called with the stream lock held.
+ *
  * Returns the number of bytes spliced.
  */
 ssize_t lttng_consumer_on_read_subbuffer_splice(
@@ -1950,7 +1954,7 @@ static void validate_endpoint_status_data_stream(void)
        rcu_read_lock();
        cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
                /* Validate delete flag of the stream */
-               if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
                        continue;
                }
                /* Delete it right now */
@@ -1975,7 +1979,7 @@ static void validate_endpoint_status_metadata_stream(
        rcu_read_lock();
        cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
                /* Validate delete flag of the stream */
-               if (!stream->endpoint_status) {
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
                        continue;
                }
                /*
@@ -2450,7 +2454,7 @@ end:
  */
 void *consumer_thread_sessiond_poll(void *data)
 {
-       int sock, client_socket, ret;
+       int sock = -1, client_socket, ret;
        /*
         * structure to poll for incoming data on communication socket avoids
         * making blocking sockets.
@@ -2510,6 +2514,13 @@ void *consumer_thread_sessiond_poll(void *data)
                goto end;
        }
 
+       /* This socket is not useful anymore. */
+       ret = close(client_socket);
+       if (ret < 0) {
+               PERROR("close client_socket");
+       }
+       client_socket = -1;
+
        /* update the polling structure to poll on the established socket */
        consumer_sockpoll[1].fd = sock;
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
@@ -2553,6 +2564,20 @@ end:
         */
        notify_thread_pipe(ctx->consumer_data_pipe[1]);
 
+       /* Cleaning up possibly open sockets. */
+       if (sock >= 0) {
+               ret = close(sock);
+               if (ret < 0) {
+                       PERROR("close sock sessiond poll");
+               }
+       }
+       if (client_socket >= 0) {
+               ret = close(sock);
+               if (ret < 0) {
+                       PERROR("close client_socket sessiond poll");
+               }
+       }
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2623,7 +2648,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
 {
-       int fd, ret = -1;
+       int fd = -1, ret = -1;
        struct consumer_relayd_sock_pair *relayd;
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
@@ -2650,6 +2675,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        if (ret != sizeof(fd)) {
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
                ret = -1;
+               fd = -1;        /* Just in case it gets set with an invalid value. */
                goto error;
        }
 
@@ -2659,14 +2685,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
                ret = lttcomm_create_sock(&relayd->control_sock);
-               if (ret < 0) {
-                       goto error;
+               /* Immediately try to close the created socket if valid. */
+               if (relayd->control_sock.fd >= 0) {
+                       if (close(relayd->control_sock.fd)) {
+                               PERROR("close relayd control socket");
+                       }
                }
-
-               /* Close the created socket fd which is useless */
-               ret = close(relayd->control_sock.fd);
+               /* Handle create_sock error. */
                if (ret < 0) {
-                       PERROR("close relayd control socket");
+                       goto error;
                }
 
                /* Assign new file descriptor */
@@ -2676,14 +2703,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
                ret = lttcomm_create_sock(&relayd->data_sock);
-               if (ret < 0) {
-                       goto error;
+               /* Immediately try to close the created socket if valid. */
+               if (relayd->data_sock.fd >= 0) {
+                       if (close(relayd->data_sock.fd)) {
+                               PERROR("close relayd data socket");
+                       }
                }
-
-               /* Close the created socket fd which is useless */
-               ret = close(relayd->data_sock.fd);
+               /* Handle create_sock error. */
                if (ret < 0) {
-                       PERROR("close relayd control socket");
+                       goto error;
                }
 
                /* Assign new file descriptor */
@@ -2705,9 +2733,15 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        add_relayd(relayd);
 
        /* All good! */
-       ret = 0;
+       return 0;
 
 error:
+       /* Close received socket if valid. */
+       if (fd >= 0) {
+               if (close(fd)) {
+                       PERROR("close received socket");
+               }
+       }
        return ret;
 }
 
This page took 0.025211 seconds and 4 git commands to generate.