Fix: possible use after free in consumer
[lttng-tools.git] / src / common / consumer.c
index 821a04e3d73d80025e76ad99f1cb0558725c7188..999e400059fbe763264698a75c67b3fb80d6c717 100644 (file)
@@ -34,6 +34,7 @@
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/compat/endian.h>
 #include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
@@ -1091,12 +1092,15 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
         */
        (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;
+
+       (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+       (*pollfd)[i + 1].events = POLLIN | POLLPRI;
        return i;
 }
 
 /*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
+ * Poll on the should_quit pipe and the command socket return -1 on
+ * error, 1 if should exit, 0 if data is available on the command socket
  */
 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
 {
@@ -1112,16 +1116,13 @@ restart:
                        goto restart;
                }
                PERROR("Poll error");
-               goto exit;
+               return -1;
        }
        if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
                DBG("consumer_should_quit wake up");
-               goto exit;
+               return 1;
        }
        return 0;
-
-exit:
-       return -1;
 }
 
 /*
@@ -1290,6 +1291,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_poll_pipe;
        }
 
+       ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_wakeup_pipe) {
+               goto error_wakeup_pipe;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
@@ -1329,6 +1335,8 @@ error_channel_pipe:
 error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
@@ -1396,6 +1404,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 
        DBG("Consumer destroying it. Closing everything.");
 
+       if (!ctx) {
+               return;
+       }
+
        destroy_data_stream_ht(data_ht);
        destroy_metadata_stream_ht(metadata_ht);
 
@@ -1411,6 +1423,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
@@ -2405,16 +2418,18 @@ void *consumer_thread_data_poll(void *data)
                        free(local_stream);
                        local_stream = NULL;
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       /*
+                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+                        * wake up pipe.
+                        */
+                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       local_stream = zmalloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 2) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2441,9 +2456,9 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 1);
+               DBG("polling on %d fd", nb_fd + 2);
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 1, -1);
+               num_rdy = poll(pollfd, nb_fd + 2, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -2492,6 +2507,20 @@ void *consumer_thread_data_poll(void *data)
                        continue;
                }
 
+               /* Handle wakeup pipe. */
+               if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+                       char dummy;
+                       ssize_t pipe_readlen;
+
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+                                       sizeof(dummy));
+                       if (pipe_readlen < 0) {
+                               PERROR("Consumer data wakeup pipe");
+                       }
+                       /* We've been awakened to handle stream(s). */
+                       ctx->has_wakeup = 0;
+               }
+
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
@@ -2530,7 +2559,8 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
                        if ((pollfd[i].revents & POLLIN) ||
-                                       local_stream[i]->hangup_flush_done) {
+                                       local_stream[i]->hangup_flush_done ||
+                                       local_stream[i]->has_data) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
@@ -2944,8 +2974,8 @@ static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
        assert(ctx);
        assert(sockpoll);
 
-       if (lttng_consumer_poll_socket(sockpoll) < 0) {
-               ret = -1;
+       ret = lttng_consumer_poll_socket(sockpoll);
+       if (ret) {
                goto error;
        }
        DBG("Metadata connection on client_socket");
@@ -3014,7 +3044,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].fd = client_socket;
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
        DBG("Connection on client_socket");
@@ -3031,7 +3066,11 @@ void *consumer_thread_sessiond_poll(void *data)
         * command unix socket.
         */
        ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
-       if (ret < 0) {
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
 
@@ -3052,15 +3091,15 @@ void *consumer_thread_sessiond_poll(void *data)
                health_poll_entry();
                ret = lttng_consumer_poll_socket(consumer_sockpoll);
                health_poll_exit();
-               if (ret < 0) {
+               if (ret) {
+                       if (ret > 0) {
+                               /* should exit */
+                               err = 0;
+                       }
                        goto end;
                }
                DBG("Incoming command on sock");
                ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
-               if (ret == -ENOENT) {
-                       DBG("Received STOP command");
-                       goto end;
-               }
                if (ret <= 0) {
                        /*
                         * This could simply be a session daemon quitting. Don't output
@@ -3278,7 +3317,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* Poll on consumer socket. */
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               /* Needing to exit in the middle of a command: error. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
                goto error_nosignal;
@@ -3613,6 +3654,7 @@ int consumer_send_status_msg(int sock, int ret_code)
 {
        struct lttcomm_consumer_status_msg msg;
 
+       memset(&msg, 0, sizeof(msg));
        msg.ret_code = ret_code;
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
@@ -3630,6 +3672,7 @@ int consumer_send_status_channel(int sock,
 
        assert(sock >= 0);
 
+       memset(&msg, 0, sizeof(msg));
        if (!channel) {
                msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
This page took 0.026429 seconds and 4 git commands to generate.