Fix: add consumer wake up pipe to avoid race
authorDavid Goulet <dgoulet@efficios.com>
Wed, 19 Mar 2014 18:34:27 +0000 (14:34 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Tue, 1 Apr 2014 14:53:11 +0000 (10:53 -0400)
UST application will notify the wait_fd pipe for every subbuffer that it
writes and ready to be consumed. However, on *high* load systems, this
1:1 property can fail if the pipe gets filled up. For performance
reason, UST will ignore this error and continue since it can't wait for
the pipe to clear up.

This triggers a race condition where we have *one* wake up on the UST
pipe for potentially multiple subbuffers. A data pending command will
wait forever on streams that still has data but the data thread could'nt
consumed them because of this 1:n possible race. Using the stop command
without waiting would mean a memory/fd leak of the stream.

Thus, we add a consumer wake up pipe here that notifies the data thread
if there is still data to be read after a successful read subbuffer
call. With this, we end up handling the residual buffers if any since
the data thread is always notified when there is still data to be read.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/ust-consumer/ust-consumer.c

index ede214c8025b264fcdf5a538a565b2ab5b710eed..e80ac6be751fbba7eebc98404d520f9ea62c4cd5 100644 (file)
@@ -1091,6 +1091,9 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
         */
        (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
        (*pollfd)[i].events = POLLIN | POLLPRI;
+
+       (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+       (*pollfd)[i + 1].events = POLLIN | POLLPRI;
        return i;
 }
 
@@ -1287,6 +1290,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_poll_pipe;
        }
 
+       ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_wakeup_pipe) {
+               goto error_wakeup_pipe;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                PERROR("Error creating recv pipe");
@@ -1326,6 +1334,8 @@ error_channel_pipe:
 error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
        lttng_pipe_destroy(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
@@ -1408,6 +1418,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
@@ -2402,16 +2413,18 @@ void *consumer_thread_data_poll(void *data)
                        free(local_stream);
                        local_stream = NULL;
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       /*
+                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+                        * wake up pipe.
+                        */
+                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       /* allocate for all fds + 1 for the consumer_data_pipe */
-                       local_stream = zmalloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 2) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2438,9 +2451,9 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 1);
+               DBG("polling on %d fd", nb_fd + 2);
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 1, -1);
+               num_rdy = poll(pollfd, nb_fd + 2, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -2489,6 +2502,20 @@ void *consumer_thread_data_poll(void *data)
                        continue;
                }
 
+               /* Handle wakeup pipe. */
+               if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+                       char dummy;
+                       ssize_t pipe_readlen;
+
+                       pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+                                       sizeof(dummy));
+                       if (pipe_readlen < 0) {
+                               PERROR("Consumer data wakeup pipe");
+                       }
+                       /* We've been awakened to handle stream(s). */
+                       ctx->has_wakeup = 0;
+               }
+
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        health_code_update();
@@ -2527,7 +2554,8 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
                        if ((pollfd[i].revents & POLLIN) ||
-                                       local_stream[i]->hangup_flush_done) {
+                                       local_stream[i]->hangup_flush_done ||
+                                       local_stream[i]->has_data) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
index 7485e65b259d4b855c301405425225a462b5e0bf..4ac823c017d03754ed545a9a1288df707b84778f 100644 (file)
@@ -341,6 +341,9 @@ struct lttng_consumer_stream {
         */
        pthread_cond_t metadata_rdv;
        pthread_mutex_t metadata_rdv_lock;
+
+       /* Indicate if the stream still has some data to be read. */
+       unsigned int has_data:1;
 };
 
 /*
@@ -453,6 +456,19 @@ struct lttng_consumer_local_data {
        int consumer_splice_metadata_pipe[2];
        /* Data stream poll thread pipe. To transfer data stream to the thread */
        struct lttng_pipe *consumer_data_pipe;
+
+       /*
+        * Data thread use that pipe to catch wakeup from read subbuffer that
+        * detects that there is still data to be read for the stream encountered.
+        * Before doing so, the stream is flagged to indicate that there is still
+        * data to be read.
+        *
+        * Both pipes (read/write) are owned and used inside the data thread.
+        */
+       struct lttng_pipe *consumer_wakeup_pipe;
+       /* Indicate if the wakeup thread has been notified. */
+       unsigned int has_wakeup:1;
+
        /* to let the signal handler wake up the fd receiver thread */
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
index 0955e66fa4e384fdde2f70703a96e626b8ebfc8c..9f2e739a36fb59ba5bbba2df574dfcecbab2327d 100644 (file)
@@ -1853,6 +1853,57 @@ end:
        return ret;
 }
 
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct ustctl_consumer_stream *ustream;
+
+       assert(stream);
+       assert(ctx);
+
+       ustream = stream->ustream;
+
+       /*
+        * First, we are going to check if there is a new subbuffer available
+        * before reading the stream wait_fd.
+        */
+       /* Get the next subbuffer */
+       ret = ustctl_get_next_subbuf(ustream);
+       if (ret) {
+               /* No more data found, flag the stream. */
+               stream->has_data = 0;
+               ret = 0;
+               goto end;
+       }
+
+       ret = ustctl_put_next_subbuf(ustream);
+       assert(!ret);
+
+       /* This stream still has data. Flag it and wake up the data thread. */
+       stream->has_data = 1;
+
+       if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+               ssize_t writelen;
+
+               writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+               if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       ret = writelen;
+                       goto end;
+               }
+
+               /* The wake up pipe has been notified. */
+               ctx->has_wakeup = 1;
+       }
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Read subbuffer from the given stream.
  *
@@ -1866,7 +1917,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        unsigned long len, subbuf_size, padding;
        int err, write_index = 1;
        long ret = 0;
-       char dummy;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
 
@@ -1881,11 +1931,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ustream = stream->ustream;
 
        /*
-        * We can consume the 1 byte written into the wait_fd by UST.
-        * Don't trigger error if we cannot read this one byte (read
-        * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+        * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
+        * error if we cannot read this one byte (read returns 0), or if the error
+        * is EAGAIN or EWOULDBLOCK.
+        *
+        * This is only done when the stream is monitored by a thread, before the
+        * flush is done after a hangup and if the stream is not flagged with data
+        * since there might be nothing to consume in the wait fd but still have
+        * data available flagged by the consumer wake up pipe.
         */
-       if (stream->monitor && !stream->hangup_flush_done) {
+       if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+               char dummy;
                ssize_t readlen;
 
                readlen = lttng_read(stream->wait_fd, &dummy, 1);
@@ -1971,6 +2027,17 @@ retry:
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
 
+       /*
+        * This will consumer the byte on the wait_fd if and only if there is not
+        * next subbuffer to be acquired.
+        */
+       if (!stream->metadata_flag) {
+               ret = notify_if_more_data(stream, ctx);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
        /* Write index if needed. */
        if (!write_index) {
                goto end;
This page took 0.029514 seconds and 4 git commands to generate.