From 94d4914075c61cd1ee2ec00d8b61eacff105fc47 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 12 Sep 2013 11:04:22 -0400 Subject: [PATCH] Test for new metadata at each packet After sending each data packet in live, we need to check if new metadata is available before sending the index informing the viewer it can read the trace. Since the data and the metadata are handled by two different threads, this patch introduces a rendez-vous point: if new metadata is available, the data thread flushes the metadata stream and waits on a conditionnal variable. When the metadata thread finishes to send its data, it wakes up the data thread which can send its index. That way, the viewer is informed new metadata is available before attempting to read a packet that might require an update of the metadata. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- src/bin/lttng-relayd/main.c | 118 +++++++------- src/common/consumer-stream.c | 136 ++++++++++++++++ src/common/consumer-stream.h | 3 + src/common/consumer-timer.c | 9 +- src/common/consumer.c | 10 ++ src/common/consumer.h | 6 + src/common/kernel-consumer/kernel-consumer.c | 51 +++++- src/common/kernel-consumer/kernel-consumer.h | 1 + src/common/ust-consumer/ust-consumer.c | 159 +++++++++++++++---- src/common/ust-consumer/ust-consumer.h | 12 +- 10 files changed, 410 insertions(+), 95 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 43e6f318a..ba97ab3d7 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -761,6 +761,50 @@ void deferred_free_session(struct rcu_head *head) free(session); } +static void close_stream(struct relay_stream *stream, + struct lttng_ht *viewer_streams_ht, struct lttng_ht *ctf_traces_ht) +{ + int delret; + struct relay_viewer_stream *vstream; + struct lttng_ht_iter iter; + + assert(stream); + assert(viewer_streams_ht); + + delret = close(stream->fd); + if (delret < 0) { + PERROR("close stream"); + } + + if (stream->index_fd >= 0) { + delret = close(stream->index_fd); + if (delret < 0) { + PERROR("close stream index_fd"); + } + } + + vstream = live_find_viewer_stream_by_id(stream->stream_handle, + viewer_streams_ht); + if (vstream) { + /* + * Set the last good value into the viewer stream. This is done + * right before the stream gets deleted from the hash table. The + * lookup failure on the live thread side of a stream indicates + * that the viewer stream index received value should be used. + */ + vstream->total_index_received = stream->total_index_received; + } + + iter.iter.node = &stream->stream_n.node; + delret = lttng_ht_del(relay_streams_ht, &iter); + assert(!delret); + iter.iter.node = &stream->ctf_trace_node.node; + delret = lttng_ht_del(ctf_traces_ht, &iter); + assert(!delret); + call_rcu(&stream->rcu_node, deferred_free_stream); + DBG("Closed tracefile %d from close stream", stream->fd); +} + /* * relay_delete_session: Free all memory associated with a session and * close all the FDs @@ -1031,12 +1075,11 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_command *cmd, struct lttng_ht *viewer_streams_ht) { + int ret, send_ret; struct relay_session *session = cmd->session; struct lttcomm_relayd_close_stream stream_info; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; - int ret, send_ret; - struct lttng_ht_iter iter; DBG("Close stream received"); @@ -1070,42 +1113,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->close_flag = 1; if (close_stream_check(stream)) { - int delret; - struct relay_viewer_stream *vstream; - - delret = close(stream->fd); - if (delret < 0) { - PERROR("close stream"); - } - - if (stream->index_fd >= 0) { - delret = close(stream->index_fd); - if (delret < 0) { - PERROR("close stream index_fd"); - } - } - - vstream = live_find_viewer_stream_by_id(stream->stream_handle, - viewer_streams_ht); - if (vstream) { - /* - * Set the last good value into the viewer stream. This is done - * right before the stream gets deleted from the hash table. The - * lookup failure on the live thread side of a stream indicates - * that the viewer stream index received value should be used. - */ - vstream->total_index_received = stream->total_index_received; - } - - iter.iter.node = &stream->stream_n.node; - delret = lttng_ht_del(relay_streams_ht, &iter); - assert(!delret); - iter.iter.node = &stream->ctf_trace_node.node; - delret = lttng_ht_del(cmd->ctf_traces_ht, &iter); - assert(!delret); - call_rcu(&stream->rcu_node, - deferred_free_stream); - DBG("Closed tracefile %d from close stream", stream->fd); + close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht); } end_unlock: @@ -1829,7 +1837,7 @@ end: */ static int relay_process_data(struct relay_command *cmd, - struct lttng_ht *indexes_ht) + struct lttng_ht *indexes_ht, struct lttng_ht *viewer_streams_ht) { int ret = 0, rotate_index = 0, index_created = 0; struct relay_stream *stream; @@ -2001,24 +2009,7 @@ int relay_process_data(struct relay_command *cmd, /* Check if we need to close the FD */ if (close_stream_check(stream)) { - int cret; - struct lttng_ht_iter iter; - - cret = close(stream->fd); - if (cret < 0) { - PERROR("close stream process data"); - } - - cret = close(stream->index_fd); - if (cret < 0) { - PERROR("close stream index_fd"); - } - iter.iter.node = &stream->stream_n.node; - ret = lttng_ht_del(relay_streams_ht, &iter); - assert(!ret); - call_rcu(&stream->rcu_node, - deferred_free_stream); - DBG("Closed tracefile %d after recv data", stream->fd); + close_stream(stream, viewer_streams_ht, cmd->ctf_traces_ht); } end_rcu_unlock: @@ -2320,7 +2311,8 @@ restart: continue; } - ret = relay_process_data(relay_connection, indexes_ht); + ret = relay_process_data(relay_connection, indexes_ht, + relay_ctx->viewer_streams_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2360,9 +2352,15 @@ error: &iter, relay_connection, sessions_ht); } } - rcu_read_unlock(); error_poll_create: - lttng_ht_destroy(indexes_ht); + { + struct relay_index *index; + cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { + relay_index_delete(index, indexes_ht); + } + lttng_ht_destroy(indexes_ht); + } + rcu_read_unlock(); indexes_ht_error: lttng_ht_destroy(relay_connections_ht); relay_connections_ht_error: diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 920948760..808cae236 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -355,3 +356,138 @@ error: rcu_read_unlock(); return ret; } + +/* + * Synchronize the metadata using a given session ID. A successful acquisition + * of a metadata stream will trigger a request to the session daemon and a + * snapshot so the metadata thread can consume it. + * + * This function call is a rendez-vous point between the metadata thread and + * the data thread. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, + uint64_t session_id) +{ + int ret; + struct lttng_consumer_stream *metadata = NULL, *stream = NULL; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + + assert(ctx); + + /* Ease our life a bit. */ + ht = consumer_data.stream_list_ht; + + rcu_read_lock(); + + /* Search the metadata associated with the session id of the given stream. */ + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, + &session_id, &iter.iter, stream, node_session_id.node) { + if (stream->metadata_flag) { + metadata = stream; + break; + } + } + if (!metadata) { + ret = 0; + goto end_unlock_rcu; + } + + /* + * In UST, since we have to write the metadata from the cache packet + * by packet, we might need to start this procedure multiple times + * until all the metadata from the cache has been extracted. + */ + do { + /* + * Steps : + * - Lock the metadata stream + * - Check if metadata stream node was deleted before locking. + * - if yes, release and return success + * - Check if new metadata is ready (flush + snapshot pos) + * - If nothing : release and return. + * - Lock the metadata_rdv_lock + * - Unlock the metadata stream + * - cond_wait on metadata_rdv to wait the wakeup from the + * metadata thread + * - Unlock the metadata_rdv_lock + */ + pthread_mutex_lock(&metadata->lock); + + /* + * There is a possibility that we were able to acquire a reference on the + * stream from the RCU hash table but between then and now, the node might + * have been deleted just before the lock is acquired. Thus, after locking, + * we make sure the metadata node has not been deleted which means that the + * buffers are closed. + * + * In that case, there is no need to sync the metadata hence returning a + * success return code. + */ + ret = cds_lfht_is_node_deleted(&metadata->node.node); + if (ret) { + ret = 0; + goto end_unlock_mutex; + } + + switch (ctx->type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Empty the metadata cache and flush the current stream. + */ + ret = lttng_kconsumer_sync_metadata(metadata); + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Ask the sessiond if we have new metadata waiting and update the + * consumer metadata cache. + */ + ret = lttng_ustconsumer_sync_metadata(ctx, metadata); + break; + default: + assert(0); + ret = -1; + break; + } + /* + * Error or no new metadata, we exit here. + */ + if (ret <= 0 || ret == ENODATA) { + goto end_unlock_mutex; + } + + /* + * At this point, new metadata have been flushed, so we wait on the + * rendez-vous point for the metadata thread to wake us up when it + * finishes consuming the metadata and continue execution. + */ + + pthread_mutex_lock(&metadata->metadata_rdv_lock); + + /* + * Release metadata stream lock so the metadata thread can process it. + */ + pthread_mutex_unlock(&metadata->lock); + + /* + * Wait on the rendez-vous point. Once woken up, it means the metadata was + * consumed and thus synchronization is achieved. + */ + pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); + pthread_mutex_unlock(&metadata->metadata_rdv_lock); + } while (ret == EAGAIN); + + ret = 0; + goto end_unlock_rcu; + +end_unlock_mutex: + pthread_mutex_unlock(&metadata->lock); +end_unlock_rcu: + rcu_read_unlock(); + return ret; +} diff --git a/src/common/consumer-stream.h b/src/common/consumer-stream.h index 956bb6328..79efa721e 100644 --- a/src/common/consumer-stream.h +++ b/src/common/consumer-stream.h @@ -74,4 +74,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream); int consumer_stream_write_index(struct lttng_consumer_stream *stream, struct lttng_packet_index *index); +int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, + uint64_t session_id); + #endif /* LTTNG_CONSUMER_STREAM_H */ diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index e2be05e7e..0f5d4ba96 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -100,7 +100,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, * they are held while consumer_timer_switch_stop() is * called. */ - ret = lttng_ustconsumer_request_metadata(ctx, channel, 1); + ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1); if (ret < 0) { channel->switch_timer_error = 1; } @@ -186,6 +186,11 @@ static int check_ust_stream(struct lttng_consumer_stream *stream) * safely send the empty index. */ pthread_mutex_lock(&stream->lock); + ret = cds_lfht_is_node_deleted(&stream->node.node); + if (ret) { + goto error_unlock; + } + ret = ustctl_get_current_timestamp(stream->ustream, &ts); if (ret < 0) { ERR("Failed to get the current timestamp"); @@ -194,7 +199,7 @@ static int check_ust_stream(struct lttng_consumer_stream *stream) ustctl_flush_buffer(stream->ustream, 1); ret = ustctl_snapshot(stream->ustream); if (ret < 0) { - if (errno != EAGAIN) { + if (ret != -EAGAIN) { ERR("Taking UST snapshot"); ret = -1; goto error_unlock; diff --git a/src/common/consumer.c b/src/common/consumer.c index b8695698d..2420d110b 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -520,6 +520,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->metadata_flag = 1; /* Metadata is flat out. */ strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name)); + /* Live rendez-vous point. */ + pthread_cond_init(&stream->metadata_rdv, NULL); + pthread_mutex_init(&stream->metadata_rdv_lock, NULL); } else { /* Format stream name to _ */ ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", @@ -3062,6 +3065,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, ssize_t ret; pthread_mutex_lock(&stream->lock); + if (stream->metadata_flag) { + pthread_mutex_lock(&stream->metadata_rdv_lock); + } switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -3078,6 +3084,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, break; } + if (stream->metadata_flag) { + pthread_cond_broadcast(&stream->metadata_rdv); + pthread_mutex_unlock(&stream->metadata_rdv_lock); + } pthread_mutex_unlock(&stream->lock); return ret; } diff --git a/src/common/consumer.h b/src/common/consumer.h index 2bf572303..aef7f560e 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -334,6 +334,12 @@ struct lttng_consumer_stream { * FD of the index file for this stream. */ int index_fd; + + /* + * Rendez-vous point between data and metadata stream in live mode. + */ + pthread_cond_t metadata_rdv; + pthread_mutex_t metadata_rdv_lock; }; /* diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4618ccedc..d02e8502d 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -504,7 +504,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } - consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval); + if (CONSUMER_CHANNEL_TYPE_DATA) { + consumer_timer_live_start(new_channel, + msg.u.channel.live_timer_interval); + } /* If we received an error in add_channel, we need to report it. */ if (ret < 0) { @@ -902,6 +905,42 @@ static int get_index_values(struct lttng_packet_index *index, int infd) error: return ret; } +/* + * Sync metadata meaning request them to the session daemon and snapshot to the + * metadata thread can consumer them. + * + * Metadata stream lock MUST be acquired. + * + * Return 0 if new metadatda is available, EAGAIN if the metadata stream + * is empty or a negative value on error. + */ +int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata) +{ + int ret; + + assert(metadata); + + ret = kernctl_buffer_flush(metadata->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + + ret = kernctl_snapshot(metadata->wait_fd); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Sync metadata, taking kernel snapshot failed."); + goto end; + } + DBG("Sync metadata, no new kernel metadata"); + /* No new metadata, exit. */ + ret = ENODATA; + goto end; + } + +end: + return ret; +} /* * Consume data on a file descriptor and write it on a trace file. @@ -1032,6 +1071,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + if (stream->chan->live_timer_interval && !stream->metadata_flag) { + /* + * In live, block until all the metadata is sent. + */ + err = consumer_stream_sync_metadata(ctx, stream->session_id); + if (err < 0) { + goto end; + } + } + err = consumer_stream_write_index(stream, &index); if (err < 0) { goto end; diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index d8f74ca1f..1aad2733b 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -32,5 +32,6 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream); int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream); +int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata); #endif /* _LTTNG_KCONSUMER_H */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 192217b4e..2d3d89c7c 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -800,7 +800,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1); if (ret < 0) { goto error; } @@ -835,18 +835,13 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, metadata_stream->tracefile_size_current = 0; } - pthread_mutex_lock(&metadata_channel->metadata_cache->lock); - do { ret = lttng_consumer_read_subbuffer(metadata_stream, ctx); if (ret < 0) { - goto error_unlock; + goto error_stream; } } while (ret > 0); -error_unlock: - pthread_mutex_unlock(&metadata_channel->metadata_cache->lock); - error_stream: /* * Clean up the stream completly because the next snapshot will use a new @@ -1024,7 +1019,7 @@ error: */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, struct lttng_consumer_channel *channel, - int timer) + int timer, int wait) { int ret, ret_code = LTTNG_OK; char *metadata_str; @@ -1061,6 +1056,9 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, } pthread_mutex_unlock(&channel->metadata_cache->lock); + if (!wait) { + goto end_free; + } while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { DBG("Waiting for metadata to be flushed"); usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); @@ -1253,10 +1251,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } consumer_timer_switch_start(channel, attr.switch_timer_interval); attr.switch_timer_interval = 0; + } else { + consumer_timer_live_start(channel, + msg.u.ask_channel.live_timer_interval); } - consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); - /* * Add the channel to the internal state AFTER all streams were created * and successfully sent to session daemon. This way, all streams must @@ -1410,7 +1409,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret = lttng_ustconsumer_recv_metadata(sock, key, offset, - len, channel, 0); + len, channel, 0, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_fatal; @@ -1665,7 +1664,112 @@ error: return ret; } +/* + * Write up to one packet from the metadata cache to the channel. + * + * Returns the number of bytes pushed in the cache, or a negative value + * on error. + */ +static +int commit_one_metadata_packet(struct lttng_consumer_stream *stream) +{ + ssize_t write_len; + int ret; + + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + if (stream->chan->metadata_cache->contiguous + == stream->ust_metadata_pushed) { + ret = 0; + goto end; + } + + write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, + &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], + stream->chan->metadata_cache->contiguous + - stream->ust_metadata_pushed); + assert(write_len != 0); + if (write_len < 0) { + ERR("Writing one metadata packet"); + ret = -1; + goto end; + } + stream->ust_metadata_pushed += write_len; + + assert(stream->chan->metadata_cache->contiguous >= + stream->ust_metadata_pushed); + ret = write_len; + +end: + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); + return ret; +} + +/* + * Sync metadata meaning request them to the session daemon and snapshot to the + * metadata thread can consumer them. + * + * Metadata stream lock MUST be acquired. + * + * Return 0 if new metadatda is available, EAGAIN if the metadata stream + * is empty or a negative value on error. + */ +int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *metadata) +{ + int ret; + int retry = 0; + + assert(ctx); + assert(metadata); + + /* + * Request metadata from the sessiond, but don't wait for the flush + * because we locked the metadata thread. + */ + ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0); + if (ret < 0) { + goto end; + } + + ret = commit_one_metadata_packet(metadata); + if (ret <= 0) { + goto end; + } else if (ret > 0) { + retry = 1; + } + + ustctl_flush_buffer(metadata->ustream, 1); + ret = ustctl_snapshot(metadata->ustream); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Sync metadata, taking UST snapshot"); + goto end; + } + DBG("No new metadata when syncing them."); + /* No new metadata, exit. */ + ret = ENODATA; + goto end; + } + + /* + * After this flush, we still need to extract metadata. + */ + if (retry) { + ret = EAGAIN; + } + +end: + return ret; +} + +/* + * Read subbuffer from the given stream. + * + * Stream lock MUST be acquired. + * + * Return 0 on success else a negative value. + */ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { @@ -1708,25 +1812,10 @@ retry: * already been read. */ if (stream->metadata_flag) { - ssize_t write_len; - - if (stream->chan->metadata_cache->contiguous - == stream->ust_metadata_pushed) { - ret = 0; + ret = commit_one_metadata_packet(stream); + if (ret <= 0) { goto end; } - - write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, - &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->contiguous - - stream->ust_metadata_pushed); - assert(write_len != 0); - if (write_len < 0) { - ERR("Writing one metadata packet"); - ret = -1; - goto end; - } - stream->ust_metadata_pushed += write_len; ustctl_flush_buffer(stream->ustream, 1); goto retry; } @@ -1795,6 +1884,16 @@ retry: goto end; } + if (stream->chan->live_timer_interval && !stream->metadata_flag) { + /* + * In live, block until all the metadata is sent. + */ + err = consumer_stream_sync_metadata(ctx, stream->session_id); + if (err < 0) { + goto end; + } + } + assert(!stream->metadata_flag); err = consumer_stream_write_index(stream, &index); if (err < 0) { @@ -1965,7 +2064,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * introduces deadlocks. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer) + struct lttng_consumer_channel *channel, int timer, int wait) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -2059,7 +2158,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, } ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, channel, timer); + key, offset, len, channel, timer, wait); if (ret_code >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index c10cd13a2..bf005c30d 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -55,9 +55,11 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *ht); void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream); int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, struct lttng_consumer_channel *channel, - int timer); + int timer, int wait); int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer); + struct lttng_consumer_channel *channel, int timer, int wait); +int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *metadata); #else /* HAVE_LIBLTTNG_UST_CTL */ @@ -176,6 +178,12 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, { return -ENOSYS; } +static inline +int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *metadata) +{ + return -ENOSYS; +} #endif /* HAVE_LIBLTTNG_UST_CTL */ #endif /* _LTTNG_USTCONSUMER_H */ -- 2.34.1