From 23d565989350c270c68e9a6c8edfbe2dd6a6895d Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 8 May 2020 16:00:11 -0400 Subject: [PATCH] consumerd: move rotation logic to domain-agnostic read path MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The "rotation ready" logic is duplicated in both user space and kernel specializations of the read subbuffer functions. It is moved to the domain-agnostic caller where it is needed only once. This makes it easier to implement a follow-up fix and reduces code duplication. Signed-off-by: Jérémie Galarneau Change-Id: Iae952a2cd52fa458cec956ae219492557e4adf79 --- src/common/consumer/consumer.c | 41 +++++++++++++++++++- src/common/kernel-consumer/kernel-consumer.c | 40 +++---------------- src/common/ust-consumer/ust-consumer.c | 37 ++---------------- 3 files changed, 49 insertions(+), 69 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index e2e7438f6..f13e90a68 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3413,6 +3413,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotation_ret; pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); @@ -3420,6 +3421,19 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_lock(&stream->metadata_rdv_lock); } + /* + * If the stream was flagged to be ready for rotation before we extract + * the next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before consuming data"); + ret = lttng_consumer_rotate_stream(ctx, stream); + if (ret < 0) { + ERR("Stream rotation error before consuming data"); + goto end; + } + } + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: ret = lttng_kconsumer_read_subbuffer(stream, ctx); @@ -3435,13 +3449,38 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, break; } + if (ret < 0) { + goto end; + } + + /* + * After extracting the packet, we check if the stream is now ready to + * be rotated and perform the action immediately. + * + * Don't overwrite `ret` as callers expect the number of bytes + * consumed to be returned on success. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + if (rotation_ret < 0) { + ret = rotation_ret; + ERR("Stream rotation error after consuming data"); + goto end; + } + } else if (rotation_ret < 0) { + ret = rotation_ret; + ERR("Failed to check if stream was ready to rotate after consuming data"); + goto end; + } + +end: if (stream->metadata_flag) { pthread_cond_broadcast(&stream->metadata_rdv); pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); - return ret; } diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 7032a7f7f..4a2e4e07d 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1590,26 +1590,14 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; struct ctf_packet_index index = {}; + bool in_error_state = false; DBG("In read_subbuffer (infd : %d)", infd); - /* - * If the stream was flagged to be ready for rotation before we extract the - * next packet, rotate it now. - */ - if (stream->rotate_ready) { - DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); @@ -1795,11 +1783,13 @@ error_put_subbuf: } ret = err; goto error; + } else if (in_error_state) { + goto error; } /* Write index if needed. */ if (!write_index) { - goto rotate; + goto end; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -1832,25 +1822,7 @@ error_put_subbuf: goto error; } -rotate: - /* - * After extracting the packet, we check if the stream is now ready to be - * rotated and perform the action immediately. - */ - rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); - if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } else if (rotation_ret < 0) { - ERR("Checking if stream is ready to rotate"); - ret = -1; - goto error; - } - +end: error: return ret; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index e5143cd4a..2f09eac80 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2789,7 +2789,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -2827,20 +2827,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } } - /* - * If the stream was flagged to be ready for rotation before we extract the - * next packet, rotate it now. - */ - if (stream->rotate_ready) { - DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } - retry: /* Get the next subbuffer */ err = ustctl_get_next_subbuf(ustream); @@ -2952,7 +2938,7 @@ error_put_subbuf: /* Write index if needed. */ if (!write_index) { - goto rotate; + goto end; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2987,24 +2973,7 @@ error_put_subbuf: goto error; } -rotate: - /* - * After extracting the packet, we check if the stream is now ready to be - * rotated and perform the action immediately. - */ - rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); - if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } else if (rotation_ret < 0) { - ERR("Checking if stream is ready to rotate"); - ret = -1; - goto error; - } +end: error: return ret; } -- 2.34.1