From 060a32b279132ceeeef14b96a611077195a2ca46 Mon Sep 17 00:00:00 2001 From: Christian Babeux Date: Sun, 2 Dec 2012 21:09:39 -0500 Subject: [PATCH] Cygwin: Fix handling of wait pipe hangup by properly detecting EOF On Linux, the POLLHUP poll(3) event is used to signal that the other end of a pipe has been disconnected. Due to poor wording in the Single UNIX Specification, differents UNIX implementation signal the EOF with conflicting poll events [1]. This is the case on Cygwin. A pipe close sends the POLLIN poll(3) event. The actual consumer implementation sees this has a wakeup for data ready to be consumed. The current hangup handling leads to infinite looping in the consumer because the hangup is never detected and the POLLIN event is never cleared. To fix this issue, the consumer must read on the pipe, check for EOF (read(3) shall return 0 to indicate EOF) and proceed to force the POLLHUP poll(3) event if this is indeed the case. [1] - http://www.greenend.org.uk/rjk/tech/poll.html Signed-off-by: Christian Babeux --- src/common/consumer.c | 68 +++++++++++++++++++------- src/common/consumer.h | 2 + src/common/ust-consumer/ust-consumer.c | 26 ++++++++++ src/common/ust-consumer/ust-consumer.h | 10 ++++ 4 files changed, 88 insertions(+), 18 deletions(-) diff --git a/src/common/consumer.c b/src/common/consumer.c index 024ee1785..c51149435 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1046,30 +1046,45 @@ void *lttng_consumer_thread_poll_fds(void *data) continue; } - /* Take care of high priority channels first. */ + + /* Check if each pipe has data. hack for cygwin. */ for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents & POLLPRI) { - ssize_t len; + if ((pollfd[i].revents & POLLIN) || + local_stream[i]->hangup_flush_done) { + int check_ret; - DBG("Urgent read on fd %d", pollfd[i].fd); - high_prio = 1; - len = ctx->on_buffer_ready(local_stream[i], ctx); - /* it's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { - goto end; - } else if (len > 0) { - local_stream[i]->data_read = 1; + check_ret = lttng_consumer_check_pipe(local_stream[i], ctx); + if (check_ret != 0) { + pollfd[i].revents |= POLLHUP; } } } - /* - * If we read high prio channel in this loop, try again - * for more high prio data. - */ - if (high_prio) { - continue; - } + /* Take care of high priority channels first. */ + /* for (i = 0; i < nb_fd; i++) { */ + /* DBG("!!! POLL FLAGS: %d", pollfd[i].revents); */ + /* if (pollfd[i].revents & POLLPRI) { */ + /* ssize_t len; */ + + /* DBG("Urgent read on fd %d", pollfd[i].fd); */ + /* high_prio = 1; */ + /* len = ctx->on_buffer_ready(local_stream[i], ctx); */ + /* /\* it's ok to have an unavailable sub-buffer *\/ */ + /* if (len < 0 && len != -EAGAIN) { */ + /* goto end; */ + /* } else if (len > 0) { */ + /* local_stream[i]->data_read = 1; */ + /* } */ + /* } */ + /* } */ + + /* /\* */ + /* * If we read high prio channel in this loop, try again */ + /* * for more high prio data. */ + /* *\/ */ + /* if (high_prio) { */ + /* continue; */ + /* } */ /* Take care of low priority channels. */ for (i = 0; i < nb_fd; i++) { @@ -1285,6 +1300,23 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, } } +int lttng_consumer_check_pipe(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + assert(0); + return -ENOSYS; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_check_pipe(stream, ctx); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) { switch (consumer_data.type) { diff --git a/src/common/consumer.h b/src/common/consumer.h index 6ac781605..6953c0b38 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -314,5 +314,7 @@ extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); +int lttng_consumer_check_pipe(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); #endif /* _LTTNG_CONSUMER_H */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 97b890bf2..9d624966b 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -414,6 +414,31 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) } +int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + ssize_t readlen; + char dummy; + + DBG("In check_pipe (wait_fd: %d, stream key: %d)\n", + stream->wait_fd, stream->key); + + /* We consume the 1 byte written into the wait_fd by UST */ + if (!stream->hangup_flush_done) { + do { + readlen = read(stream->wait_fd, &dummy, 1); + } while (readlen == -1 && errno == EINTR); + if (readlen == -1) { + return -1; /* error */ + } + DBG("Read %zu byte from pipe: %c\n", readlen, dummy); + if (readlen == 0) + return 1; /* POLLHUP */ + } + return 0; /* no error nor HUP */ + +} + int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { @@ -437,6 +462,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ret = readlen; goto end; } + DBG("Read %zu byte from pipe: %c\n", readlen, dummy); } buf = stream->buf; diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index c07377f8e..044a10d82 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -72,6 +72,9 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream); void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream); +int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); + #else /* HAVE_LIBLTTNG_UST_CTL */ static inline @@ -153,6 +156,13 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) { } +static inline +int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + return -ENOSYS; +} + #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTTNG_USTCONSUMER_H */ -- 2.34.1