const unsigned long padding_size =
subbuffer->info.data.padded_subbuf_size -
subbuffer->info.data.subbuf_size;
-
- return lttng_consumer_on_read_subbuffer_mmap(
+ const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap(
stream, &subbuffer->buffer.buffer, padding_size);
+
+ if (stream->net_seq_idx == -1ULL) {
+ /*
+ * When writing on disk, check that only the subbuffer (no
+ * padding) was written to disk.
+ */
+ if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+ DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.padded_subbuf_size);
+ }
+ } else {
+ /*
+ * When streaming over the network, check that the entire
+ * subbuffer including padding was successfully written.
+ */
+ if (written_bytes != subbuffer->info.data.subbuf_size) {
+ DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.subbuf_size);
+ }
+ }
+
+ /*
+ * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass
+ * it along to the caller, else return zero.
+ */
+ if (written_bytes < 0) {
+ ERR("Error reading mmap subbuffer: %zd", written_bytes);
+ }
+
+ return written_bytes;
}
static ssize_t consumer_stream_consume_splice(
struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuffer)
{
- return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
- subbuffer->info.data.padded_subbuf_size, 0);
+ const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice(
+ ctx, stream, subbuffer->info.data.padded_subbuf_size, 0);
+
+ if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+ DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.padded_subbuf_size);
+ }
+
+ /*
+ * If `lttng_consumer_on_read_subbuffer_splice()` returned an error,
+ * pass it along to the caller, else return zero.
+ */
+ if (written_bytes < 0) {
+ ERR("Error reading splice subbuffer: %zd", written_bytes);
+ }
+
+ return written_bytes;
}
static int consumer_stream_send_index(
assert(ctx);
/* Ease our life a bit. */
- ht = consumer_data.stream_list_ht;
+ ht = the_consumer_data.stream_list_ht;
rcu_read_lock();
goto end;
}
+ rcu_read_lock();
+
if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
ERR("Failed to acquire trace chunk reference during the creation of a stream");
ret = -1;
goto error;
}
- rcu_read_lock();
stream->chan = channel;
stream->key = stream_key;
stream->trace_chunk = trace_chunk;
assert(stream);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
ret = munmap(stream->mmap_base, stream->mmap_len);
* that did not add the stream to a (all) hash table. Same goes for the
* next call ht del call.
*/
- (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
+ (void) lttng_ht_del(the_consumer_data.stream_per_chan_id_ht, &iter);
/* Delete from the global stream list. */
iter.iter.node = &stream->node_session_id.node;
/* See the previous ht del on why we ignore the returned value. */
- (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ (void) lttng_ht_del(the_consumer_data.stream_list_ht, &iter);
rcu_read_unlock();
if (!stream->metadata_flag) {
/* Decrement the stream count of the global consumer data. */
- assert(consumer_data.stream_count > 0);
- consumer_data.stream_count--;
+ assert(the_consumer_data.stream_count > 0);
+ the_consumer_data.stream_count--;
}
}
{
assert(stream);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
* stream thus being globally visible.
*/
if (stream->globally_visible) {
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
free_chan = unref_channel(stream);
/* Indicates that the consumer data state MUST be updated after this. */
- consumer_data.need_update = 1;
+ the_consumer_data.need_update = 1;
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
} else {
/*
* If the stream is not visible globally, this needs to be done
goto end;
}
stream->out_fd = -1;
- }
+ }
DBG("Opening stream output file \"%s\"", stream_path);
chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
flags, mode, &stream->out_fd, false);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->name);
ret = -1;
goto end;
- }
+ }
if (!stream->metadata_flag && (create_index || stream->index_file)) {
if (stream->index_file) {
{
int ret = 0;
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (producer_active) {
ret = kernctl_buffer_flush(stream->wait_fd);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
+ ret = lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
break;
default:
ERR("Unknown consumer_data type");