#include <common/consumer/consumer.h>
#include <common/consumer/consumer-timer.h>
#include <common/consumer/metadata-bucket.h>
+#include <common/kernel-ctl/kernel-ctl.h>
#include "consumer-stream.h"
{
int ret = 0;
uint64_t sequence_number;
- const uint64_t discarded_events =
- LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number);
+ const uint64_t discarded_events = subbuf->info.data.events_discarded;
if (!subbuf->info.data.sequence_number.is_set) {
/* Command not supported by the tracer. */
const unsigned long padding_size =
subbuffer->info.data.padded_subbuf_size -
subbuffer->info.data.subbuf_size;
-
- return lttng_consumer_on_read_subbuffer_mmap(
+ const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap(
stream, &subbuffer->buffer.buffer, padding_size);
+
+ if (stream->net_seq_idx == -1ULL) {
+ /*
+ * When writing on disk, check that only the subbuffer (no
+ * padding) was written to disk.
+ */
+ if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+ DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.padded_subbuf_size);
+ }
+ } else {
+ /*
+ * When streaming over the network, check that the entire
+ * subbuffer including padding was successfully written.
+ */
+ if (written_bytes != subbuffer->info.data.subbuf_size) {
+ DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.subbuf_size);
+ }
+ }
+
+ /*
+ * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass
+ * it along to the caller, else return zero.
+ */
+ if (written_bytes < 0) {
+ ERR("Error reading mmap subbuffer: %zd", written_bytes);
+ }
+
+ return written_bytes;
}
static ssize_t consumer_stream_consume_splice(
struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuffer)
{
- return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
- subbuffer->info.data.padded_subbuf_size, 0);
+ const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice(
+ ctx, stream, subbuffer->info.data.padded_subbuf_size, 0);
+
+ if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+ DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.padded_subbuf_size);
+ }
+
+ /*
+ * If `lttng_consumer_on_read_subbuffer_splice()` returned an error,
+ * pass it along to the caller, else return zero.
+ */
+ if (written_bytes < 0) {
+ ERR("Error reading splice subbuffer: %zd", written_bytes);
+ }
+
+ return written_bytes;
}
static int consumer_stream_send_index(
struct lttng_consumer_local_data *ctx)
{
int ret;
+ enum sync_metadata_status status;
assert(metadata);
assert(metadata->metadata_flag);
/*
* Empty the metadata cache and flush the current stream.
*/
- ret = lttng_kconsumer_sync_metadata(metadata);
+ status = lttng_kconsumer_sync_metadata(metadata);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+ status = lttng_ustconsumer_sync_metadata(ctx, metadata);
break;
default:
- assert(0);
- ret = -1;
- break;
+ abort();
}
- /*
- * Error or no new metadata, we exit here.
- */
- if (ret <= 0 || ret == ENODATA) {
+
+ switch (status) {
+ case SYNC_METADATA_STATUS_NEW_DATA:
+ break;
+ case SYNC_METADATA_STATUS_NO_DATA:
+ ret = 0;
goto end_unlock_mutex;
+ case SYNC_METADATA_STATUS_ERROR:
+ ret = -1;
+ goto end_unlock_mutex;
+ default:
+ abort();
}
/*
*/
pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
pthread_mutex_unlock(&metadata->metadata_rdv_lock);
- } while (ret == EAGAIN);
+ } while (status == SYNC_METADATA_STATUS_NEW_DATA);
/* Success */
return 0;
}
DBG("New metadata version detected");
- stream->metadata_version = subbuffer->info.metadata.version;
- stream->reset_metadata_flag = 1;
-
- if (stream->metadata_bucket) {
- metadata_bucket_reset(stream->metadata_bucket);
- }
+ consumer_stream_metadata_set_version(stream,
+ subbuffer->info.metadata.version);
if (stream->read_subbuffer_ops.reset_metadata) {
stream->read_subbuffer_ops.reset_metadata(stream);
return 0;
}
+static
+bool stream_is_rotating_to_null_chunk(
+ const struct lttng_consumer_stream *stream)
+{
+ bool rotating_to_null_chunk = false;
+
+ if (stream->rotate_position == -1ULL) {
+ /* No rotation ongoing. */
+ goto end;
+ }
+
+ if (stream->trace_chunk == stream->chan->trace_chunk ||
+ !stream->chan->trace_chunk) {
+ rotating_to_null_chunk = true;
+ }
+end:
+ return rotating_to_null_chunk;
+}
+
+enum consumer_stream_open_packet_status consumer_stream_open_packet(
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+ enum consumer_stream_open_packet_status status;
+ unsigned long produced_pos_before, produced_pos_after;
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(
+ stream, &produced_pos_before);
+ if (ret < 0) {
+ ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = consumer_stream_flush_buffer(stream, 0);
+ if (ret) {
+ ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
+ if (ret < 0) {
+ ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ /*
+ * Determine if the flush had an effect by comparing the produced
+ * positons before and after the flush.
+ */
+ status = produced_pos_before != produced_pos_after ?
+ CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED :
+ CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE;
+ if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) {
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
+
+end:
+ return status;
+}
+
+/*
+ * An attempt to open a new packet is performed after a rotation completes to
+ * get a begin timestamp as close as possible to the rotation point.
+ *
+ * However, that initial attempt at opening a packet can fail due to a full
+ * ring-buffer. In that case, a second attempt is performed after consuming
+ * a packet since that will have freed enough space in the ring-buffer.
+ */
+static
+int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+
+ if (!stream->opened_packet_in_current_trace_chunk &&
+ stream->trace_chunk &&
+ !stream_is_rotating_to_null_chunk(stream)) {
+ const enum consumer_stream_open_packet_status status =
+ consumer_stream_open_packet(stream);
+
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left.
+ * This means that new events were produced, resulting
+ * in a packet being opened, which is what we want
+ * anyhow.
+ */
+ DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end;
+ default:
+ abort();
+ }
+
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
+
+end:
+ return ret;
+}
+
struct lttng_consumer_stream *consumer_stream_create(
struct lttng_consumer_channel *channel,
uint64_t channel_key,
stream->index_file = NULL;
stream->last_sequence_number = -1ULL;
stream->rotate_position = -1ULL;
+ /* Buffer is created with an open packet. */
+ stream->opened_packet_in_current_trace_chunk = true;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
rcu_read_unlock();
+ lttng_dynamic_array_init(&stream->read_subbuffer_ops.post_consume_cbs,
+ sizeof(post_consume_cb), NULL);
+
if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
stream->read_subbuffer_ops.lock =
consumer_stream_metadata_lock_all;
stream->read_subbuffer_ops.pre_consume_subbuffer =
metadata_stream_check_version;
} else {
+ const post_consume_cb post_consume_index_op = channel->is_live ?
+ consumer_stream_sync_metadata_index :
+ consumer_stream_send_index;
+
+ ret = lttng_dynamic_array_add_element(
+ &stream->read_subbuffer_ops.post_consume_cbs,
+ &post_consume_index_op);
+ if (ret) {
+ PERROR("Failed to add `send index` callback to stream's post consumption callbacks");
+ goto error;
+ }
+
+ ret = lttng_dynamic_array_add_element(
+ &stream->read_subbuffer_ops.post_consume_cbs,
+ &(post_consume_cb) { post_consume_open_new_packet });
+ if (ret) {
+ PERROR("Failed to add `open new packet` callback to stream's post consumption callbacks");
+ goto error;
+ }
+
stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
stream->read_subbuffer_ops.unlock =
consumer_stream_data_unlock_all;
stream->read_subbuffer_ops.pre_consume_subbuffer =
consumer_stream_update_stats;
- if (channel->is_live) {
- stream->read_subbuffer_ops.post_consume =
- consumer_stream_sync_metadata_index;
- } else {
- stream->read_subbuffer_ops.post_consume =
- consumer_stream_send_index;
- }
}
if (channel->output == CONSUMER_CHANNEL_MMAP) {
error:
rcu_read_unlock();
lttng_trace_chunk_put(stream->trace_chunk);
+ lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs);
free(stream);
end:
if (alloc_ret) {
/* Free stream within a RCU call. */
lttng_trace_chunk_put(stream->trace_chunk);
stream->trace_chunk = NULL;
+ lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs);
consumer_stream_free(stream);
}
end:
return ret;
}
+
+void consumer_stream_metadata_set_version(
+ struct lttng_consumer_stream *stream, uint64_t new_version)
+{
+ assert(new_version > stream->metadata_version);
+ stream->metadata_version = new_version;
+ stream->reset_metadata_flag = 1;
+
+ if (stream->metadata_bucket) {
+ metadata_bucket_reset(stream->metadata_bucket);
+ }
+}
+
+int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream,
+ bool producer_active)
+{
+ int ret = 0;
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ if (producer_active) {
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ } else {
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
+ if (ret < 0) {
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect,
+ * but required as a fall-back when
+ * "flush_empty" is not implemented by
+ * lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ }
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+end:
+ return ret;
+}