Fix: consumer add channel return value was positive on error
[lttng-tools.git] / src / common / consumer.c
index d1b7ba29b22b91551f790fc2ed514f4b1508ba73..96bc337262d65cf525484ac14962f2b44eee19a6 100644 (file)
@@ -76,21 +76,6 @@ volatile int consumer_quit;
 static struct lttng_ht *metadata_ht;
 static struct lttng_ht *data_ht;
 
-/*
- * Notify a thread pipe to poll back again. This usually means that some global
- * state has changed so we just send back the thread in a poll wait call.
- */
-static void notify_thread_pipe(int wpipe)
-{
-       int ret;
-
-       do {
-               struct lttng_consumer_stream *null_stream = NULL;
-
-               ret = write(wpipe, &null_stream, sizeof(null_stream));
-       } while (ret < 0 && errno == EINTR);
-}
-
 /*
  * Notify a thread lttng pipe to poll back again. This usually means that some
  * global state has changed so we just send back the thread in a poll wait
@@ -117,6 +102,7 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
 
        msg.action = action;
        msg.chan = chan;
+       msg.key = key;
        do {
                ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
        } while (ret < 0 && errno == EINTR);
@@ -423,7 +409,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         */
        if (ctx) {
                notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
        }
 }
 
@@ -478,6 +464,13 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                                PERROR("munmap");
                        }
                }
+
+               if (stream->wait_fd >= 0) {
+                       ret = close(stream->wait_fd);
+                       if (ret) {
+                               PERROR("close");
+                       }
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
@@ -897,6 +890,8 @@ end:
 
 /*
  * Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
@@ -914,7 +909,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
                /* Channel already exist. Ignore the insertion */
                ERR("Consumer add channel key %" PRIu64 " already exists!",
                        channel->key);
-               ret = -1;
+               ret = -EEXIST;
                goto end;
        }
 
@@ -1203,8 +1198,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_channel_pipe;
        }
 
-       ret = utils_create_pipe(ctx->consumer_metadata_pipe);
-       if (ret < 0) {
+       ctx->consumer_metadata_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_metadata_pipe) {
                goto error_metadata_pipe;
        }
 
@@ -1216,7 +1211,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
        return ctx;
 
 error_splice_pipe:
-       utils_close_pipe(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
 error_channel_pipe:
@@ -1251,6 +1246,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
@@ -1890,6 +1886,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                                PERROR("munmap metadata stream");
                        }
                }
+
+               if (stream->wait_fd >= 0) {
+                       ret = close(stream->wait_fd);
+                       if (ret < 0) {
+                               PERROR("close kernel metadata wait_fd");
+                       }
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
@@ -1959,6 +1962,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
 end:
+       /*
+        * Nullify the stream reference so it is not used after deletion. The
+        * consumer data lock MUST be acquired before being able to check for a
+        * NULL pointer value.
+        */
+       stream->chan->metadata_stream = NULL;
+
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
@@ -2011,9 +2021,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_inc(&relayd->refcount);
        }
 
-       /* Update channel refcount once added without error(s). */
-       uatomic_inc(&stream->chan->refcount);
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -2131,7 +2138,8 @@ void *consumer_thread_metadata_poll(void *data)
                goto end_poll;
        }
 
-       ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
+       ret = lttng_poll_add(&events,
+                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
        if (ret < 0) {
                goto end;
        }
@@ -2169,30 +2177,26 @@ restart:
                                continue;
                        }
 
-                       if (pollfd == ctx->consumer_metadata_pipe[0]) {
+                       if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
                                        /*
                                         * Remove the pipe from the poll set and continue the loop
                                         * since their might be data to consume.
                                         */
-                                       lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
-                                       ret = close(ctx->consumer_metadata_pipe[0]);
-                                       if (ret < 0) {
-                                               PERROR("close metadata pipe");
-                                       }
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                                        continue;
                                } else if (revents & LPOLLIN) {
-                                       do {
-                                               /* Get the stream pointer received */
-                                               ret = read(pollfd, &stream, sizeof(stream));
-                                       } while (ret < 0 && errno == EINTR);
-                                       if (ret < 0 ||
-                                                       ret < sizeof(struct lttng_consumer_stream *)) {
-                                               PERROR("read metadata stream");
+                                       ssize_t pipe_len;
+
+                                       pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
+                                                       &stream, sizeof(stream));
+                                       if (pipe_len < 0) {
+                                               ERR("read metadata stream, ret: %ld", pipe_len);
                                                /*
-                                                * Let's continue here and hope we can still work
-                                                * without stopping the consumer. XXX: Should we?
+                                                * Continue here to handle the rest of the streams.
                                                 */
                                                continue;
                                        }
@@ -2347,7 +2351,7 @@ void *consumer_thread_data_poll(void *data)
 
                        /* allocate for all fds + 1 for the consumer_data_pipe */
                        local_stream = zmalloc((consumer_data.stream_count + 1) *
-                                       sizeof(struct lttng_consumer_stream));
+                                       sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
@@ -2540,10 +2544,7 @@ end:
         * only tracked fd in the poll set. The thread will take care of closing
         * the read side.
         */
-       ret = close(ctx->consumer_metadata_pipe[1]);
-       if (ret < 0) {
-               PERROR("close data pipe");
-       }
+       (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
 
        destroy_data_stream_ht(data_ht);
 
@@ -2717,20 +2718,25 @@ restart:
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                        chan->wait_fd);
+                                               rcu_read_lock();
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                &chan->wait_fd_node);
+                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                lttng_poll_add(&events, chan->wait_fd,
                                                                LPOLLIN | LPOLLPRI);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
+                                               rcu_read_lock();
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
+                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
                                                        break;
                                                }
                                                lttng_poll_del(&events, chan->wait_fd);
+                                               iter.iter.node = &chan->wait_fd_node.node;
                                                ret = lttng_ht_del(channel_ht, &iter);
                                                assert(ret == 0);
                                                consumer_close_channel_streams(chan);
@@ -2742,6 +2748,7 @@ restart:
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
+                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -2868,12 +2875,6 @@ void *consumer_thread_sessiond_poll(void *data)
                goto end;
        }
 
-       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
-
        /* prepare the FDs to poll : to client socket and the should_quit pipe */
        consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
        consumer_sockpoll[0].events = POLLIN | POLLPRI;
@@ -2891,11 +2892,6 @@ void *consumer_thread_sessiond_poll(void *data)
                WARN("On accept");
                goto end;
        }
-       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
 
        /*
         * Setup metadata socket which is the second socket connection on the
@@ -2974,7 +2970,7 @@ end:
                }
        }
        if (client_socket >= 0) {
-               ret = close(sock);
+               ret = close(client_socket);
                if (ret < 0) {
                        PERROR("close client_socket sessiond poll");
                }
This page took 0.027799 seconds and 4 git commands to generate.