X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=676aa0d9e2f066bec6b439f61e151579f4758bb5;hp=61b220ee012a76930d3149be0eba07ba2fef919b;hb=6a00837f8cb0431a2ad90974d67fae138ea97dd5;hpb=ec6ea7d01adc8a9d1481ba645b282c92ec27208e diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 61b220ee0..676aa0d9e 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -194,18 +194,41 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, /* Get the right pipe where the stream will be sent. */ if (stream->metadata_flag) { + ret = consumer_add_metadata_stream(stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed.", + stream->key); + goto error; + } stream_pipe = ctx->consumer_metadata_pipe; } else { + ret = consumer_add_data_stream(stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed.", + stream->key); + goto error; + } stream_pipe = ctx->consumer_data_pipe; } + /* + * From this point on, the stream's ownership has been moved away from + * the channel and becomes globally visible. + */ + stream->globally_visible = 1; + ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); + if (stream->metadata_flag) { + consumer_del_stream_for_metadata(stream); + } else { + consumer_del_stream_for_data(stream); + } } - +error: return ret; } @@ -533,17 +556,14 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, * If we are unable to send the stream to the thread, there is * a big problem so just stop everything. */ + /* Remove node from the channel stream list. */ + cds_list_del(&stream->send_node); goto error; } /* Remove node from the channel stream list. */ cds_list_del(&stream->send_node); - /* - * From this point on, the stream's ownership has been moved away from - * the channel and becomes globally visible. - */ - stream->globally_visible = 1; } error: @@ -579,7 +599,7 @@ static int flush_channel(uint64_t chan_key) cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, stream, node_channel_id.node) { - ustctl_flush_buffer(stream->ustream, 1); + ustctl_flush_buffer(stream->ustream, 1); } error: rcu_read_unlock(); @@ -614,7 +634,6 @@ static int close_metadata(uint64_t chan_key) pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); - pthread_mutex_lock(&channel->timer_lock); if (cds_lfht_is_node_deleted(&channel->node.node)) { goto error_unlock; @@ -642,7 +661,6 @@ static int close_metadata(uint64_t chan_key) } error_unlock: - pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); error: @@ -749,7 +767,8 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, metadata_channel = consumer_find_channel(key); if (!metadata_channel) { - ERR("UST snapshot metadata channel not found for key %lu", key); + ERR("UST snapshot metadata channel not found for key %" PRIu64, + key); ret = -1; goto error; } @@ -759,7 +778,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0); if (ret < 0) { goto error; } @@ -845,12 +864,12 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, channel = consumer_find_channel(key); if (!channel) { - ERR("UST snapshot channel not found for key %lu", key); + ERR("UST snapshot channel not found for key %" PRIu64, key); ret = -1; goto error; } assert(!channel->monitor); - DBG("UST consumer snapshot channel %lu", key); + DBG("UST consumer snapshot channel %" PRIu64, key); cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* Lock stream because we are about to change its state. */ @@ -982,7 +1001,8 @@ error: * Receive the metadata updates from the sessiond. */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, - uint64_t len, struct lttng_consumer_channel *channel) + uint64_t len, struct lttng_consumer_channel *channel, + int timer) { int ret, ret_code = LTTNG_OK; char *metadata_str; @@ -1019,7 +1039,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, } pthread_mutex_unlock(&channel->metadata_cache->lock); - while (consumer_metadata_cache_flushed(channel, offset + len)) { + while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { DBG("Waiting for metadata to be flushed"); usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); } @@ -1354,7 +1374,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret = lttng_ustconsumer_recv_metadata(sock, key, offset, - len, channel); + len, channel, 0); if (ret < 0) { /* error receiving from sessiond */ goto error_fatal; @@ -1813,7 +1833,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * introduces deadlocks. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel) + struct lttng_consumer_channel *channel, int timer) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -1901,7 +1921,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, } ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, channel); + key, offset, len, channel, timer); if (ret_code >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive