*/
consumer_stream_destroy(metadata->metadata_stream, NULL);
metadata->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
send_streams_error:
error_no_stream:
end:
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
if (ret < 0) {
goto error;
}
*/
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
error:
rcu_read_unlock();
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, uint64_t version,
- struct lttng_consumer_channel *channel, int timer, int wait)
+ struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
if (!wait) {
goto end_free;
}
- while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
- DBG("Waiting for metadata to be flushed");
-
- health_code_update();
- usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
- }
+ consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
end_free:
free(metadata_str);
health_code_update();
ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
- version, found_channel, 0, 1);
+ version, found_channel, false, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
goto end;
}
stream->ust_metadata_pushed += write_len;
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
assert(stream->chan->metadata_cache->contents.size >=
stream->ust_metadata_pushed);
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
* pushed out due to concurrent interaction with the session daemon.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer, int wait)
+ struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
health_code_update();
ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, version, channel, timer, wait);
+ key, offset, len, version, channel, invoked_by_timer, wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive