X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=1e4b9c92a8813746a3e988a18b7c49d6c22beb8c;hp=b262b54d56868c27d2704b7cadeb52880c3ec40e;hb=514775d9bca89b3bd072c58e779201682304c57d;hpb=55954e07e828c0ec1c059a50996252a358f7dd23 diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index b262b54d5..1e4b9c92a 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -22,6 +22,7 @@ #include #include #include +#include #include "consumer-stream.h" @@ -69,8 +70,7 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, { int ret = 0; uint64_t sequence_number; - const uint64_t discarded_events = - LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number); + const uint64_t discarded_events = subbuf->info.data.events_discarded; if (!subbuf->info.data.sequence_number.is_set) { /* Command not supported by the tracer. */ @@ -152,9 +152,40 @@ static ssize_t consumer_stream_consume_mmap( const unsigned long padding_size = subbuffer->info.data.padded_subbuf_size - subbuffer->info.data.subbuf_size; - - return lttng_consumer_on_read_subbuffer_mmap( + const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuffer->buffer.buffer, padding_size); + + if (stream->net_seq_idx == -1ULL) { + /* + * When writing on disk, check that the entire padded + * subbuffer was written to disk. + */ + if (written_bytes != subbuffer->info.data.padded_subbuf_size) { + DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)", + written_bytes, + subbuffer->info.data.padded_subbuf_size); + } + } else { + /* + * When streaming over the network, check that only the + * subbuffer (no padding) was successfully written.
+ */ + if (written_bytes != subbuffer->info.data.subbuf_size) { + DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)", + written_bytes, + subbuffer->info.data.subbuf_size); + } + } + + /* + * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass + * it along to the caller, else return the number of bytes written. + */ + if (written_bytes < 0) { + ERR("Error reading mmap subbuffer: %zd", written_bytes); + } + + return written_bytes; } static ssize_t consumer_stream_consume_splice( @@ -162,8 +193,24 @@ static ssize_t consumer_stream_consume_splice( struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer) { - return lttng_consumer_on_read_subbuffer_splice(ctx, stream, - subbuffer->info.data.padded_subbuf_size, 0); + const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice( + ctx, stream, subbuffer->info.data.padded_subbuf_size, 0); + + if (written_bytes != subbuffer->info.data.padded_subbuf_size) { + DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)", + written_bytes, + subbuffer->info.data.padded_subbuf_size); + } + + /* + * If `lttng_consumer_on_read_subbuffer_splice()` returned an error, + * pass it along to the caller, else return the number of bytes written. + */ + if (written_bytes < 0) { + ERR("Error reading splice subbuffer: %zd", written_bytes); + } + + return written_bytes; } static int consumer_stream_send_index( @@ -197,6 +244,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, struct lttng_consumer_local_data *ctx) { int ret; + enum sync_metadata_status status; assert(metadata); assert(metadata->metadata_flag); @@ -244,7 +292,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, /* * Empty the metadata cache and flush the current stream.
*/ - ret = lttng_kconsumer_sync_metadata(metadata); + status = lttng_kconsumer_sync_metadata(metadata); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: @@ -252,18 +300,23 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_sync_metadata(ctx, metadata); + status = lttng_ustconsumer_sync_metadata(ctx, metadata); break; default: - assert(0); - ret = -1; - break; + abort(); } - /* - * Error or no new metadata, we exit here. - */ - if (ret <= 0 || ret == ENODATA) { + + switch (status) { + case SYNC_METADATA_STATUS_NEW_DATA: + break; + case SYNC_METADATA_STATUS_NO_DATA: + ret = 0; + goto end_unlock_mutex; + case SYNC_METADATA_STATUS_ERROR: + ret = -1; goto end_unlock_mutex; + default: + abort(); } /* @@ -285,7 +338,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, */ pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); pthread_mutex_unlock(&metadata->metadata_rdv_lock); - } while (ret == EAGAIN); + } while (status == SYNC_METADATA_STATUS_NEW_DATA); /* Success */ return 0; @@ -404,6 +457,155 @@ end: return 0; } +static +bool stream_is_rotating_to_null_chunk( + const struct lttng_consumer_stream *stream) +{ + bool rotating_to_null_chunk = false; + + if (stream->rotate_position == -1ULL) { + /* No rotation ongoing. 
*/ + goto end; + } + + if (stream->trace_chunk == stream->chan->trace_chunk || + !stream->chan->trace_chunk) { + rotating_to_null_chunk = true; + } +end: + return rotating_to_null_chunk; +} + +enum consumer_stream_open_packet_status consumer_stream_open_packet( + struct lttng_consumer_stream *stream) +{ + int ret; + enum consumer_stream_open_packet_status status; + unsigned long produced_pos_before, produced_pos_after; + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot( + stream, &produced_pos_before); + if (ret < 0) { + ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = consumer_stream_flush_buffer(stream, 0); + if (ret) { + ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after); + if (ret < 0) { + ERR("Failed to read produced position after post-rotation 
empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + /* + * Determine if the flush had an effect by comparing the produced + * positions before and after the flush. + */ + status = produced_pos_before != produced_pos_after ? + CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED : + CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE; + if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) { + stream->opened_packet_in_current_trace_chunk = true; + } + +end: + return status; +} + +/* + * An attempt to open a new packet is performed after a rotation completes to + * get a begin timestamp as close as possible to the rotation point. + * + * However, that initial attempt at opening a packet can fail due to a full + * ring-buffer. In that case, a second attempt is performed after consuming + * a packet since that will have freed enough space in the ring-buffer. + */ +static +int post_consume_open_new_packet(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + if (!stream->opened_packet_in_current_trace_chunk && + stream->trace_chunk && + !stream_is_rotating_to_null_chunk(stream)) { + const enum consumer_stream_open_packet_status status = + consumer_stream_open_packet(stream); + + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + /* + * Can't open a packet as there is no space left. + * This means that new events were produced, resulting + * in a packet being opened, which is what we want + * anyhow.
+ */ + DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* Logged by callee. */ + ret = -1; + goto end; + default: + abort(); + } + + stream->opened_packet_in_current_trace_chunk = true; + } + +end: + return ret; +} + struct lttng_consumer_stream *consumer_stream_create( struct lttng_consumer_channel *channel, uint64_t channel_key, @@ -447,6 +649,8 @@ struct lttng_consumer_stream *consumer_stream_create( stream->index_file = NULL; stream->last_sequence_number = -1ULL; stream->rotate_position = -1ULL; + /* Buffer is created with an open packet. */ + stream->opened_packet_in_current_trace_chunk = true; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -499,6 +703,9 @@ struct lttng_consumer_stream *consumer_stream_create( rcu_read_unlock(); + lttng_dynamic_array_init(&stream->read_subbuffer_ops.post_consume_cbs, + sizeof(post_consume_cb), NULL); + if (type == CONSUMER_CHANNEL_TYPE_METADATA) { stream->read_subbuffer_ops.lock = consumer_stream_metadata_lock_all; @@ -507,18 +714,31 @@ struct lttng_consumer_stream *consumer_stream_create( stream->read_subbuffer_ops.pre_consume_subbuffer = metadata_stream_check_version; } else { + const post_consume_cb post_consume_index_op = channel->is_live ? 
+ consumer_stream_sync_metadata_index : + consumer_stream_send_index; + + ret = lttng_dynamic_array_add_element( + &stream->read_subbuffer_ops.post_consume_cbs, + &post_consume_index_op); + if (ret) { + PERROR("Failed to add `send index` callback to stream's post consumption callbacks"); + goto error; + } + + ret = lttng_dynamic_array_add_element( + &stream->read_subbuffer_ops.post_consume_cbs, + &(post_consume_cb) { post_consume_open_new_packet }); + if (ret) { + PERROR("Failed to add `open new packet` callback to stream's post consumption callbacks"); + goto error; + } + stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_data_unlock_all; stream->read_subbuffer_ops.pre_consume_subbuffer = consumer_stream_update_stats; - if (channel->is_live) { - stream->read_subbuffer_ops.post_consume = - consumer_stream_sync_metadata_index; - } else { - stream->read_subbuffer_ops.post_consume = - consumer_stream_send_index; - } } if (channel->output == CONSUMER_CHANNEL_MMAP) { @@ -534,6 +754,7 @@ struct lttng_consumer_stream *consumer_stream_create( error: rcu_read_unlock(); lttng_trace_chunk_put(stream->trace_chunk); + lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs); free(stream); end: if (alloc_ret) { @@ -845,6 +1066,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, /* Free stream within a RCU call. 
*/ lttng_trace_chunk_put(stream->trace_chunk); stream->trace_chunk = NULL; + lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs); consumer_stream_free(stream); } @@ -1061,3 +1283,47 @@ void consumer_stream_metadata_set_version( metadata_bucket_reset(stream->metadata_bucket); } } + +int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, + bool producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + if (producer_active) { + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } else { + ret = kernctl_buffer_flush_empty(stream->wait_fd); + if (ret < 0) { + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect, + * but required as a fall-back when + * "flush_empty" is not implemented by + * lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_flush_buffer(stream, (int) producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +}