X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;h=3f40d03ed3d846cb8fa95788c94bf555be5fb28f;hb=HEAD;hp=6274cdcc5341bdfa62bdcca12e8f8c945bb50007;hpb=671e39d79a1ad9c3f13c4784a26710a5b1f14237;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index 6274cdcc5..3f40d03ed 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -930,6 +930,8 @@ error: */ consumer_stream_destroy(metadata->metadata_stream, nullptr); metadata->metadata_stream = nullptr; + metadata->metadata_pushed_wait_queue.wake_all(); + send_streams_error: error_no_stream: end: @@ -967,7 +969,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1); if (ret < 0) { goto error; } @@ -1013,6 +1015,7 @@ error_stream: */ consumer_stream_destroy(metadata_stream, nullptr); metadata_channel->metadata_stream = nullptr; + metadata_channel->metadata_pushed_wait_queue.wake_all(); error: return ret; @@ -1251,7 +1254,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t len, uint64_t version, struct lttng_consumer_channel *channel, - int timer, + bool invoked_by_timer, int wait) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; @@ -1339,13 +1342,8 @@ int lttng_ustconsumer_recv_metadata(int sock, if (!wait) { goto end_free; } - while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { - DBG("Waiting for metadata to be flushed"); - health_code_update(); - - usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); - } + consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer); end_free: free(metadata_str); @@ -1790,7 +1788,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); ret = lttng_ustconsumer_recv_metadata( - sock, key, offset, len, version, found_channel, 0, 1); + sock, key, offset, len, version, found_channel, false, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_push_metadata_fatal; @@ -2550,7 +2548,9 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) ret = write_len; goto end; } + stream->ust_metadata_pushed += write_len; + stream->chan->metadata_pushed_wait_queue.wake_all(); LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); ret = write_len; @@ -2599,7 +2599,7 @@ lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { status = SYNC_METADATA_STATUS_ERROR; @@ -3238,7 +3238,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, - int timer, + bool invoked_by_timer, int wait) { struct lttcomm_metadata_request_msg request; @@ -3343,8 +3343,14 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = lttng_ustconsumer_recv_metadata( - ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait); + ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, + key, + offset, + len, + version, + channel, + invoked_by_timer, + wait); if (ret >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive