From: David Goulet Date: Wed, 19 Mar 2014 18:34:27 +0000 (-0400) Subject: Fix: add consumer wake up pipe to avoid race X-Git-Tag: v2.5.0-rc1~82 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=02b3d1769d5f8a33e4109b1e681141c9295dfda6 Fix: add consumer wake up pipe to avoid race UST application will notify the wait_fd pipe for every subbuffer that it writes and ready to be consumed. However, on *high* load systems, this 1:1 property can fail if the pipe gets filled up. For performance reason, UST will ignore this error and continue since it can't wait for the pipe to clear up. This triggers a race condition where we have *one* wake up on the UST pipe for potentially multiple subbuffers. A data pending command will wait forever on streams that still has data but the data thread could'nt consumed them because of this 1:n possible race. Using the stop command without waiting would mean a memory/fd leak of the stream. Thus, we add a consumer wake up pipe here that notifies the data thread if there is still data to be read after a successful read subbuffer call. With this, we end up handling the residual buffers if any since the data thread is always notified when there is still data to be read. Acked-by: Mathieu Desnoyers Signed-off-by: David Goulet --- diff --git a/src/common/consumer.c b/src/common/consumer.c index ede214c80..e80ac6be7 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1091,6 +1091,9 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, */ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe); (*pollfd)[i].events = POLLIN | POLLPRI; + + (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe); + (*pollfd)[i + 1].events = POLLIN | POLLPRI; return i; } @@ -1287,6 +1290,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_poll_pipe; } + ctx->consumer_wakeup_pipe = lttng_pipe_open(0); + if (!ctx->consumer_wakeup_pipe) { + goto error_wakeup_pipe; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); @@ -1326,6 +1334,8 @@ error_channel_pipe: error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); error_quit_pipe: + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); +error_wakeup_pipe: lttng_pipe_destroy(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); @@ -1408,6 +1418,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) utils_close_pipe(ctx->consumer_channel_pipe); lttng_pipe_destroy(ctx->consumer_data_pipe); lttng_pipe_destroy(ctx->consumer_metadata_pipe); + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); @@ -2402,16 +2413,18 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* allocate for all fds + 1 for the consumer_data_pipe */ - pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + /* + * Allocate for all fds +1 for the consumer_data_pipe and +1 for + * wake up pipe. + */ + pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); goto end; } - /* allocate for all fds + 1 for the consumer_data_pipe */ - local_stream = zmalloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 2) * sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2438,9 +2451,9 @@ void *consumer_thread_data_poll(void *data) } /* poll on the array of fds */ restart: - DBG("polling on %d fd", nb_fd + 1); + DBG("polling on %d fd", nb_fd + 2); health_poll_entry(); - num_rdy = poll(pollfd, nb_fd + 1, -1); + num_rdy = poll(pollfd, nb_fd + 2, -1); health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { @@ -2489,6 +2502,20 @@ void *consumer_thread_data_poll(void *data) continue; } + /* Handle wakeup pipe. */ + if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) { + char dummy; + ssize_t pipe_readlen; + + pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, + sizeof(dummy)); + if (pipe_readlen < 0) { + PERROR("Consumer data wakeup pipe"); + } + /* We've been awakened to handle stream(s). */ + ctx->has_wakeup = 0; + } + /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { health_code_update(); @@ -2527,7 +2554,8 @@ void *consumer_thread_data_poll(void *data) continue; } if ((pollfd[i].revents & POLLIN) || - local_stream[i]->hangup_flush_done) { + local_stream[i]->hangup_flush_done || + local_stream[i]->has_data) { DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ diff --git a/src/common/consumer.h b/src/common/consumer.h index 7485e65b2..4ac823c01 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -341,6 +341,9 @@ struct lttng_consumer_stream { */ pthread_cond_t metadata_rdv; pthread_mutex_t metadata_rdv_lock; + + /* Indicate if the stream still has some data to be read. */ + unsigned int has_data:1; }; /* @@ -453,6 +456,19 @@ struct lttng_consumer_local_data { int consumer_splice_metadata_pipe[2]; /* Data stream poll thread pipe. To transfer data stream to the thread */ struct lttng_pipe *consumer_data_pipe; + + /* + * Data thread use that pipe to catch wakeup from read subbuffer that + * detects that there is still data to be read for the stream encountered. + * Before doing so, the stream is flagged to indicate that there is still + * data to be read. + * + * Both pipes (read/write) are owned and used inside the data thread. + */ + struct lttng_pipe *consumer_wakeup_pipe; + /* Indicate if the wakeup thread has been notified. */ + unsigned int has_wakeup:1; + /* to let the signal handler wake up the fd receiver thread */ int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0955e66fa..9f2e739a3 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1853,6 +1853,57 @@ end: return ret; } +/* + * Return 0 on success else a negative value. + */ +static int notify_if_more_data(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct ustctl_consumer_stream *ustream; + + assert(stream); + assert(ctx); + + ustream = stream->ustream; + + /* + * First, we are going to check if there is a new subbuffer available + * before reading the stream wait_fd. + */ + /* Get the next subbuffer */ + ret = ustctl_get_next_subbuf(ustream); + if (ret) { + /* No more data found, flag the stream. */ + stream->has_data = 0; + ret = 0; + goto end; + } + + ret = ustctl_put_next_subbuf(ustream); + assert(!ret); + + /* This stream still has data. Flag it and wake up the data thread. */ + stream->has_data = 1; + + if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) { + ssize_t writelen; + + writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1); + if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + ret = writelen; + goto end; + } + + /* The wake up pipe has been notified. */ + ctx->has_wakeup = 1; + } + ret = 0; + +end: + return ret; +} + /* * Read subbuffer from the given stream. * @@ -1866,7 +1917,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, unsigned long len, subbuf_size, padding; int err, write_index = 1; long ret = 0; - char dummy; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -1881,11 +1931,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ustream = stream->ustream; /* - * We can consume the 1 byte written into the wait_fd by UST. - * Don't trigger error if we cannot read this one byte (read - * returns 0), or if the error is EAGAIN or EWOULDBLOCK. + * We can consume the 1 byte written into the wait_fd by UST. Don't trigger + * error if we cannot read this one byte (read returns 0), or if the error + * is EAGAIN or EWOULDBLOCK. + * + * This is only done when the stream is monitored by a thread, before the + * flush is done after a hangup and if the stream is not flagged with data + * since there might be nothing to consume in the wait fd but still have + * data available flagged by the consumer wake up pipe. */ - if (stream->monitor && !stream->hangup_flush_done) { + if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { + char dummy; ssize_t readlen; readlen = lttng_read(stream->wait_fd, &dummy, 1); @@ -1971,6 +2027,17 @@ retry: err = ustctl_put_next_subbuf(ustream); assert(err == 0); + /* + * This will consumer the byte on the wait_fd if and only if there is not + * next subbuffer to be acquired. + */ + if (!stream->metadata_flag) { + ret = notify_if_more_data(stream, ctx); + if (ret < 0) { + goto end; + } + } + /* Write index if needed. */ if (!write_index) { goto end;