Fix: add consumer wake up pipe to avoid race
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 83aa5986f7f663e122a92f58dd7895836d807149..9f2e739a36fb59ba5bbba2df574dfcecbab2327d 100644 (file)
@@ -409,6 +409,7 @@ static int send_sessiond_channel(int sock,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
+       uint64_t net_seq_idx = -1ULL;
 
        assert(channel);
        assert(ctx);
@@ -433,6 +434,9 @@ static int send_sessiond_channel(int sock,
                                }
                                ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                        }
+                       if (net_seq_idx == -1ULL) {
+                               net_seq_idx = stream->net_seq_idx;
+                       }
                }
        }
 
@@ -625,45 +629,6 @@ error:
        rcu_read_unlock();
        return ret;
 }
-/*
- * Close metadata stream wakeup_fd using the given key to retrieve the channel.
- * RCU read side lock MUST be acquired before calling this function.
- *
- * NOTE: This function does NOT take any channel nor stream lock.
- *
- * Return 0 on success else LTTng error code.
- */
-static int _close_metadata(struct lttng_consumer_channel *channel)
-{
-       int ret = LTTCOMM_CONSUMERD_SUCCESS;
-
-       assert(channel);
-       assert(channel->type == CONSUMER_CHANNEL_TYPE_METADATA);
-
-       if (channel->switch_timer_enabled == 1) {
-               DBG("Deleting timer on metadata channel");
-               consumer_timer_switch_stop(channel);
-       }
-
-       if (channel->metadata_stream) {
-               ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
-               if (ret < 0) {
-                       ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
-                       ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
-               }
-
-               if (channel->monitor) {
-                       /* Close the read-side in consumer_del_metadata_stream */
-                       ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]);
-                       if (ret < 0) {
-                               PERROR("Close UST metadata write-side poll pipe");
-                               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
-                       }
-               }
-       }
-
-       return ret;
-}
 
 /*
  * Close metadata stream wakeup_fd using the given key to retrieve the channel.
@@ -698,7 +663,7 @@ static int close_metadata(uint64_t chan_key)
                goto error_unlock;
        }
 
-       ret = _close_metadata(channel);
+       lttng_ustconsumer_close_metadata(channel);
 
 error_unlock:
        pthread_mutex_unlock(&channel->lock);
@@ -754,6 +719,12 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
                        ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                        goto error;
                }
+               ret = consumer_send_relayd_streams_sent(
+                               metadata->metadata_stream->net_seq_idx);
+               if (ret < 0) {
+                       ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+                       goto error;
+               }
        }
 
        ret = send_streams_to_thread(metadata, ctx);
@@ -778,8 +749,9 @@ error:
         * the stream is still in the local stream list of the channel. This call
         * will make sure to clean that list.
         */
-       cds_list_del(&metadata->metadata_stream->send_node);
        consumer_stream_destroy(metadata->metadata_stream, NULL);
+       cds_list_del(&metadata->metadata_stream->send_node);
+       metadata->metadata_stream = NULL;
 error_no_stream:
 end:
        return ret;
@@ -871,8 +843,8 @@ error_stream:
         * Clean up the stream completly because the next snapshot will use a new
         * metadata stream.
         */
-       cds_list_del(&metadata_stream->send_node);
        consumer_stream_destroy(metadata_stream, NULL);
+       cds_list_del(&metadata_stream->send_node);
        metadata_channel->metadata_stream = NULL;
 
 error:
@@ -939,6 +911,12 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
                                        stream->name, stream->key);
                }
+               if (relayd_id != -1ULL) {
+                       ret = consumer_send_relayd_streams_sent(relayd_id);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+               }
 
                ustctl_flush_buffer(stream->ustream, 1);
 
@@ -1138,17 +1116,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
        health_code_update();
 
-       if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
-               /*
-                * Notify the session daemon that the command is completed.
-                *
-                * On transport layer error, the function call will print an error
-                * message so handling the returned code is a bit useless since we
-                * return an error code anyway.
-                */
-               (void) consumer_send_status_msg(sock, ret_code);
-               return -ENOENT;
-       }
+       /* deprecated */
+       assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
 
        health_code_update();
 
@@ -1451,8 +1420,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                channel = consumer_find_channel(key);
                if (!channel) {
-                       ERR("UST consumer push metadata %" PRIu64 " not found", key);
-                       ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+                       /*
+                        * This is possible if the metadata creation on the consumer side
+                        * is in flight vis-a-vis a concurrent push metadata from the
+                        * session daemon.  Simply return that the channel failed and the
+                        * session daemon will handle that message correctly considering
+                        * that this race is acceptable thus the DBG() statement here.
+                        */
+                       DBG("UST consumer push metadata %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
                        goto end_msg_sessiond;
                }
 
@@ -1471,7 +1447,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_poll_entry();
                ret = lttng_consumer_poll_socket(consumer_sockpoll);
                health_poll_exit();
-               if (ret < 0) {
+               if (ret) {
                        goto error_fatal;
                }
 
@@ -1706,6 +1682,22 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
        ustctl_destroy_stream(stream->ustream);
 }
 
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+       assert(stream);
+       assert(stream->ustream);
+
+       return ustctl_stream_get_wakeup_fd(stream->ustream);
+}
+
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
+{
+       assert(stream);
+       assert(stream->ustream);
+
+       return ustctl_stream_close_wakeup_fd(stream->ustream);
+}
+
 /*
  * Populate index values of a UST stream. Values are set in big endian order.
  *
@@ -1861,6 +1853,57 @@ end:
        return ret;
 }
 
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct ustctl_consumer_stream *ustream;
+
+       assert(stream);
+       assert(ctx);
+
+       ustream = stream->ustream;
+
+       /*
+        * First, we are going to check if there is a new subbuffer available
+        * before reading the stream wait_fd.
+        */
+       /* Get the next subbuffer */
+       ret = ustctl_get_next_subbuf(ustream);
+       if (ret) {
+               /* No more data found, flag the stream. */
+               stream->has_data = 0;
+               ret = 0;
+               goto end;
+       }
+
+       ret = ustctl_put_next_subbuf(ustream);
+       assert(!ret);
+
+       /* This stream still has data. Flag it and wake up the data thread. */
+       stream->has_data = 1;
+
+       if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+               ssize_t writelen;
+
+               writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+               if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       ret = writelen;
+                       goto end;
+               }
+
+               /* The wake up pipe has been notified. */
+               ctx->has_wakeup = 1;
+       }
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Read subbuffer from the given stream.
  *
@@ -1874,7 +1917,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        unsigned long len, subbuf_size, padding;
        int err, write_index = 1;
        long ret = 0;
-       char dummy;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
 
@@ -1889,11 +1931,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ustream = stream->ustream;
 
        /*
-        * We can consume the 1 byte written into the wait_fd by UST.
-        * Don't trigger error if we cannot read this one byte (read
-        * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+        * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
+        * error if we cannot read this one byte (read returns 0), or if the error
+        * is EAGAIN or EWOULDBLOCK.
+        *
+        * This is only done when the stream is monitored by a thread, before the
+        * flush is done after a hangup and if the stream is not flagged with data
+        * since there might be nothing to consume in the wait fd but still have
+        * data available flagged by the consumer wake up pipe.
         */
-       if (stream->monitor && !stream->hangup_flush_done) {
+       if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+               char dummy;
                ssize_t readlen;
 
                readlen = lttng_read(stream->wait_fd, &dummy, 1);
@@ -1979,6 +2027,17 @@ retry:
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
 
+       /*
+        * This will consumer the byte on the wait_fd if and only if there is not
+        * next subbuffer to be acquired.
+        */
+       if (!stream->metadata_flag) {
+               ret = notify_if_more_data(stream, ctx);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
        /* Write index if needed. */
        if (!write_index) {
                goto end;
@@ -2111,6 +2170,45 @@ end:
        return ret;
 }
 
+/*
+ * Stop a given metadata channel timer if enabled and close the wait fd which
+ * is the poll pipe of the metadata stream.
+ *
+ * This MUST be called with the metadata channel acquired.
+ */
+void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
+{
+       int ret;
+
+       assert(metadata);
+       assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
+
+       DBG("Closing metadata channel key %" PRIu64, metadata->key);
+
+       if (metadata->switch_timer_enabled == 1) {
+               consumer_timer_switch_stop(metadata);
+       }
+
+       if (!metadata->metadata_stream) {
+               goto end;
+       }
+
+       /*
+        * Closing write side so the thread monitoring the stream wakes up if any
+        * and clean the metadata stream.
+        */
+       if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
+               ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
+               if (ret < 0) {
+                       PERROR("closing metadata pipe write side");
+               }
+               metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
+       }
+
+end:
+       return;
+}
+
 /*
  * Close every metadata stream wait fd of the metadata hash table. This
  * function MUST be used very carefully so not to run into a race between the
@@ -2120,7 +2218,7 @@ end:
  * producer so calling this is safe because we are assured that no state change
  * can occur in the metadata thread for the streams in the hash table.
  */
-void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
+void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
 {
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
@@ -2137,13 +2235,7 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
                health_code_update();
 
                pthread_mutex_lock(&stream->chan->lock);
-               /*
-                * Whatever returned value, we must continue to try to close everything
-                * so ignore it.
-                */
-               (void) _close_metadata(stream->chan);
-               DBG("Metadata wait fd %d and poll pipe fd %d closed", stream->wait_fd,
-                               stream->ust_metadata_poll_pipe[1]);
+               lttng_ustconsumer_close_metadata(stream->chan);
                pthread_mutex_unlock(&stream->chan->lock);
 
        }
@@ -2178,6 +2270,8 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        assert(channel);
        assert(channel->metadata_cache);
 
+       memset(&request, 0, sizeof(request));
+
        /* send the metadata request to sessiond */
        switch (consumer_data.type) {
        case LTTNG_CONSUMER64_UST:
This page took 0.027361 seconds and 4 git commands to generate.