X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=b9290976186ea0fe232cd2e6a447307066463be8;hb=ab1f27f2803120be93a7c148022d5c9c412e2b7b;hp=6525a7203130e7bbf3ce3a97cba489735029a61b;hpb=b770aa7ff007d18a53a3293a396880ad7ce42c8e;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 6525a7203..b92909761 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -564,7 +564,9 @@ void consumer_stream_update_channel_attributes( channel->tracefile_size; } -struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, +struct lttng_consumer_stream *consumer_allocate_stream( + struct lttng_consumer_channel *channel, + uint64_t channel_key, uint64_t stream_key, const char *channel_name, uint64_t relayd_id, @@ -592,6 +594,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, } rcu_read_lock(); + stream->chan = channel; stream->key = stream_key; stream->trace_chunk = trace_chunk; stream->out_fd = -1; @@ -1064,6 +1067,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id_per_pid, unsigned int monitor, unsigned int live_timer_interval, + bool is_in_live_session, const char *root_shm_path, const char *shm_path) { @@ -1095,6 +1099,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_count = tracefile_count; channel->monitor = monitor; channel->live_timer_interval = live_timer_interval; + channel->is_live = is_in_live_session; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); @@ -3408,6 +3413,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotation_ret; pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); @@ -3415,6 +3421,19 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_lock(&stream->metadata_rdv_lock); } + /* + * If the stream was flagged to be ready for rotation before we extract + * the next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before consuming data"); + ret = lttng_consumer_rotate_stream(ctx, stream); + if (ret < 0) { + ERR("Stream rotation error before consuming data"); + goto end; + } + } + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: ret = lttng_kconsumer_read_subbuffer(stream, ctx); @@ -3430,13 +3449,38 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, break; } + if (ret < 0) { + goto end; + } + + /* + * After extracting the packet, we check if the stream is now ready to + * be rotated and perform the action immediately. + * + * Don't overwrite `ret` as callers expect the number of bytes + * consumed to be returned on success. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + if (rotation_ret < 0) { + ret = rotation_ret; + ERR("Stream rotation error after consuming data"); + goto end; + } + } else if (rotation_ret < 0) { + ret = rotation_ret; + ERR("Failed to check if stream was ready to rotate after consuming data"); + goto end; + } + +end: if (stream->metadata_flag) { pthread_cond_broadcast(&stream->metadata_rdv); pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); - return ret; }