X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=ee5575262ec5667d448007a8ab82eec0607e19c9;hp=2604f3d060574c1c7677047fcc16581a5c54818d;hb=04fdd819f0eebfd15a4196a9ca98e826352a9b4f;hpb=9ef70f8738b524b8ea4f266c526de9d5a4fdc29c diff --git a/src/common/consumer.c b/src/common/consumer.c index 2604f3d06..ee5575262 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -736,6 +736,20 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_poll_pipe; } + /* set read end of the pipe to non-blocking */ + ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto error_poll_fcntl; + } + + /* set write end of the pipe to non-blocking */ + ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto error_poll_fcntl; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { perror("Error creating recv pipe"); @@ -760,6 +774,7 @@ error_thread_pipe: PERROR("close"); } } +error_poll_fcntl: error_quit_pipe: for (i = 0; i < 2; i++) { int err; @@ -933,8 +948,6 @@ void *lttng_consumer_thread_poll_fds(void *data) struct lttng_consumer_stream **local_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; - char tmp; - int tmp2; struct lttng_consumer_local_data *ctx = data; rcu_register_thread(); @@ -1019,11 +1032,14 @@ void *lttng_consumer_thread_poll_fds(void *data) * low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { + size_t pipe_readlen; + char tmp; + DBG("consumer_poll_pipe wake up"); - tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1); - if (tmp2 < 0) { - perror("read consumer poll"); - } + /* Consume 1 byte of pipe data */ + do { + pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1); + } while (pipe_readlen == -1 && errno == EINTR); continue; } @@ -1228,11 +1244,20 @@ end: */ consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; - /* wake up the polling thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("poll pipe write"); - } + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the other end), the other end should + * perform the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret == -1UL && errno == EINTR); rcu_unregister_thread(); return NULL; }