Backported to glibc 2.8
[lttng-tools.git] / src / common / consumer.c
index e56afa78c78f020e6fc8d71a1d5b8a3c1a6ec5f6..cba4a605a4f8d119d4f324427fe81761b605c240 100644 (file)
@@ -34,6 +34,7 @@
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/compat/endian.h>
 #include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
@@ -250,6 +251,32 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        return channel;
 }
 
+/*
+ * There is a possibility that the consumer does not have enough time between
+ * the close of the channel on the session daemon and the cleanup in here thus
+ * once we have a channel add with an existing key, we know for sure that this
+ * channel will eventually get cleaned up by all streams being closed.
+ *
+ * This function just nullifies the already existing channel key.
+ */
+static void steal_channel_key(uint64_t key)
+{
+       struct lttng_consumer_channel *channel;
+
+       rcu_read_lock();
+       channel = consumer_find_channel(key);
+       if (channel) {
+               channel->key = (uint64_t) -1ULL;
+               /*
+                * We don't want the lookup to match, but we still need to iterate on
+                * this channel when iterating over the hash table. Just change the
+                * node key.
+                */
+               channel->node.key = (uint64_t) -1ULL;
+       }
+       rcu_read_unlock();
+}
+
 static void free_channel_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
@@ -979,43 +1006,35 @@ end:
 /*
  * Add a channel to the global list protected by a mutex.
  *
- * On success 0 is returned else a negative value.
+ * Always return 0 indicating success.
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
 {
-       int ret = 0;
-       struct lttng_ht_node_u64 *node;
-       struct lttng_ht_iter iter;
-
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
        pthread_mutex_lock(&channel->timer_lock);
-       rcu_read_lock();
 
-       lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
-       if (node != NULL) {
-               /* Channel already exist. Ignore the insertion */
-               ERR("Consumer add channel key %" PRIu64 " already exists!",
-                       channel->key);
-               ret = -EEXIST;
-               goto end;
-       }
+       /*
+        * This gives us a guarantee that the channel we are about to add to the
+        * channel hash table will be unique. See this function comment on the why
+        * we need to steel the channel key at this stage.
+        */
+       steal_channel_key(channel->key);
 
+       rcu_read_lock();
        lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
-
-end:
        rcu_read_unlock();
+
        pthread_mutex_unlock(&channel->timer_lock);
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
-       if (!ret && channel->wait_fd != -1 &&
-                       channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
+       if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
-       return ret;
+
+       return 0;
 }
 
 /*
@@ -1073,12 +1092,15 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
         */
        (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;
+
+       (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+       (*pollfd)[i + 1].events = POLLIN | POLLPRI;
        return i;
 }
 
 /*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
+ * Poll on the should_quit pipe and the command socket return -1 on
+ * error, 1 if should exit, 0 if data is available on the command socket
  */
 int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
 {
@@ -1094,16 +1116,13 @@ restart:
                        goto restart;
                }
                PERROR("Poll error");
-               goto exit;
+               return -1;
        }
        if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
                DBG("consumer_should_quit wake up");
-               goto exit;
+               return 1;
        }
        return 0;
-
-exit:
-       return -1;
 }
 
 /*
@@ -1272,6 +1291,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_poll_pipe;
        }
 
+       ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_wakeup_pipe) {
+               goto error_wakeup_pipe;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
@@ -1311,6 +1335,8 @@ error_channel_pipe:
 error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
@@ -1393,6 +1419,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
@@ -2387,16 +2414,18 @@ void *consumer_thread_data_poll(void *data)
                        free(local_stream);
                        local_stream = NULL;
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       /*
+                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+                        * wake up pipe.
+                        */
+                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       local_stream = zmalloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 2) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2423,9 +2452,9 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 1);
+               DBG("polling on %d fd", nb_fd + 2);
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 1, -1);
+               num_rdy = poll(pollfd, nb_fd + 2, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -2474,6 +2503,20 @@ void *consumer_thread_data_poll(void *data)
                        continue;
                }
 
+               /* Handle wakeup pipe. */
+               if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+                       char dummy;
+                       ssize_t pipe_readlen;
+
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+                                       sizeof(dummy));
+                       if (pipe_readlen < 0) {
+                               PERROR("Consumer data wakeup pipe");
+                       }
+                       /* We've been awakened to handle stream(s). */
+                       ctx->has_wakeup = 0;
+               }
+
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
@@ -2512,7 +2555,8 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
                        if ((pollfd[i].revents & POLLIN) ||
-                                       local_stream[i]->hangup_flush_done) {
+                                       local_stream[i]->hangup_flush_done ||
+                                       local_stream[i]->has_data) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
@@ -2926,8 +2970,8 @@ static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
        assert(ctx);
        assert(sockpoll);
 
-       if (lttng_consumer_poll_socket(sockpoll) < 0) {
-               ret = -1;
+       ret = lttng_consumer_poll_socket(sockpoll);
+       if (ret) {
                goto error;
        }
        DBG("Metadata connection on client_socket");
@@ -2996,7 +3040,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].fd = client_socket;
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
        DBG("Connection on client_socket");
@@ -3013,7 +3062,11 @@ void *consumer_thread_sessiond_poll(void *data)
         * command unix socket.
         */
        ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
-       if (ret < 0) {
+       if (ret) {
+               if (ret > 0) {
+                       /* should exit */
+                       err = 0;
+               }
                goto end;
        }
 
@@ -3034,15 +3087,15 @@ void *consumer_thread_sessiond_poll(void *data)
                health_poll_entry();
                ret = lttng_consumer_poll_socket(consumer_sockpoll);
                health_poll_exit();
-               if (ret < 0) {
+               if (ret) {
+                       if (ret > 0) {
+                               /* should exit */
+                               err = 0;
+                       }
                        goto end;
                }
                DBG("Incoming command on sock");
                ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
-               if (ret == -ENOENT) {
-                       DBG("Received STOP command");
-                       goto end;
-               }
                if (ret <= 0) {
                        /*
                         * This could simply be a session daemon quitting. Don't output
@@ -3260,7 +3313,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* Poll on consumer socket. */
-       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+       ret = lttng_consumer_poll_socket(consumer_sockpoll);
+       if (ret) {
+               /* Needing to exit in the middle of a command: error. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
                goto error_nosignal;
@@ -3595,6 +3650,7 @@ int consumer_send_status_msg(int sock, int ret_code)
 {
        struct lttcomm_consumer_status_msg msg;
 
+       memset(&msg, 0, sizeof(msg));
        msg.ret_code = ret_code;
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
@@ -3612,6 +3668,7 @@ int consumer_send_status_channel(int sock,
 
        assert(sock >= 0);
 
+       memset(&msg, 0, sizeof(msg));
        if (!channel) {
                msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
This page took 0.026906 seconds and 4 git commands to generate.