return NULL;
}
+static int post_consume(struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
+{
+ size_t i;
+ int ret = 0;
+ const size_t count = lttng_dynamic_array_get_count(
+ &stream->read_subbuffer_ops.post_consume_cbs);
+
+ for (i = 0; i < count; i++) {
+ const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
+ &stream->read_subbuffer_ops.post_consume_cbs,
+ i);
+
+ ret = op(stream, subbuffer, ctx);
+ if (ret) {
+ goto end;
+ }
+ }
+end:
+ return ret;
+}
+
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx,
bool locked_by_caller)
goto end;
}
- if (stream->read_subbuffer_ops.post_consume) {
- ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx);
- if (ret) {
- goto end;
- }
+ ret = post_consume(stream, &subbuffer, ctx);
+ if (ret) {
+ goto end;
}
/*
ERR("Stream rotation error after consuming data");
goto end;
}
+
} else if (rotation_ret < 0) {
ret = rotation_ret;
ERR("Failed to check if stream was ready to rotate after consuming data");
return start_pos;
}
-static
-int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
-{
- int ret = 0;
-
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (producer_active) {
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
- goto end;
- }
- } else {
- ret = kernctl_buffer_flush_empty(stream->wait_fd);
- if (ret < 0) {
- /*
- * Doing a buffer flush which does not take into
- * account empty packets. This is not perfect,
- * but required as a fall-back when
- * "flush_empty" is not implemented by
- * lttng-modules.
- */
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
- goto end;
- }
- }
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_flush_buffer(stream, producer_active);
- break;
- default:
- ERR("Unknown consumer_data type");
- abort();
- }
-
-end:
- return ret;
-}
-
/*
* Sample the rotate position for all the streams of a channel. If a stream
* is already at the rotate position (produced == consumed), we flag it as
* ensures we have at least one packet in each stream per trace
* chunk, even if no data was produced.
*/
- ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+ ret = consumer_stream_flush_buffer(
+ stream, stream->metadata_flag ? 1 : 0);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
stream->key);
}
stream_count++;
}
+
+ stream->opened_packet_in_current_trace_chunk = false;
+
+ if (rotating_to_new_chunk && !stream->metadata_flag) {
+ /*
+ * Attempt to flush an empty packet as close to the
+ * rotation point as possible. In the event where a
+ * stream remains inactive after the rotation point,
+ * this ensures that the new trace chunk has a
+ * beginning timestamp set at the begining of the
+ * trace chunk instead of only creating an empty
+ * packet when the trace chunk is stopped.
+ *
+ * This indicates to the viewers that the stream
+ * was being recorded, but more importantly it
+ * allows viewers to determine a useable trace
+ * intersection.
+ *
+ * This presents a problem in the case where the
+ * ring-buffer is completely full.
+ *
+ * Consider the following scenario:
+ * - The consumption of data is slow (slow network,
+ * for instance),
+ * - The ring buffer is full,
+ * - A rotation is initiated,
+ * - The flush below does nothing (no space left to
+ * open a new packet),
+ * - The other streams rotate very soon, and new
+ * data is produced in the new chunk,
+ * - This stream completes its rotation long after the
+ * rotation was initiated
+ * - The session is stopped before any event can be
+ * produced in this stream's buffers.
+ *
+ * The resulting trace chunk will have a single packet
+ * temporaly at the end of the trace chunk for this
+ * stream making the stream intersection more narrow
+ * than it should be.
+ *
+ * To work-around this, an empty flush is performed
+ * after the first consumption of a packet during a
+ * rotation if open_packet fails. The idea is that
+ * consuming a packet frees enough space to switch
+ * packets in this scenario and allows the tracer to
+ * "stamp" the beginning of the new trace chunk at the
+ * earliest possible point.
+ */
+ const enum consumer_stream_open_packet_status status =
+ consumer_stream_open_packet(stream);
+
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left
+ * in the buffer. A new packet will be opened
+ * once one has been consumed.
+ */
+ DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end_unlock_stream;
+ default:
+ abort();
+ }
+ }
+
pthread_mutex_unlock(&stream->lock);
}
stream = NULL;
{
int ret;
- ret = consumer_flush_buffer(stream, 1);
+ ret = consumer_stream_flush_buffer(stream, 1);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel clear",
stream->key);
end:
return ret;
}
+
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+ struct lttng_consumer_channel *channel)
+{
+ struct lttng_consumer_stream *stream;
+ enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
+
+ if (channel->metadata_stream) {
+ ERR("Open channel packets command attempted on a metadata channel");
+ ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end;
+ }
+
+ rcu_read_lock();
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ enum consumer_stream_open_packet_status status;
+
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
+ status = consumer_stream_open_packet(stream);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64
+ ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key, channel->key,
+ channel->name, channel->session_id);
+ ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ goto error_unlock;
+ default:
+ abort();
+ }
+
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ return ret;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ goto end_rcu_unlock;
+}