From 04fdd819f0eebfd15a4196a9ca98e826352a9b4f Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 11 Apr 2012 21:57:31 -0400 Subject: [PATCH] Fix: consumer fd recv thread should write into non-blocking pipe Writing into a blocking pipe will cause the writer thread to block on the poll fds thread when the pipe is full. Given that we would like to batch stream array reallocation as much as possible, this wakeup should not block. Signed-off-by: Mathieu Desnoyers --- src/common/consumer.c | 47 +++++++++++++++----- src/common/kernel-consumer/kernel-consumer.c | 19 +++++--- src/common/ust-consumer/ust-consumer.c | 19 +++++--- 3 files changed, 64 insertions(+), 21 deletions(-) diff --git a/src/common/consumer.c b/src/common/consumer.c index 2604f3d06..ee5575262 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -736,6 +736,20 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_poll_pipe; } + /* set read end of the pipe to non-blocking */ + ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto error_poll_fcntl; + } + + /* set write end of the pipe to non-blocking */ + ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto error_poll_fcntl; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { perror("Error creating recv pipe"); @@ -760,6 +774,7 @@ error_thread_pipe: PERROR("close"); } } +error_poll_fcntl: error_quit_pipe: for (i = 0; i < 2; i++) { int err; @@ -933,8 +948,6 @@ void *lttng_consumer_thread_poll_fds(void *data) struct lttng_consumer_stream **local_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; - char tmp; - int tmp2; struct lttng_consumer_local_data *ctx = data; rcu_register_thread(); @@ -1019,11 +1032,14 @@ void *lttng_consumer_thread_poll_fds(void *data) * low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { + size_t pipe_readlen; + char tmp; + DBG("consumer_poll_pipe wake up"); - tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1); - if (tmp2 < 0) { - perror("read consumer poll"); - } + /* Consume 1 byte of pipe data */ + do { + pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1); + } while (pipe_readlen == -1 && errno == EINTR); continue; } @@ -1228,11 +1244,20 @@ end: */ consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; - /* wake up the polling thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("poll pipe write"); - } + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the other end), the other end should + * perform the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret == -1UL && errno == EINTR); rcu_unregister_thread(); return NULL; } diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 9abee1de8..bbc31f8e3 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -295,11 +295,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } end: - /* signal the poll thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("write consumer poll"); - } + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the other end), the other end should + * perform the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret == -1UL && errno == EINTR); end_nosignal: return 0; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index bfdb7e968..2b55fd463 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -257,11 +257,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } end: - /* signal the poll thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - PERROR("write consumer poll"); - } + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the other end), the other end should + * perform the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret == -1UL && errno == EINTR); end_nosignal: return 0; } -- 2.34.1