From 04ef1097c8f54a151c899c1773ac56907c97694d Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Mon, 15 Jul 2013 20:53:04 -0400 Subject: [PATCH] consumer: remove timeout for UST metadata Remove time out for UST metadata by generating metadata packets whenever they are about to be read from the ring buffer rather than filling them when the metadata cache is updated, which requires a time out, and therefore may fail. Reviewed-by: Julien Desfossez Signed-off-by: Mathieu Desnoyers --- src/common/consumer-metadata-cache.c | 37 ++++---- src/common/consumer-metadata-cache.h | 4 +- src/common/consumer.c | 20 ++++- src/common/ust-consumer/ust-consumer.c | 115 ++++++++++++++++++------- 4 files changed, 120 insertions(+), 56 deletions(-) diff --git a/src/common/consumer-metadata-cache.c b/src/common/consumer-metadata-cache.c index 4c8a665af..173cac049 100644 --- a/src/common/consumer-metadata-cache.c +++ b/src/common/consumer-metadata-cache.c @@ -105,15 +105,17 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, } if (cache->max_offset == cache->total_bytes_written) { - offset = cache->rb_pushed; - len = cache->total_bytes_written - cache->rb_pushed; - ret = lttng_ustconsumer_push_metadata(channel, cache->data, offset, - len); - if (ret < 0) { - ERR("Pushing metadata"); - goto end; + char dummy = 'c'; + + cache->contiguous = cache->max_offset; + if (channel->monitor) { + ret = write(channel->metadata_stream->ust_metadata_poll_pipe[1], + &dummy, 1); + if (ret < 1) { + ERR("Wakeup UST metadata pipe"); + goto end; + } } - cache->rb_pushed += len; } end: @@ -177,11 +179,6 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel) DBG("Destroying metadata cache"); - if (channel->metadata_cache->max_offset > - channel->metadata_cache->rb_pushed) { - ERR("Destroying a cache not entirely commited"); - } - pthread_mutex_destroy(&channel->metadata_cache->lock); free(channel->metadata_cache->data); free(channel->metadata_cache); @@ -195,14 +192,12 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel) int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, uint64_t offset) { - int ret; - struct consumer_metadata_cache *cache; + int ret = 0; + struct lttng_consumer_stream *metadata_stream; assert(channel); assert(channel->metadata_cache); - cache = channel->metadata_cache; - /* * XXX This consumer_data.lock should eventually be replaced by * a channel lock. It protects metadata_stream read and endpoint @@ -212,14 +207,16 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->metadata_cache->lock); - if (cache->rb_pushed >= offset) { - ret = 0; - } else if (!channel->metadata_stream) { + metadata_stream = channel->metadata_stream; + + if (!metadata_stream) { /* * Having no metadata stream means the channel is being destroyed so there * is no cache to flush anymore. */ ret = 0; + } else if (metadata_stream->ust_metadata_pushed >= offset) { + ret = 0; } else if (channel->metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { /* An inactive endpoint means we don't have to flush anymore. */ diff --git a/src/common/consumer-metadata-cache.h b/src/common/consumer-metadata-cache.h index b1a4dc2b7..8f485d639 100644 --- a/src/common/consumer-metadata-cache.h +++ b/src/common/consumer-metadata-cache.h @@ -25,9 +25,9 @@ struct consumer_metadata_cache { char *data; uint64_t cache_alloc_size; /* - * How many bytes from the cache were already sent to the ring buffer. + * How many bytes from the cache are written contiguously. */ - uint64_t rb_pushed; + uint64_t contiguous; /* * How many bytes are written in the buffer (excluding the wholes). */ diff --git a/src/common/consumer.c b/src/common/consumer.c index 94a0cc3ef..a26a41554 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1880,6 +1880,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: + if (stream->monitor) { + /* close the write-side in close_metadata */ + ret = close(stream->ust_metadata_poll_pipe[0]); + if (ret < 0) { + PERROR("Close UST metadata read-side poll pipe"); + } + } lttng_ustconsumer_del_stream(stream); break; default: @@ -2252,14 +2259,21 @@ restart: DBG("Metadata available on fd %d", pollfd); assert(stream->wait_fd == pollfd); - len = ctx->on_buffer_ready(stream, ctx); + do { + len = ctx->on_buffer_ready(stream, ctx); + /* + * We don't check the return value here since if we get + * a negative len, it means an error occured thus we + * simply remove it from the poll set and free the + * stream. + */ + } while (len > 0); + /* It's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean up stream from consumer and free it. */ lttng_poll_del(&events, stream->wait_fd); consumer_del_metadata_stream(stream, metadata_ht); - } else if (len > 0) { - stream->data_read = 1; } } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index c1e987435..6b47ec0c9 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -230,8 +230,18 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, */ while ((ustream = ustctl_create_stream(channel->uchan, cpu))) { int wait_fd; + int ust_metadata_pipe[2]; - wait_fd = ustctl_stream_get_wait_fd(ustream); + if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) { + ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe); + if (ret < 0) { + ERR("Create ust metadata poll pipe"); + goto error; + } + wait_fd = ust_metadata_pipe[0]; + } else { + wait_fd = ustctl_stream_get_wait_fd(ustream); + } /* Allocate consumer stream object. */ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret); @@ -285,6 +295,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, /* Keep stream reference when creating metadata. */ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { channel->metadata_stream = stream; + stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0]; + stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1]; } } @@ -656,6 +668,13 @@ static int close_metadata(uint64_t chan_key) ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error_unlock; } + if (channel->monitor) { + /* close the read-side in consumer_del_metadata_stream */ + ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]); + if (ret < 0) { + PERROR("Close UST metadata write-side poll pipe"); + } + } } error_unlock: @@ -752,8 +771,6 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, struct lttng_consumer_local_data *ctx) { int ret = 0; - ssize_t write_len; - uint64_t total_len = 0; struct lttng_consumer_channel *metadata_channel; struct lttng_consumer_stream *metadata_stream; @@ -813,30 +830,13 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, } pthread_mutex_lock(&metadata_channel->metadata_cache->lock); - while (total_len < metadata_channel->metadata_cache->total_bytes_written) { - /* - * Write at most one packet of metadata into the channel - * to avoid blocking here. - */ - write_len = ustctl_write_one_packet_to_channel(metadata_channel->uchan, - metadata_channel->metadata_cache->data, - metadata_channel->metadata_cache->total_bytes_written); - if (write_len < 0) { - ERR("UST consumer snapshot writing metadata packet"); - ret = -1; - goto error_unlock; - } - total_len += write_len; - DBG("Written %" PRIu64 " bytes to metadata (left: %" PRIu64 ")", - write_len, - metadata_channel->metadata_cache->total_bytes_written - write_len); - ustctl_flush_buffer(metadata_stream->ustream, 1); + do { ret = lttng_consumer_read_subbuffer(metadata_stream, ctx); if (ret < 0) { goto error_unlock; } - } + } while (ret > 0); error_unlock: pthread_mutex_unlock(&metadata_channel->metadata_cache->lock); @@ -1623,21 +1623,50 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ustream = stream->ustream; /* We can consume the 1 byte written into the wait_fd by UST */ - if (!stream->hangup_flush_done) { + if (stream->monitor && !stream->hangup_flush_done) { ssize_t readlen; do { readlen = read(stream->wait_fd, &dummy, 1); } while (readlen == -1 && errno == EINTR); - if (readlen == -1) { + if (readlen == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { ret = readlen; goto end; } } +retry: /* Get the next subbuffer */ err = ustctl_get_next_subbuf(ustream); if (err != 0) { + /* + * Populate metadata info if the existing info has + * already been read. + */ + if (stream->metadata_flag) { + ssize_t write_len; + + if (stream->chan->metadata_cache->contiguous + == stream->ust_metadata_pushed) { + ret = 0; + goto end; + } + + write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, + &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], + stream->chan->metadata_cache->contiguous + - stream->ust_metadata_pushed); + assert(write_len != 0); + if (write_len < 0) { + ERR("Writing one metadata packet"); + ret = -1; + goto end; + } + stream->ust_metadata_pushed += write_len; + ustctl_flush_buffer(stream->ustream, 1); + goto retry; + } + ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, @@ -1739,13 +1768,37 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) goto end; } - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret == 0) { - /* There is still data so let's put back this subbuffer. */ - ret = ustctl_put_subbuf(stream->ustream); - assert(ret == 0); - ret = 1; /* Data is pending */ - goto end; + if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) { + /* + * We can simply check whether all contiguously available data + * has been pushed to the ring buffer, since the push operation + * is performed within get_next_subbuf(), and because both + * get_next_subbuf() and put_next_subbuf() are issued atomically + * thanks to the stream lock within + * lttng_ustconsumer_read_subbuffer(). This basically means that + * whetnever ust_metadata_pushed is incremented, the associated + * metadata has been consumed from the metadata stream. + */ + DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, + stream->chan->metadata_cache->contiguous, + stream->ust_metadata_pushed); + if (stream->chan->metadata_cache->contiguous + != stream->ust_metadata_pushed) { + ret = 1; /* Data is pending */ + goto end; + } + } else { + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret == 0) { + /* + * There is still data so let's put back this + * subbuffer. + */ + ret = ustctl_put_subbuf(stream->ustream); + assert(ret == 0); + ret = 1; /* Data is pending */ + goto end; + } } /* Data is NOT pending so ready to be read. */ -- 2.34.1