Fix: skip metadata flushed check if write failed
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 7d9444cd00b84300450e69676b6d42a668032e67..f7ffb0febf5b7bcdc338b30975ae7e3379724935 100644 (file)
@@ -185,21 +185,21 @@ error:
 static int send_stream_to_thread(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
-       int ret, stream_pipe;
+       int ret;
+       struct lttng_pipe *stream_pipe;
 
        /* Get the right pipe where the stream will be sent. */
        if (stream->metadata_flag) {
-               stream_pipe = ctx->consumer_metadata_pipe[1];
+               stream_pipe = ctx->consumer_metadata_pipe;
        } else {
-               stream_pipe = ctx->consumer_data_pipe[1];
+               stream_pipe = ctx->consumer_data_pipe;
        }
 
-       do {
-               ret = write(stream_pipe, &stream, sizeof(stream));
-       } while (ret < 0 && errno == EINTR);
+       ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
        if (ret < 0) {
-               PERROR("Consumer write %s stream to pipe %d",
-                               stream->metadata_flag ? "metadata" : "data", stream_pipe);
+               ERR("Consumer write %s stream to pipe %d",
+                               stream->metadata_flag ? "metadata" : "data",
+                               lttng_pipe_get_writefd(stream_pipe));
        }
 
        return ret;
@@ -278,6 +278,12 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                 */
                stream->wait_fd = wait_fd;
 
+               /*
+                * Increment channel refcount since the channel reference has now been
+                * assigned in the allocation process above.
+                */
+               uatomic_inc(&stream->chan->refcount);
+
                /*
                 * Order is important; this is why a list is used. On error, the caller
                 * should clean this list.
@@ -485,10 +491,6 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
 
        channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
 
-       if (ret < 0) {
-               goto error;
-       }
-
        /* Open all streams for this channel. */
        ret = create_ust_streams(channel, ctx);
        if (ret < 0) {
@@ -552,6 +554,11 @@ int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata,
 
        DBG("UST consumer writing metadata to channel %s", metadata->name);
 
+       if (!metadata->metadata_stream) {
+               ret = 0;
+               goto error;
+       }
+
        assert(target_offset <= metadata->metadata_cache->max_offset);
        ret = ustctl_write_metadata_to_channel(metadata->uchan,
                        metadata_str + target_offset, len);
@@ -610,7 +617,7 @@ error:
  */
 static int close_metadata(uint64_t chan_key)
 {
-       int ret;
+       int ret = 0;
        struct lttng_consumer_channel *channel;
 
        DBG("UST consumer close metadata key %" PRIu64, chan_key);
@@ -622,18 +629,28 @@ static int close_metadata(uint64_t chan_key)
                goto error;
        }
 
-       ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
-       if (ret < 0) {
-               ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
-               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
-               goto error;
+       pthread_mutex_lock(&consumer_data.lock);
+
+       if (cds_lfht_is_node_deleted(&channel->node.node)) {
+               goto error_unlock;
        }
+
        if (channel->switch_timer_enabled == 1) {
                DBG("Deleting timer on metadata channel");
                consumer_timer_switch_stop(channel);
        }
-       consumer_metadata_cache_destroy(channel);
 
+       if (channel->metadata_stream) {
+               ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+               if (ret < 0) {
+                       ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+                       ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       goto error_unlock;
+               }
+       }
+
+error_unlock:
+       pthread_mutex_unlock(&consumer_data.lock);
 error:
        return ret;
 }
@@ -718,13 +735,33 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                goto end_free;
        }
 
+       /*
+        * XXX: The consumer data lock is acquired before calling metadata cache
+        * write which calls push metadata that MUST be protected by the consumer
+        * lock in order to be able to check the validity of the metadata stream of
+        * the channel.
+        *
+        * Note that this will be subject to change to better fine grained locking
+        * and ultimately try to get rid of this global consumer data lock.
+        */
+       pthread_mutex_lock(&consumer_data.lock);
+
        pthread_mutex_lock(&channel->metadata_cache->lock);
        ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
        if (ret < 0) {
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               /*
+                * Skip metadata flush on write error since the offset and len might
+                * not have been updated which could create an infinite loop below when
+                * waiting for the metadata cache to be flushed.
+                */
+               pthread_mutex_unlock(&channel->metadata_cache->lock);
+               pthread_mutex_unlock(&consumer_data.lock);
+               goto end_free;
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
 
        while (consumer_metadata_cache_flushed(channel, offset + len)) {
                DBG("Waiting for metadata to be flushed");
@@ -923,10 +960,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                ret = add_channel(channel, ctx);
                if (ret < 0) {
+                       if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
+                               if (channel->switch_timer_enabled == 1) {
+                                       consumer_timer_switch_stop(channel);
+                               }
+                               consumer_metadata_cache_destroy(channel);
+                       }
                        goto end_channel_error;
                }
 
-
                /*
                 * Channel and streams are now created. Inform the session daemon that
                 * everything went well and should wait to receive the channel and
@@ -1193,6 +1235,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
        assert(chan);
        assert(chan->uchan);
 
+       if (chan->switch_timer_enabled == 1) {
+               consumer_timer_switch_stop(chan);
+       }
+       consumer_metadata_cache_destroy(chan);
        ustctl_destroy_channel(chan->uchan);
 }
 
@@ -1201,6 +1247,9 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
        assert(stream);
        assert(stream->ustream);
 
+       if (stream->chan->switch_timer_enabled == 1) {
+               consumer_timer_switch_stop(stream->chan);
+       }
        ustctl_destroy_stream(stream->ustream);
 }
 
This page took 0.025522 seconds and 4 git commands to generate.