Fix: add consumer wake up pipe to avoid race
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index b48a3c821d386fbf3cbfaceb8b01d9bbcc4f724d..9f2e739a36fb59ba5bbba2df574dfcecbab2327d 100644 (file)
@@ -1116,17 +1116,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
        health_code_update();
 
-       if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
-               /*
-                * Notify the session daemon that the command is completed.
-                *
-                * On transport layer error, the function call will print an error
-                * message so handling the returned code is a bit useless since we
-                * return an error code anyway.
-                */
-               (void) consumer_send_status_msg(sock, ret_code);
-               return -ENOENT;
-       }
+       /* deprecated */
+       assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
 
        health_code_update();
 
@@ -1456,7 +1447,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_poll_entry();
                ret = lttng_consumer_poll_socket(consumer_sockpoll);
                health_poll_exit();
-               if (ret < 0) {
+               if (ret) {
                        goto error_fatal;
                }
 
@@ -1862,6 +1853,57 @@ end:
        return ret;
 }
 
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct ustctl_consumer_stream *ustream;
+
+       assert(stream);
+       assert(ctx);
+
+       ustream = stream->ustream;
+
+       /*
+        * First, we are going to check if there is a new subbuffer available
+        * before reading the stream wait_fd.
+        */
+       /* Get the next subbuffer */
+       ret = ustctl_get_next_subbuf(ustream);
+       if (ret) {
+               /* No more data found, flag the stream. */
+               stream->has_data = 0;
+               ret = 0;
+               goto end;
+       }
+
+       ret = ustctl_put_next_subbuf(ustream);
+       assert(!ret);
+
+       /* This stream still has data. Flag it and wake up the data thread. */
+       stream->has_data = 1;
+
+       if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+               ssize_t writelen;
+
+               writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+               if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       ret = writelen;
+                       goto end;
+               }
+
+               /* The wake up pipe has been notified. */
+               ctx->has_wakeup = 1;
+       }
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Read subbuffer from the given stream.
  *
@@ -1875,7 +1917,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        unsigned long len, subbuf_size, padding;
        int err, write_index = 1;
        long ret = 0;
-       char dummy;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
 
@@ -1890,11 +1931,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ustream = stream->ustream;
 
        /*
-        * We can consume the 1 byte written into the wait_fd by UST.
-        * Don't trigger error if we cannot read this one byte (read
-        * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+        * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
+        * error if we cannot read this one byte (read returns 0), or if the error
+        * is EAGAIN or EWOULDBLOCK.
+        *
+        * This is only done when the stream is monitored by a thread, before the
+        * flush is done after a hangup and if the stream is not flagged with data
+        * since there might be nothing to consume in the wait fd but still have
+        * data available flagged by the consumer wake up pipe.
         */
-       if (stream->monitor && !stream->hangup_flush_done) {
+       if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+               char dummy;
                ssize_t readlen;
 
                readlen = lttng_read(stream->wait_fd, &dummy, 1);
@@ -1980,6 +2027,17 @@ retry:
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
 
+       /*
+        * This will consumer the byte on the wait_fd if and only if there is not
+        * next subbuffer to be acquired.
+        */
+       if (!stream->metadata_flag) {
+               ret = notify_if_more_data(stream, ctx);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
        /* Write index if needed. */
        if (!write_index) {
                goto end;
@@ -2212,6 +2270,8 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        assert(channel);
        assert(channel->metadata_cache);
 
+       memset(&request, 0, sizeof(request));
+
        /* send the metadata request to sessiond */
        switch (consumer_data.type) {
        case LTTNG_CONSUMER64_UST:
This page took 0.02431 seconds and 4 git commands to generate.