From acdb9057878ddbd9c112206f3c1c4c2104093088 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 14 May 2013 11:00:22 -0400 Subject: [PATCH] Change consumer_data_pipe to be a lttng_pipe Also, an important change here is that this pipe is no longer in non block mode. Before sending stream's pointer over this pipe, only one byte was written thus making it unlikely to fail in a read/write race condition between threads. Now, 4 bytes are written so keeping this pipe non block with threads is a bit of a "looking for trouble situation". The lttng pipe wrappers make sure that the read and write side are synchronized between threads using a mutex for each side. Furthermore, the read and write handle partial I/O and EINTR meaning that once the call returns we are sure that either everything was read/written or an error occured thus making it not possible for the read side to block indefinitely after a poll event. Fixes #475 Signed-off-by: David Goulet --- src/common/consumer.c | 53 +++++++++----------- src/common/consumer.h | 3 +- src/common/kernel-consumer/kernel-consumer.c | 3 +- src/common/ust-consumer/ust-consumer.c | 2 +- 4 files changed, 29 insertions(+), 32 deletions(-) diff --git a/src/common/consumer.c b/src/common/consumer.c index e26388f8e..d1b7ba29b 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -91,6 +91,20 @@ static void notify_thread_pipe(int wpipe) } while (ret < 0 && errno == EINTR); } +/* + * Notify a thread lttng pipe to poll back again. This usually means that some + * global state has changed so we just send back the thread in a poll wait + * call. + */ +static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) +{ + struct lttng_consumer_stream *null_stream = NULL; + + assert(pipe); + + (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); +} + static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *chan, uint64_t key, @@ -408,7 +422,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * read of this status which happens AFTER receiving this notify. */ if (ctx) { - notify_thread_pipe(ctx->consumer_data_pipe[1]); + notify_thread_lttng_pipe(ctx->consumer_data_pipe); notify_thread_pipe(ctx->consumer_metadata_pipe[1]); } } @@ -970,7 +984,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * Insert the consumer_data_pipe at the end of the array and don't * increment i so nb_fd is the number of real FD. */ - (*pollfd)[i].fd = ctx->consumer_data_pipe[0]; + (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe); (*pollfd)[i].events = POLLIN | POLLPRI; return i; } @@ -1166,26 +1180,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( ctx->on_recv_stream = recv_stream; ctx->on_update_stream = update_stream; - ret = pipe(ctx->consumer_data_pipe); - if (ret < 0) { - PERROR("Error creating poll pipe"); + ctx->consumer_data_pipe = lttng_pipe_open(0); + if (!ctx->consumer_data_pipe) { goto error_poll_pipe; } - /* set read end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto error_poll_fcntl; - } - - /* set write end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto error_poll_fcntl; - } - ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); @@ -1224,9 +1223,8 @@ error_channel_pipe: utils_close_pipe(ctx->consumer_thread_pipe); error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); -error_poll_fcntl: error_quit_pipe: - utils_close_pipe(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); error: @@ -1252,7 +1250,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) } utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); - utils_close_pipe(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_data_pipe); utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); @@ -2401,13 +2399,10 @@ void *consumer_thread_data_poll(void *data) ssize_t pipe_readlen; DBG("consumer_data_pipe wake up"); - /* Consume 1 byte of pipe data */ - do { - pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, - sizeof(new_stream)); - } while (pipe_readlen == -1 && errno == EINTR); + pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, + &new_stream, sizeof(new_stream)); if (pipe_readlen < 0) { - PERROR("read consumer data pipe"); + ERR("Consumer data pipe ret %ld", pipe_readlen); /* Continue so we can at least handle the current stream(s). */ continue; } @@ -2967,7 +2962,7 @@ end: * Notify the data poll thread to poll back again and test the * consumer_quit state that we just set so to quit gracefully. */ - notify_thread_pipe(ctx->consumer_data_pipe[1]); + notify_thread_lttng_pipe(ctx->consumer_data_pipe); notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT); diff --git a/src/common/consumer.h b/src/common/consumer.h index 43989e4c5..91039e8e9 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -31,6 +31,7 @@ #include #include #include +#include /* Commands for consumer */ enum lttng_consumer_command { @@ -346,7 +347,7 @@ struct lttng_consumer_local_data { int consumer_channel_pipe[2]; int consumer_splice_metadata_pipe[2]; /* Data stream poll thread pipe. To transfer data stream to the thread */ - int consumer_data_pipe[2]; + struct lttng_pipe *consumer_data_pipe; /* to let the signal handler wake up the fd receiver thread */ int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 2cf9ac1a8..d8aec492f 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -289,7 +290,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (new_stream->metadata_flag) { stream_pipe = ctx->consumer_metadata_pipe[1]; } else { - stream_pipe = ctx->consumer_data_pipe[1]; + stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe); } do { diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 01fca9b7f..8bc69006b 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -191,7 +191,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, if (stream->metadata_flag) { stream_pipe = ctx->consumer_metadata_pipe[1]; } else { - stream_pipe = ctx->consumer_data_pipe[1]; + stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe); } do { -- 2.34.1