X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;fp=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=6505490cdc04e212c54aadd3d8eb58d569eb523e;hp=311c64020327cdcfdb1f55a925c7b2cc90b60729;hb=f96af312b7d09d834f9d13408654ada93320ae2a;hpb=577eea73132dee4da47752590ed535206678eb34 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 311c64020..6505490cd 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -63,6 +63,12 @@ struct consumer_channel_msg { uint64_t key; /* del */ }; +enum open_packet_status { + OPEN_PACKET_STATUS_OPENED, + OPEN_PACKET_STATUS_NO_SPACE, + OPEN_PACKET_STATUS_ERROR, +}; + /* Flag used to temporarily pause data consumption from testpoints. */ int data_consumption_paused; @@ -3310,6 +3316,137 @@ error_testpoint: return NULL; } +static +int consumer_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + if (producer_active) { + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } else { + ret = kernctl_buffer_flush_empty(stream->wait_fd); + if (ret < 0) { + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect, + * but required as a fall-back when + * "flush_empty" is not implemented by + * lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_flush_buffer(stream, producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +static enum open_packet_status open_packet(struct lttng_consumer_stream *stream) +{ + int ret; + enum open_packet_status status; + unsigned long produced_pos_before, produced_pos_after; + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot( + stream, &produced_pos_before); + if (ret < 0) { + ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = consumer_flush_buffer(stream, 0); + if (ret) { + ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after); + if (ret < 0) { + ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + /* + * Determine if the flush had an effect by comparing the produced + * positons before and after the flush. + */ + status = produced_pos_before != produced_pos_after ? + OPEN_PACKET_STATUS_OPENED : + OPEN_PACKET_STATUS_NO_SPACE; +end: + return status; +} + +static bool stream_is_rotating_to_null_chunk( + const struct lttng_consumer_stream *stream) +{ + bool rotating_to_null_chunk = false; + + if (stream->rotate_position == -1ULL) { + /* No rotation ongoing. */ + goto end; + } + + if (stream->trace_chunk == stream->chan->trace_chunk || + !stream->chan->trace_chunk) { + rotating_to_null_chunk = true; + } +end: + return rotating_to_null_chunk; +} + ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx, bool locked_by_caller) @@ -3413,6 +3550,48 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + /* + * TODO roll into a post_consume op as this doesn't apply to metadata + * streams. + */ + if (!stream->opened_packet_in_current_trace_chunk && + stream->trace_chunk && !stream->metadata_flag && + !stream_is_rotating_to_null_chunk(stream)) { + const enum open_packet_status status = open_packet(stream); + + switch (status) { + case OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = + true; + break; + case OPEN_PACKET_STATUS_NO_SPACE: + /* + * Can't open a packet as there is no space left. + * This means that new events were produced, resulting + * in a packet being opened, which is what we want + * anyhow. + */ + DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case OPEN_PACKET_STATUS_ERROR: + /* Logged by callee. */ + ret = -1; + goto end; + default: + abort(); + } + + stream->opened_packet_in_current_trace_chunk = true; + } + sleep_stream: if (stream->read_subbuffer_ops.on_sleep) { stream->read_subbuffer_ops.on_sleep(stream, ctx); @@ -3901,50 +4080,6 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } -static -int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) -{ - int ret = 0; - - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - if (producer_active) { - ret = kernctl_buffer_flush(stream->wait_fd); - if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end; - } - } else { - ret = kernctl_buffer_flush_empty(stream->wait_fd); - if (ret < 0) { - /* - * Doing a buffer flush which does not take into - * account empty packets. This is not perfect, - * but required as a fall-back when - * "flush_empty" is not implemented by - * lttng-modules. - */ - ret = kernctl_buffer_flush(stream->wait_fd); - if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end; - } - } - } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_flush_buffer(stream, producer_active); - break; - default: - ERR("Unknown consumer_data type"); - abort(); - } - -end: - return ret; -} - /* * Sample the rotate position for all the streams of a channel. If a stream * is already at the rotate position (produced == consumed), we flag it as @@ -4102,6 +4237,86 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, } stream_count++; } + + stream->opened_packet_in_current_trace_chunk = false; + + if (rotating_to_new_chunk && !stream->metadata_flag) { + /* + * Attempt to flush an empty packet as close to the + * rotation point as possible. In the event where a + * stream remains inactive after the rotation point, + * this ensures that the new trace chunk has a + * beginning timestamp set at the begining of the + * trace chunk instead of only creating an empty + * packet when the trace chunk is stopped. + * + * This indicates to the viewers that the stream + * was being recorded, but more importantly it + * allows viewers to determine a useable trace + * intersection. + * + * This presents a problem in the case where the + * ring-buffer is completely full. + * + * Consider the following scenario: + * - The consumption of data is slow (slow network, + * for instance), + * - The ring buffer is full, + * - A rotation is initiated, + * - The flush below does nothing (no space left to + * open a new packet), + * - The other streams rotate very soon, and new + * data is produced in the new chunk, + * - This stream completes its rotation long after the + * rotation was initiated + * - The session is stopped before any event can be + * produced in this stream's buffers. + * + * The resulting trace chunk will have a single packet + * temporaly at the end of the trace chunk for this + * stream making the stream intersection more narrow + * than it should be. + * + * To work-around this, an empty flush is performed + * after the first consumption of a packet during a + * rotation if open_packet fails. The idea is that + * consuming a packet frees enough space to switch + * packets in this scenario and allows the tracer to + * "stamp" the beginning of the new trace chunk at the + * earliest possible point. + */ + const enum open_packet_status status = + open_packet(stream); + + switch (status) { + case OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet after a rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = + true; + break; + case OPEN_PACKET_STATUS_NO_SPACE: + /* + * Can't open a packet as there is no space left + * in the buffer. A new packet will be opened + * once one has been consumed. + */ + DBG("No space left to open a packet after a rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + break; + case OPEN_PACKET_STATUS_ERROR: + /* Logged by callee. */ + ret = -1; + goto end_unlock_stream; + default: + abort(); + } + } + pthread_mutex_unlock(&stream->lock); } stream = NULL;