X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=2cd385545ab357faa7faeead6c770db877dc8e97;hp=4a8b14634d545b5587517caff54eddbd78cbfa48;hb=c1dcb8bb058481eb59317fb442fb9b3cc01a7e35;hpb=b32703d679aa5dd34dbee1ff21e44c3728c01b78 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 4a8b14634..2cd385545 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3253,7 +3253,7 @@ void *consumer_thread_sessiond_poll(void *data) err = 0; /* All is OK */ goto end; } - DBG("received command on sock"); + DBG("Received command on sock"); } /* All is OK */ err = 0; @@ -3383,24 +3383,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, written_bytes = stream->read_subbuffer_ops.consume_subbuffer( ctx, stream, &subbuffer); - /* - * Should write subbuf_size amount of data when network streaming or - * the full padded size when we are not streaming. - */ - if ((written_bytes != subbuffer.info.data.subbuf_size && - stream->net_seq_idx != (uint64_t) -1ULL) || - (written_bytes != subbuffer.info.data.padded_subbuf_size && - stream->net_seq_idx == - (uint64_t) -1ULL)) { - /* - * Display the error but continue processing to try to - * release the subbuffer. This is a DBG statement - * since this can happen without being a critical - * error. - */ - DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)", - written_bytes, subbuffer.info.data.padded_subbuf_size, - subbuffer.info.data.subbuf_size); + if (written_bytes <= 0) { + ERR("Error consuming subbuffer: (%zd)", written_bytes); + ret = (int) written_bytes; + goto error_put_subbuf; } ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer); @@ -3923,6 +3909,36 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } +/* Stream lock must be held by the caller. */ +static int sample_stream_positions(struct lttng_consumer_stream *stream, + unsigned long *produced, unsigned long *consumed) +{ + int ret; + + ASSERT_LOCKED(stream->lock); + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to sample snapshot positions"); + goto end; + } + + ret = lttng_consumer_get_produced_snapshot(stream, produced); + if (ret < 0) { + ERR("Failed to sample produced position"); + goto end; + } + + ret = lttng_consumer_get_consumed_snapshot(stream, consumed); + if (ret < 0) { + ERR("Failed to sample consumed position"); + goto end; + } + +end: + return ret; +} + /* * Sample the rotate position for all the streams of a channel. If a stream * is already at the rotate position (produced == consumed), we flag it as @@ -3985,19 +4001,97 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, } /* - * Do not flush an empty packet when rotating from a NULL trace + * Do not flush a packet when rotating from a NULL trace * chunk. The stream has no means to output data, and the prior - * rotation which rotated to NULL performed that side-effect already. + * rotation which rotated to NULL performed that side-effect + * already. No new data can be produced when a stream has no + * associated trace chunk (e.g. a stop followed by a rotate). */ if (stream->trace_chunk) { + bool flush_active; + + if (stream->metadata_flag) { + /* + * Don't produce an empty metadata packet, + * simply close the current one. + * + * Metadata is regenerated on every trace chunk + * switch; there is no concern that no data was + * produced. + */ + flush_active = true; + } else { + /* + * Only flush an empty packet if the "packet + * open" could not be performed on transition + * to a new trace chunk and no packets were + * consumed within the chunk's lifetime. + */ + if (stream->opened_packet_in_current_trace_chunk) { + flush_active = true; + } else { + /* + * Stream could have been full at the + * time of rotation, but then have had + * no activity at all. + * + * It is important to flush a packet + * to prevent 0-length files from being + * produced as most viewers choke on + * them. + * + * Unfortunately viewers will not be + * able to know that tracing was active + * for this stream during this trace + * chunk's lifetime. + */ + ret = sample_stream_positions(stream, &produced_pos, &consumed_pos); + if (ret) { + goto end_unlock_stream; + } + + /* + * Don't flush an empty packet if data + * was produced; it will be consumed + * before the rotation completes. + */ + flush_active = produced_pos != consumed_pos; + if (!flush_active) { + enum lttng_trace_chunk_status chunk_status; + const char *trace_chunk_name; + uint64_t trace_chunk_id; + + chunk_status = lttng_trace_chunk_get_name( + stream->trace_chunk, + &trace_chunk_name, + NULL); + if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) { + trace_chunk_name = "none"; + } + + /* + * Consumer trace chunks are + * never anonymous. + */ + chunk_status = lttng_trace_chunk_get_id( + stream->trace_chunk, + &trace_chunk_id); + assert(chunk_status == + LTTNG_TRACE_CHUNK_STATUS_OK); + + DBG("Unable to open packet for stream during trace chunk's lifetime. " + "Flushing an empty packet to prevent an empty file from being created: " + "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64, + stream->key, trace_chunk_name, trace_chunk_id); + } + } + } + /* - * For metadata stream, do an active flush, which does not - * produce empty packets. For data streams, empty-flush; - * ensures we have at least one packet in each stream per trace - * chunk, even if no data was produced. + * Close the current packet before sampling the + * ring buffer positions. */ - ret = consumer_stream_flush_buffer( - stream, stream->metadata_flag ? 1 : 0); + ret = consumer_stream_flush_buffer(stream, flush_active); if (ret < 0) { ERR("Failed to flush stream %" PRIu64 " during channel rotation", stream->key); @@ -4207,7 +4301,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: /* Logged by callee. */ ret = -1; - goto end_unlock_stream; + goto end_unlock_channel; default: abort(); }