X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=b43ae58ffd84b4be5c836130341d6c8b200b672c;hp=e27e15ca573ade45605606ebbff0a01448ae67a2;hb=95671f5349e87cdd2ea6cb47243608e9368ab8d5;hpb=edbac916fd984b0b50dc3f7cc352a94cb7f24287 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index e27e15ca5..b43ae58ff 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -950,6 +950,8 @@ error: */ consumer_stream_destroy(metadata->metadata_stream, NULL); metadata->metadata_stream = NULL; + lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue); + send_streams_error: error_no_stream: end: @@ -985,7 +987,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1); if (ret < 0) { goto error; } @@ -1032,6 +1034,7 @@ error_stream: */ consumer_stream_destroy(metadata_stream, NULL); metadata_channel->metadata_stream = NULL; + lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue); error: rcu_read_unlock(); @@ -1275,7 +1278,7 @@ void metadata_stream_reset_cache_consumed_position( */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, uint64_t version, - struct lttng_consumer_channel *channel, int timer, int wait) + struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; @@ -1364,13 +1367,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, if (!wait) { goto end_free; } - while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { - DBG("Waiting for metadata to be flushed"); - - health_code_update(); - usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); - } + consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer); end_free: free(metadata_str); @@ -1821,7 +1819,7 @@ end_get_channel_nosignal: health_code_update(); ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len, - version, found_channel, 0, 1); + version, found_channel, false, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_push_metadata_fatal; @@ -2613,6 +2611,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) goto end; } stream->ust_metadata_pushed += write_len; + lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); assert(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); @@ -2662,7 +2661,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { status = SYNC_METADATA_STATUS_ERROR; @@ -3312,7 +3311,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer, int wait) + struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -3420,7 +3419,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, version, channel, timer, wait); + key, offset, len, version, channel, invoked_by_timer, wait); if (ret >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive