X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=37b2d505057c9c470f8892626d211ff634195a39;hb=881fc67f7002469477a5ad00e67a8077db6c0514;hp=8c21d6ae1edd3afe71967d6bf5e5cb32cb998675;hpb=503fefca8a1b82cfafccaa096e900c41bf4570f6;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 8c21d6ae1..37b2d5050 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -152,9 +152,40 @@ static ssize_t consumer_stream_consume_mmap( const unsigned long padding_size = subbuffer->info.data.padded_subbuf_size - subbuffer->info.data.subbuf_size; - - return lttng_consumer_on_read_subbuffer_mmap( + const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuffer->buffer.buffer, padding_size); + + if (stream->net_seq_idx == -1ULL) { + /* + * When writing on disk, check that only the subbuffer (no + * padding) was written to disk. + */ + if (written_bytes != subbuffer->info.data.padded_subbuf_size) { + DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)", + written_bytes, + subbuffer->info.data.padded_subbuf_size); + } + } else { + /* + * When streaming over the network, check that the entire + * subbuffer including padding was successfully written. + */ + if (written_bytes != subbuffer->info.data.subbuf_size) { + DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)", + written_bytes, + subbuffer->info.data.subbuf_size); + } + } + + /* + * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass + * it along to the caller, else return zero. + */ + if (written_bytes < 0) { + ERR("Error reading mmap subbuffer: %zd", written_bytes); + } + + return written_bytes; } static ssize_t consumer_stream_consume_splice( @@ -162,8 +193,24 @@ static ssize_t consumer_stream_consume_splice( struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer) { - return lttng_consumer_on_read_subbuffer_splice(ctx, stream, - subbuffer->info.data.padded_subbuf_size, 0); + const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice( + ctx, stream, subbuffer->info.data.padded_subbuf_size, 0); + + if (written_bytes != subbuffer->info.data.padded_subbuf_size) { + DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)", + written_bytes, + subbuffer->info.data.padded_subbuf_size); + } + + /* + * If `lttng_consumer_on_read_subbuffer_splice()` returned an error, + * pass it along to the caller, else return zero. + */ + if (written_bytes < 0) { + ERR("Error reading splice subbuffer: %zd", written_bytes); + } + + return written_bytes; } static int consumer_stream_send_index( @@ -322,7 +369,7 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, assert(ctx); /* Ease our life a bit. */ - ht = consumer_data.stream_list_ht; + ht = the_consumer_data.stream_list_ht; rcu_read_lock(); @@ -770,7 +817,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) assert(stream); - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: if (stream->mmap_base != NULL) { ret = munmap(stream->mmap_base, stream->mmap_len); @@ -878,19 +925,19 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, * that did not add the stream to a (all) hash table. Same goes for the * next call ht del call. */ - (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter); + (void) lttng_ht_del(the_consumer_data.stream_per_chan_id_ht, &iter); /* Delete from the global stream list. */ iter.iter.node = &stream->node_session_id.node; /* See the previous ht del on why we ignore the returned value. */ - (void) lttng_ht_del(consumer_data.stream_list_ht, &iter); + (void) lttng_ht_del(the_consumer_data.stream_list_ht, &iter); rcu_read_unlock(); if (!stream->metadata_flag) { /* Decrement the stream count of the global consumer data. */ - assert(consumer_data.stream_count > 0); - consumer_data.stream_count--; + assert(the_consumer_data.stream_count > 0); + the_consumer_data.stream_count--; } } @@ -912,7 +959,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) { assert(stream); - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER32_UST: @@ -984,7 +1031,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, * stream thus being globally visible. */ if (stream->globally_visible) { - pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&the_consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); /* Remove every reference of the stream in the consumer. */ @@ -996,11 +1043,11 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, free_chan = unref_channel(stream); /* Indicates that the consumer data state MUST be updated after this. */ - consumer_data.need_update = 1; + the_consumer_data.need_update = 1; pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); - pthread_mutex_unlock(&consumer_data.lock); + pthread_mutex_unlock(&the_consumer_data.lock); } else { /* * If the stream is not visible globally, this needs to be done @@ -1103,16 +1150,16 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, goto end; } stream->out_fd = -1; - } + } DBG("Opening stream output file \"%s\"", stream_path); chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, flags, mode, &stream->out_fd, false); - if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to open stream file \"%s\"", stream->name); ret = -1; goto end; - } + } if (!stream->metadata_flag && (create_index || stream->index_file)) { if (stream->index_file) { @@ -1242,7 +1289,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, { int ret = 0; - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: if (producer_active) { ret = kernctl_buffer_flush(stream->wait_fd); @@ -1270,7 +1317,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_flush_buffer(stream, (int) producer_active); + ret = lttng_ustconsumer_flush_buffer(stream, (int) producer_active); break; default: ERR("Unknown consumer_data type");