Change consumer_data_pipe to be a lttng_pipe
[lttng-tools.git] / src / common / consumer.c
index e26388f8e50fab81d5df0999ab5fb73948e8faba..d1b7ba29b22b91551f790fc2ed514f4b1508ba73 100644 (file)
@@ -91,6 +91,20 @@ static void notify_thread_pipe(int wpipe)
        } while (ret < 0 && errno == EINTR);
 }
 
+/*
+ * Notify a thread lttng pipe to poll back again. This usually means that some
+ * global state has changed so we just send back the thread in a poll wait
+ * call.
+ */
+static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
+{
+       struct lttng_consumer_stream *null_stream = NULL;
+
+       assert(pipe);
+
+       (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+}
+
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
                uint64_t key,
@@ -408,7 +422,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * read of this status which happens AFTER receiving this notify.
         */
        if (ctx) {
-               notify_thread_pipe(ctx->consumer_data_pipe[1]);
+               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
                notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
        }
 }
@@ -970,7 +984,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
         * Insert the consumer_data_pipe at the end of the array and don't
         * increment i so nb_fd is the number of real FD.
         */
-       (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
+       (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;
        return i;
 }
@@ -1166,26 +1180,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
        ctx->on_recv_stream = recv_stream;
        ctx->on_update_stream = update_stream;
 
-       ret = pipe(ctx->consumer_data_pipe);
-       if (ret < 0) {
-               PERROR("Error creating poll pipe");
+       ctx->consumer_data_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_data_pipe) {
                goto error_poll_pipe;
        }
 
-       /* set read end of the pipe to non-blocking */
-       ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto error_poll_fcntl;
-       }
-
-       /* set write end of the pipe to non-blocking */
-       ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto error_poll_fcntl;
-       }
-
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
@@ -1224,9 +1223,8 @@ error_channel_pipe:
        utils_close_pipe(ctx->consumer_thread_pipe);
 error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
-error_poll_fcntl:
 error_quit_pipe:
-       utils_close_pipe(ctx->consumer_data_pipe);
+       lttng_pipe_destroy(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
 error:
@@ -1252,7 +1250,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        }
        utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
-       utils_close_pipe(ctx->consumer_data_pipe);
+       lttng_pipe_destroy(ctx->consumer_data_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
@@ -2401,13 +2399,10 @@ void *consumer_thread_data_poll(void *data)
                        ssize_t pipe_readlen;
 
                        DBG("consumer_data_pipe wake up");
-                       /* Consume 1 byte of pipe data */
-                       do {
-                               pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
-                                               sizeof(new_stream));
-                       } while (pipe_readlen == -1 && errno == EINTR);
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+                                       &new_stream, sizeof(new_stream));
                        if (pipe_readlen < 0) {
-                               PERROR("read consumer data pipe");
+                               ERR("Consumer data pipe ret %ld", pipe_readlen);
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -2967,7 +2962,7 @@ end:
         * Notify the data poll thread to poll back again and test the
         * consumer_quit state that we just set so to quit gracefully.
         */
-       notify_thread_pipe(ctx->consumer_data_pipe[1]);
+       notify_thread_lttng_pipe(ctx->consumer_data_pipe);
 
        notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
This page took 0.024165 seconds and 4 git commands to generate.