X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=d1b7ba29b22b91551f790fc2ed514f4b1508ba73;hp=e26388f8e50fab81d5df0999ab5fb73948e8faba;hb=acdb9057878ddbd9c112206f3c1c4c2104093088;hpb=9fd926379bd4c6c17f2b3c39a5bbf9879bc74e14 diff --git a/src/common/consumer.c b/src/common/consumer.c index e26388f8e..d1b7ba29b 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -91,6 +91,20 @@ static void notify_thread_pipe(int wpipe) } while (ret < 0 && errno == EINTR); } +/* + * Notify a thread lttng pipe to poll back again. This usually means that some + * global state has changed so we just send back the thread in a poll wait + * call. + */ +static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) +{ + struct lttng_consumer_stream *null_stream = NULL; + + assert(pipe); + + (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); +} + static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *chan, uint64_t key, @@ -408,7 +422,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * read of this status which happens AFTER receiving this notify. */ if (ctx) { - notify_thread_pipe(ctx->consumer_data_pipe[1]); + notify_thread_lttng_pipe(ctx->consumer_data_pipe); notify_thread_pipe(ctx->consumer_metadata_pipe[1]); } } @@ -970,7 +984,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * Insert the consumer_data_pipe at the end of the array and don't * increment i so nb_fd is the number of real FD. */ - (*pollfd)[i].fd = ctx->consumer_data_pipe[0]; + (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe); (*pollfd)[i].events = POLLIN | POLLPRI; return i; } @@ -1166,26 +1180,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( ctx->on_recv_stream = recv_stream; ctx->on_update_stream = update_stream; - ret = pipe(ctx->consumer_data_pipe); - if (ret < 0) { - PERROR("Error creating poll pipe"); + ctx->consumer_data_pipe = lttng_pipe_open(0); + if (!ctx->consumer_data_pipe) { goto error_poll_pipe; } - /* set read end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto error_poll_fcntl; - } - - /* set write end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto error_poll_fcntl; - } - ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); @@ -1224,9 +1223,8 @@ error_channel_pipe: utils_close_pipe(ctx->consumer_thread_pipe); error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); -error_poll_fcntl: error_quit_pipe: - utils_close_pipe(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); error: @@ -1252,7 +1250,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) } utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); - utils_close_pipe(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_data_pipe); utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); @@ -2401,13 +2399,10 @@ void *consumer_thread_data_poll(void *data) ssize_t pipe_readlen; DBG("consumer_data_pipe wake up"); - /* Consume 1 byte of pipe data */ - do { - pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, - sizeof(new_stream)); - } while (pipe_readlen == -1 && errno == EINTR); + pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, + &new_stream, sizeof(new_stream)); if (pipe_readlen < 0) { - PERROR("read consumer data pipe"); + ERR("Consumer data pipe ret %ld", pipe_readlen); /* Continue so we can at least handle the current stream(s). */ continue; } @@ -2967,7 +2962,7 @@ end: * Notify the data poll thread to poll back again and test the * consumer_quit state that we just set so to quit gracefully. */ - notify_thread_pipe(ctx->consumer_data_pipe[1]); + notify_thread_lttng_pipe(ctx->consumer_data_pipe); notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);