From c585821bc78955b3d747fcd733aa1d2b81a3258e Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 19 Aug 2015 14:44:59 -0700 Subject: [PATCH] Fix: sessiond vs consumerd push/get metadata deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit We need to unlock the registry while we push metadata to break a circular dependency between the consumerd metadata lock and the sessiond registry lock. Indeed, pushing metadata to the consumerd awaits that it gets pushed all the way to relayd, but doing so requires grabbing the metadata lock. If a concurrent metadata request is being performed by consumerd, this can try to grab the registry lock on the sessiond while holding the metadata lock on the consumer daemon. Those push and pull schemes are performed on two different bidirectional communication sockets. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/bin/lttng-sessiond/ust-app.c | 53 ++++++-- src/common/consumer-metadata-cache.c | 11 +- src/common/consumer-metadata-cache.h | 10 +- src/common/consumer-timer.c | 126 ++++++++++++++----- src/common/consumer-timer.h | 3 + src/common/consumer.c | 1 + src/common/consumer.h | 15 +++ src/common/kernel-consumer/kernel-consumer.c | 15 +++ src/common/ust-consumer/ust-consumer.c | 50 +++++++- 9 files changed, 221 insertions(+), 63 deletions(-) diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 96ba2f4bc..fc4b7085e 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -440,17 +440,20 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry, { int ret; char *metadata_str = NULL; - size_t len, offset; + size_t len, offset, new_metadata_len_sent; ssize_t ret_val; + uint64_t metadata_key; assert(registry); assert(socket); + metadata_key = registry->metadata_key; + /* * Means that no metadata was assigned to the session. This can * happens if no start has been done previously. 
*/ - if (!registry->metadata_key) { + if (!metadata_key) { return 0; } @@ -468,6 +471,7 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry, offset = registry->metadata_len_sent; len = registry->metadata_len - registry->metadata_len_sent; + new_metadata_len_sent = registry->metadata_len; if (len == 0) { DBG3("No metadata to push for metadata key %" PRIu64, registry->metadata_key); @@ -486,13 +490,26 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry, ret_val = -ENOMEM; goto error; } - /* Copy what we haven't send out. */ + /* Copy what we haven't sent out. */ memcpy(metadata_str, registry->metadata + offset, len); - registry->metadata_len_sent += len; push_data: - ret = consumer_push_metadata(socket, registry->metadata_key, + pthread_mutex_unlock(&registry->lock); + /* + * We need to unlock the registry while we push metadata to + * break a circular dependency between the consumerd metadata + * lock and the sessiond registry lock. Indeed, pushing metadata + * to the consumerd awaits that it gets pushed all the way to + * relayd, but doing so requires grabbing the metadata lock. If + * a concurrent metadata request is being performed by + * consumerd, this can try to grab the registry lock on the + * sessiond while holding the metadata lock on the consumer + * daemon. Those push and pull schemes are performed on two + * different bidirectional communication sockets. + */ + ret = consumer_push_metadata(socket, metadata_key, metadata_str, len, offset); + pthread_mutex_lock(&registry->lock); if (ret < 0) { /* * There is an acceptable race here between the registry @@ -510,17 +527,29 @@ push_data: */ if (ret == -LTTCOMM_CONSUMERD_CHANNEL_FAIL) { ret = 0; + } else { + ERR("Error pushing metadata to consumer"); } - - /* - * Update back the actual metadata len sent since it - * failed here. 
- */ - registry->metadata_len_sent -= len; ret_val = ret; goto error_push; + } else { + /* + * Metadata may have been concurrently pushed, since + * we're not holding the registry lock while pushing to + * consumer. This is handled by the fact that we send + * the metadata content, size, and the offset at which + * that metadata belongs. This may arrive out of order + * on the consumer side, and the consumer is able to + * deal with overlapping fragments. The consumer + * supports overlapping fragments, which must be + * contiguous starting from offset 0. We keep the + * largest metadata_len_sent value of the concurrent + * send. + */ + registry->metadata_len_sent = + max_t(size_t, registry->metadata_len_sent, + new_metadata_len_sent); } - free(metadata_str); return len; diff --git a/src/common/consumer-metadata-cache.c b/src/common/consumer-metadata-cache.c index 9cd99e5bf..677469231 100644 --- a/src/common/consumer-metadata-cache.c +++ b/src/common/consumer-metadata-cache.c @@ -73,8 +73,8 @@ end: /* * Write metadata to the cache, extend the cache if necessary. We support - * non-contiguous updates but not overlapping ones. If there is contiguous - * metadata in the cache, we send it to the ring buffer. The metadata cache + * overlapping updates, but they need to be contiguous. Send the + * contiguous metadata in cache to the ring buffer. The metadata cache * lock MUST be acquired to write in the cache. * * Return 0 on success, a negative value on error. 
@@ -102,15 +102,10 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, } memcpy(cache->data + offset, data, len); - cache->total_bytes_written += len; if (offset + len > cache->max_offset) { - cache->max_offset = offset + len; - } - - if (cache->max_offset == cache->total_bytes_written) { char dummy = 'c'; - cache->contiguous = cache->max_offset; + cache->max_offset = offset + len; if (channel->monitor) { size_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], &dummy, 1); diff --git a/src/common/consumer-metadata-cache.h b/src/common/consumer-metadata-cache.h index aaf9f24d2..e7aba4ac9 100644 --- a/src/common/consumer-metadata-cache.h +++ b/src/common/consumer-metadata-cache.h @@ -24,20 +24,12 @@ struct consumer_metadata_cache { char *data; uint64_t cache_alloc_size; - /* - * How many bytes from the cache are written contiguously. - */ - uint64_t contiguous; - /* - * How many bytes are written in the buffer (excluding the wholes). - */ - uint64_t total_bytes_written; /* * The upper-limit of data written inside the buffer. * * With the total_bytes_written it allows us to keep track of when the * cache contains contiguous metadata ready to be sent to the RB. - * The metadata cache updates must not overlap. + * All cached data is contiguous. */ uint64_t max_offset; /* diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index 646d32342..8bf3ae80c 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -133,78 +133,103 @@ error: return ret; } -static int check_kernel_stream(struct lttng_consumer_stream *stream) +int consumer_flush_kernel_index(struct lttng_consumer_stream *stream) { uint64_t ts, stream_id; int ret; - /* - * While holding the stream mutex, try to take a snapshot, if it - * succeeds, it means that data is ready to be sent, just let the data - * thread handle that. 
Otherwise, if the snapshot returns EAGAIN, it - * means that there is no data to read after the flush, so we can - * safely send the empty index. - */ - pthread_mutex_lock(&stream->lock); ret = kernctl_get_current_timestamp(stream->wait_fd, &ts); if (ret < 0) { ERR("Failed to get the current timestamp"); - goto error_unlock; + goto end; } ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); - goto error_unlock; + goto end; } ret = kernctl_snapshot(stream->wait_fd); if (ret < 0) { if (errno != EAGAIN && errno != ENODATA) { PERROR("live timer kernel snapshot"); ret = -1; - goto error_unlock; + goto end; } ret = kernctl_get_stream_id(stream->wait_fd, &stream_id); if (ret < 0) { PERROR("kernctl_get_stream_id"); - goto error_unlock; + goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); ret = send_empty_index(stream, ts, stream_id); if (ret < 0) { - goto error_unlock; + goto end; } } ret = 0; - -error_unlock: - pthread_mutex_unlock(&stream->lock); +end: return ret; } -static int check_ust_stream(struct lttng_consumer_stream *stream) +static int check_kernel_stream(struct lttng_consumer_stream *stream) { - uint64_t ts, stream_id; int ret; - assert(stream); - assert(stream->ustream); /* * While holding the stream mutex, try to take a snapshot, if it * succeeds, it means that data is ready to be sent, just let the data * thread handle that. Otherwise, if the snapshot returns EAGAIN, it * means that there is no data to read after the flush, so we can * safely send the empty index. + * + * Doing a trylock and checking if waiting on metadata if + * trylock fails. Bail out if the stream is indeed waiting for + * metadata to be pushed. Busy wait on trylock otherwise. */ - pthread_mutex_lock(&stream->lock); + for (;;) { + ret = pthread_mutex_trylock(&stream->lock); + switch (ret) { + case 0: + break; /* We have the lock. 
*/ + case EBUSY: + pthread_mutex_lock(&stream->metadata_timer_lock); + if (stream->waiting_on_metadata) { + ret = 0; + stream->missed_metadata_flush = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + goto end; /* Bail out. */ + } + pthread_mutex_unlock(&stream->metadata_timer_lock); + /* Try again. */ + caa_cpu_relax(); + continue; + default: + ERR("Unexpected pthread_mutex_trylock error %d", ret); + ret = -1; + goto end; + } + break; + } + ret = consumer_flush_kernel_index(stream); + pthread_mutex_unlock(&stream->lock); +end: + return ret; +} + +int consumer_flush_ust_index(struct lttng_consumer_stream *stream) +{ + uint64_t ts, stream_id; + int ret; + ret = cds_lfht_is_node_deleted(&stream->node.node); if (ret) { - goto error_unlock; + goto end; } ret = lttng_ustconsumer_get_current_timestamp(stream, &ts); if (ret < 0) { ERR("Failed to get the current timestamp"); - goto error_unlock; + goto end; } lttng_ustconsumer_flush_buffer(stream, 1); ret = lttng_ustconsumer_take_snapshot(stream); @@ -212,23 +237,68 @@ static int check_ust_stream(struct lttng_consumer_stream *stream) if (ret != -EAGAIN) { ERR("Taking UST snapshot"); ret = -1; - goto error_unlock; + goto end; } ret = lttng_ustconsumer_get_stream_id(stream, &stream_id); if (ret < 0) { PERROR("ustctl_get_stream_id"); - goto error_unlock; + goto end; } DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); ret = send_empty_index(stream, ts, stream_id); if (ret < 0) { - goto error_unlock; + goto end; } } ret = 0; +end: + return ret; +} -error_unlock: +static int check_ust_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + assert(stream->ustream); + /* + * While holding the stream mutex, try to take a snapshot, if it + * succeeds, it means that data is ready to be sent, just let the data + * thread handle that. Otherwise, if the snapshot returns EAGAIN, it + * means that there is no data to read after the flush, so we can + * safely send the empty index. 
+ * + * Doing a trylock and checking if waiting on metadata if + * trylock fails. Bail out if the stream is indeed waiting for + * metadata to be pushed. Busy wait on trylock otherwise. + */ + for (;;) { + ret = pthread_mutex_trylock(&stream->lock); + switch (ret) { + case 0: + break; /* We have the lock. */ + case EBUSY: + pthread_mutex_lock(&stream->metadata_timer_lock); + if (stream->waiting_on_metadata) { + ret = 0; + stream->missed_metadata_flush = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + goto end; /* Bail out. */ + } + pthread_mutex_unlock(&stream->metadata_timer_lock); + /* Try again. */ + caa_cpu_relax(); + continue; + default: + ERR("Unexpected pthread_mutex_trylock error %d", ret); + ret = -1; + goto end; + } + break; + } + ret = consumer_flush_ust_index(stream); pthread_mutex_unlock(&stream->lock); +end: return ret; } diff --git a/src/common/consumer-timer.h b/src/common/consumer-timer.h index baaa82b04..22e74574c 100644 --- a/src/common/consumer-timer.h +++ b/src/common/consumer-timer.h @@ -52,4 +52,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel); void *consumer_timer_thread(void *data); int consumer_signal_init(void); +int consumer_flush_kernel_index(struct lttng_consumer_stream *stream); +int consumer_flush_ust_index(struct lttng_consumer_stream *stream); + #endif /* CONSUMER_TIMER_H */ diff --git a/src/common/consumer.c b/src/common/consumer.c index effa5f86f..526fbbf35 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -563,6 +563,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_fd = -1; pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->metadata_timer_lock, NULL); /* If channel is the metadata, flag this stream as metadata. 
*/ if (type == CONSUMER_CHANNEL_TYPE_METADATA) { diff --git a/src/common/consumer.h b/src/common/consumer.h index 509e24e01..ac3b4903f 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -242,6 +242,21 @@ struct lttng_consumer_stream { int shm_fd_is_copy; int data_read; int hangup_flush_done; + + /* + * metadata_timer_lock protects flags waiting_on_metadata and + * missed_metadata_flush. + */ + pthread_mutex_t metadata_timer_lock; + /* + * Flag set when awaiting metadata to be pushed. Used in the + * timer thread to skip waiting on the stream (and stream lock) to + * ensure we can proceed to flushing metadata in live mode. + */ + bool waiting_on_metadata; + /* Raised when a timer misses a metadata flush. */ + bool missed_metadata_flush; + enum lttng_event_output output; /* Maximum subbuffer size. */ unsigned long max_sb_size; diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index e30d21b1a..aae56f90c 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1219,7 +1219,22 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* * In live, block until all the metadata is sent. 
*/ + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + err = consumer_stream_sync_metadata(ctx, stream->session_id); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) consumer_flush_kernel_index(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } if (err < 0) { goto end; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 819817d14..7dfcf9a3c 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1167,7 +1167,12 @@ error: } /* - * Receive the metadata updates from the sessiond. + * Receive the metadata updates from the sessiond. Supports receiving + * overlapping metadata, but it needs to always belong to a contiguous + * range starting from 0. + * Be careful about the locks held when calling this function: it needs + * the metadata cache flush to concurrently progress in order to + * complete. */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, struct lttng_consumer_channel *channel, @@ -1581,6 +1586,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); + if (!len) { + /* + * There is nothing to receive. We have simply + * checked whether the channel can be found. + */ + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + goto end_msg_sessiond; + } + /* Tell session daemon we are ready to receive the metadata. 
*/ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0) { @@ -1942,7 +1956,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->contiguous + if (stream->chan->metadata_cache->max_offset == stream->ust_metadata_pushed) { ret = 0; goto end; @@ -1950,7 +1964,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->contiguous + stream->chan->metadata_cache->max_offset - stream->ust_metadata_pushed); assert(write_len != 0); if (write_len < 0) { @@ -1960,7 +1974,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) } stream->ust_metadata_pushed += write_len; - assert(stream->chan->metadata_cache->contiguous >= + assert(stream->chan->metadata_cache->max_offset >= stream->ust_metadata_pushed); ret = write_len; @@ -1974,7 +1988,9 @@ end: * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. * - * Metadata stream lock MUST be acquired. + * Metadata stream lock is held here, but we need to release it when + * interacting with sessiond, else we cause a deadlock with live + * awaiting on metadata to be pushed out. * * Return 0 if new metadatda is available, EAGAIN if the metadata stream * is empty or a negative value on error. @@ -1988,6 +2004,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, assert(ctx); assert(metadata); + pthread_mutex_unlock(&metadata->lock); /* * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. 
@@ -1996,6 +2013,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, if (ret < 0) { goto end; } + pthread_mutex_lock(&metadata->lock); ret = commit_one_metadata_packet(metadata); if (ret <= 0) { @@ -2222,7 +2240,23 @@ retry: /* * In live, block until all the metadata is sent. */ + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + err = consumer_stream_sync_metadata(ctx, stream->session_id); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) consumer_flush_ust_index(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } + if (err < 0) { goto end; } @@ -2303,7 +2337,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) uint64_t contiguous, pushed; /* Ease our life a bit. */ - contiguous = stream->chan->metadata_cache->contiguous; + contiguous = stream->chan->metadata_cache->max_offset; pushed = stream->ust_metadata_pushed; /* @@ -2432,6 +2466,10 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * function or any of its callees. Timers have a very strict locking * semantic with respect to teardown. Failure to respect this semantic * introduces deadlocks. + * + * DON'T hold the metadata lock when calling this function, else this + * can cause deadlock involving consumer awaiting for metadata to be + * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, int timer, int wait) -- 2.34.1