Fix: error handling in kernel add_channel
[lttng-tools.git] / src / common / consumer.c
index c4518cc03a7d8891715704cd40a8983f50d08650..5ca2e7bad68187f7b61b628a28ae0932658a3ccb 100644 (file)
@@ -77,18 +77,17 @@ static struct lttng_ht *metadata_ht;
 static struct lttng_ht *data_ht;
 
 /*
- * Notify a thread pipe to poll back again. This usually means that some global
- * state has changed so we just send back the thread in a poll wait call.
+ * Notify a thread lttng pipe to poll back again. This usually means that some
+ * global state has changed so we just send back the thread in a poll wait
+ * call.
  */
-static void notify_thread_pipe(int wpipe)
+static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
 {
-       int ret;
+       struct lttng_consumer_stream *null_stream = NULL;
 
-       do {
-               struct lttng_consumer_stream *null_stream = NULL;
+       assert(pipe);
 
-               ret = write(wpipe, &null_stream, sizeof(null_stream));
-       } while (ret < 0 && errno == EINTR);
+       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
 }
 
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
@@ -99,6 +98,8 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
        struct consumer_channel_msg msg;
        int ret;
 
+       memset(&msg, 0, sizeof(msg));
+
        msg.action = action;
        msg.chan = chan;
        do {
@@ -406,8 +407,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * read of this status which happens AFTER receiving this notify.
         */
        if (ctx) {
-               notify_thread_pipe(ctx->consumer_data_pipe[1]);
-               notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
+               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
        }
 }
 
@@ -462,6 +463,13 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                                PERROR("munmap");
                        }
                }
+
+               if (stream->wait_fd >= 0) {
+                       ret = close(stream->wait_fd);
+                       if (ret) {
+                               PERROR("close");
+                       }
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
@@ -658,9 +666,6 @@ static int add_stream(struct lttng_consumer_stream *stream,
                uatomic_inc(&relayd->refcount);
        }
 
-       /* Update channel refcount once added without error(s). */
-       uatomic_inc(&stream->chan->refcount);
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -901,7 +906,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
                /* Channel already exist. Ignore the insertion */
                ERR("Consumer add channel key %" PRIu64 " already exists!",
                        channel->key);
-               ret = -1;
+               ret = LTTNG_ERR_KERN_CHAN_EXIST;
                goto end;
        }
 
@@ -971,7 +976,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
         * Insert the consumer_data_pipe at the end of the array and don't
         * increment i so nb_fd is the number of real FD.
         */
-       (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
+       (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;
        return i;
 }
@@ -1167,26 +1172,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
        ctx->on_recv_stream = recv_stream;
        ctx->on_update_stream = update_stream;
 
-       ret = pipe(ctx->consumer_data_pipe);
-       if (ret < 0) {
-               PERROR("Error creating poll pipe");
+       ctx->consumer_data_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_data_pipe) {
                goto error_poll_pipe;
        }
 
-       /* set read end of the pipe to non-blocking */
-       ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto error_poll_fcntl;
-       }
-
-       /* set write end of the pipe to non-blocking */
-       ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto error_poll_fcntl;
-       }
-
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
@@ -1205,8 +1195,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_channel_pipe;
        }
 
-       ret = utils_create_pipe(ctx->consumer_metadata_pipe);
-       if (ret < 0) {
+       ctx->consumer_metadata_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_metadata_pipe) {
                goto error_metadata_pipe;
        }
 
@@ -1218,16 +1208,15 @@ struct lttng_consumer_local_data *lttng_consumer_create(
        return ctx;
 
 error_splice_pipe:
-       utils_close_pipe(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
 error_channel_pipe:
        utils_close_pipe(ctx->consumer_thread_pipe);
 error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
-error_poll_fcntl:
 error_quit_pipe:
-       utils_close_pipe(ctx->consumer_data_pipe);
+       lttng_pipe_destroy(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
 error:
@@ -1253,7 +1242,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        }
        utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
-       utils_close_pipe(ctx->consumer_data_pipe);
+       lttng_pipe_destroy(ctx->consumer_data_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
@@ -1424,6 +1414,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                goto end;
                        }
                        outfd = stream->out_fd = ret;
+                       /* Reset current size because we just perform a rotation. */
+                       stream->tracefile_size_current = 0;
                }
                stream->tracefile_size_current += len;
        }
@@ -1604,6 +1596,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                goto end;
                        }
                        outfd = stream->out_fd = ret;
+                       /* Reset current size because we just perform a rotation. */
+                       stream->tracefile_size_current = 0;
                }
                stream->tracefile_size_current += len;
        }
@@ -1889,6 +1883,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                                PERROR("munmap metadata stream");
                        }
                }
+
+               if (stream->wait_fd >= 0) {
+                       ret = close(stream->wait_fd);
+                       if (ret < 0) {
+                               PERROR("close kernel metadata wait_fd");
+                       }
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
@@ -2130,7 +2131,8 @@ void *consumer_thread_metadata_poll(void *data)
                goto end_poll;
        }
 
-       ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
+       ret = lttng_poll_add(&events,
+                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
        if (ret < 0) {
                goto end;
        }
@@ -2168,30 +2170,26 @@ restart:
                                continue;
                        }
 
-                       if (pollfd == ctx->consumer_metadata_pipe[0]) {
+                       if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
                                        /*
                                         * Remove the pipe from the poll set and continue the loop
                                         * since their might be data to consume.
                                         */
-                                       lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
-                                       ret = close(ctx->consumer_metadata_pipe[0]);
-                                       if (ret < 0) {
-                                               PERROR("close metadata pipe");
-                                       }
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                                        continue;
                                } else if (revents & LPOLLIN) {
-                                       do {
-                                               /* Get the stream pointer received */
-                                               ret = read(pollfd, &stream, sizeof(stream));
-                                       } while (ret < 0 && errno == EINTR);
-                                       if (ret < 0 ||
-                                                       ret < sizeof(struct lttng_consumer_stream *)) {
-                                               PERROR("read metadata stream");
+                                       ssize_t pipe_len;
+
+                                       pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
+                                                       &stream, sizeof(stream));
+                                       if (pipe_len < 0) {
+                                               ERR("read metadata stream, ret: %ld", pipe_len);
                                                /*
-                                                * Let's continue here and hope we can still work
-                                                * without stopping the consumer. XXX: Should we?
+                                                * Continue here to handle the rest of the streams.
                                                 */
                                                continue;
                                        }
@@ -2398,13 +2396,10 @@ void *consumer_thread_data_poll(void *data)
                        ssize_t pipe_readlen;
 
                        DBG("consumer_data_pipe wake up");
-                       /* Consume 1 byte of pipe data */
-                       do {
-                               pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
-                                               sizeof(new_stream));
-                       } while (pipe_readlen == -1 && errno == EINTR);
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+                                       &new_stream, sizeof(new_stream));
                        if (pipe_readlen < 0) {
-                               PERROR("read consumer data pipe");
+                               ERR("Consumer data pipe ret %ld", pipe_readlen);
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -2542,10 +2537,7 @@ end:
         * only tracked fd in the poll set. The thread will take care of closing
         * the read side.
         */
-       ret = close(ctx->consumer_metadata_pipe[1]);
-       if (ret < 0) {
-               PERROR("close data pipe");
-       }
+       (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
 
        destroy_data_stream_ht(data_ht);
 
@@ -2964,7 +2956,7 @@ end:
         * Notify the data poll thread to poll back again and test the
         * consumer_quit state that we just set so to quit gracefully.
         */
-       notify_thread_pipe(ctx->consumer_data_pipe[1]);
+       notify_thread_lttng_pipe(ctx->consumer_data_pipe);
 
        notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
This page took 0.02879 seconds and 4 git commands to generate.