X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=68b6cac6c9e459a3b1aaed8434930e3e1287a62c;hp=1a7de1cfd59cf0a095f80794e0ff96d462386a93;hb=fc4b93fa8aa36b19caad0f8dc4a6a3237fcc36bf;hpb=ff588497b3dfc3138c9ce005e9270ed5568c05df diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 1a7de1cfd..68b6cac6c 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1283,6 +1283,17 @@ error_unlock: return ret; } +static +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + + DBG("Reset metadata cache of session %" PRIu64, + stream->chan->session_id); + stream->ust_metadata_pushed = 0; +} + /* * Receive the metadata updates from the sessiond. Supports receiving * overlapping metadata, but is needs to always belong to a contiguous @@ -1297,6 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; + enum consumer_metadata_cache_write_status cache_write_status; DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); @@ -1320,9 +1332,41 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, health_code_update(); pthread_mutex_lock(&channel->metadata_cache->lock); - ret = consumer_metadata_cache_write(channel, offset, len, version, + cache_write_status = consumer_metadata_cache_write( + channel->metadata_cache, offset, len, version, metadata_str); - if (ret < 0) { + pthread_mutex_unlock(&channel->metadata_cache->lock); + switch (cache_write_status) { + case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE: + /* + * The write entirely overlapped with existing contents of the + * same metadata version (same content); there is nothing to do. + */ + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED: + /* + * The metadata cache was invalidated (previously pushed + * content has been overwritten). Reset the stream's consumed + * metadata position to ensure the metadata poll thread consumes + * the whole cache. + */ + pthread_mutex_lock(&channel->metadata_stream->lock); + metadata_stream_reset_cache_consumed_position( + channel->metadata_stream); + pthread_mutex_unlock(&channel->metadata_stream->lock); + /* Fall-through. */ + case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT: + /* + * In both cases, the metadata poll thread has new data to + * consume. + */ + ret = consumer_metadata_wakeup_pipe(channel); + if (ret) { + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto end_free; + } + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR: /* Unable to handle metadata. Notify session daemon. */ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; /* @@ -1330,10 +1374,10 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * not have been updated which could create an infinite loop below when * waiting for the metadata cache to be flushed. */ - pthread_mutex_unlock(&channel->metadata_cache->lock); goto end_free; + default: + abort(); } - pthread_mutex_unlock(&channel->metadata_cache->lock); if (!wait) { goto end_free; @@ -1516,15 +1560,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.u.ask_channel.output) { case LTTNG_EVENT_MMAP: default: - attr.output = LTTNG_UST_MMAP; + attr.output = LTTNG_UST_ABI_MMAP; break; } /* Translate and save channel type. */ switch (msg.u.ask_channel.type) { - case LTTNG_UST_CHAN_PER_CPU: + case LTTNG_UST_ABI_CHAN_PER_CPU: channel->type = CONSUMER_CHANNEL_TYPE_DATA; - attr.type = LTTNG_UST_CHAN_PER_CPU; + attr.type = LTTNG_UST_ABI_CHAN_PER_CPU; /* * Set refcount to 1 for owner. Below, we will * pass ownership to the @@ -1532,9 +1576,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ channel->refcount = 1; break; - case LTTNG_UST_CHAN_METADATA: + case LTTNG_UST_ABI_CHAN_METADATA: channel->type = CONSUMER_CHANNEL_TYPE_METADATA; - attr.type = LTTNG_UST_CHAN_METADATA; + attr.type = LTTNG_UST_ABI_CHAN_METADATA; break; default: assert(0); @@ -1548,7 +1592,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_channel_error; } - if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) { + if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) { ret = consumer_metadata_cache_allocate(channel); if (ret < 0) { ERR("Allocating metadata cache"); @@ -1581,7 +1625,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ ret = add_channel(channel, ctx); if (ret < 0) { - if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) { + if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) { if (channel->switch_timer_enabled == 1) { consumer_timer_switch_stop(channel); } @@ -2465,15 +2509,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) return ustctl_stream_close_wakeup_fd(stream->ustream); } -static -void metadata_stream_reset_cache_consumed_position( - struct lttng_consumer_stream *stream) -{ - DBG("Reset metadata cache of session %" PRIu64, - stream->chan->session_id); - stream->ust_metadata_pushed = 0; -} - /* * Write up to one packet from the metadata cache to the channel. * @@ -2487,8 +2522,8 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->max_offset == - stream->ust_metadata_pushed) { + if (stream->chan->metadata_cache->contents.size == + stream->ust_metadata_pushed) { /* * In the context of a user space metadata channel, a * change in version can be detected in two ways: @@ -2525,9 +2560,9 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) } write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, - &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->max_offset - - stream->ust_metadata_pushed); + &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed], + stream->chan->metadata_cache->contents.size - + stream->ust_metadata_pushed); assert(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); @@ -2536,7 +2571,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) } stream->ust_metadata_pushed += write_len; - assert(stream->chan->metadata_cache->max_offset >= + assert(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); ret = write_len; @@ -2904,8 +2939,8 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, } } else { pthread_mutex_lock(&stream->chan->metadata_cache->lock); - cache_empty = stream->chan->metadata_cache->max_offset == - stream->ust_metadata_pushed; + cache_empty = stream->chan->metadata_cache->contents.size == + stream->ust_metadata_pushed; pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } } while (!got_subbuffer); @@ -2966,6 +3001,7 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream, static int signal_metadata(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { + ASSERT_LOCKED(stream->metadata_rdv_lock); return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } @@ -3051,6 +3087,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); assert(stream->ustream); + ASSERT_LOCKED(stream->lock); DBG("UST consumer checking data pending"); @@ -3063,7 +3100,9 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) uint64_t contiguous, pushed; /* Ease our life a bit. */ - contiguous = stream->chan->metadata_cache->max_offset; + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + contiguous = stream->chan->metadata_cache->contents.size; + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); pushed = stream->ust_metadata_pushed; /*