uint64_t key; /* del */
};
-enum open_packet_status {
- OPEN_PACKET_STATUS_OPENED,
- OPEN_PACKET_STATUS_NO_SPACE,
- OPEN_PACKET_STATUS_ERROR,
-};
-
/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;
return NULL;
}
-static
-int consumer_flush_buffer(struct lttng_consumer_stream *stream,
- int producer_active)
+static int post_consume(struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer,
+ struct lttng_consumer_local_data *ctx)
{
+ size_t i;
int ret = 0;
+ const size_t count = lttng_dynamic_array_get_count(
+ &stream->read_subbuffer_ops.post_consume_cbs);
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (producer_active) {
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
- goto end;
- }
- } else {
- ret = kernctl_buffer_flush_empty(stream->wait_fd);
- if (ret < 0) {
- /*
- * Doing a buffer flush which does not take into
- * account empty packets. This is not perfect,
- * but required as a fall-back when
- * "flush_empty" is not implemented by
- * lttng-modules.
- */
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
- goto end;
- }
- }
+ for (i = 0; i < count; i++) {
+ const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
+ &stream->read_subbuffer_ops.post_consume_cbs,
+ i);
+
+ ret = op(stream, subbuffer, ctx);
+ if (ret) {
+ goto end;
}
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_flush_buffer(stream, producer_active);
- break;
- default:
- ERR("Unknown consumer_data type");
- abort();
}
-
end:
return ret;
}
-static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
-{
- int ret;
- enum open_packet_status status;
- unsigned long produced_pos_before, produced_pos_after;
-
- ret = lttng_consumer_sample_snapshot_positions(stream);
- if (ret < 0) {
- ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- status = OPEN_PACKET_STATUS_ERROR;
- goto end;
- }
-
- ret = lttng_consumer_get_produced_snapshot(
- stream, &produced_pos_before);
- if (ret < 0) {
- ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- status = OPEN_PACKET_STATUS_ERROR;
- goto end;
- }
-
- ret = consumer_flush_buffer(stream, 0);
- if (ret) {
- ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- status = OPEN_PACKET_STATUS_ERROR;
- goto end;
- }
-
- ret = lttng_consumer_sample_snapshot_positions(stream);
- if (ret < 0) {
- ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- status = OPEN_PACKET_STATUS_ERROR;
- goto end;
- }
-
- ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
- if (ret < 0) {
- ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- status = OPEN_PACKET_STATUS_ERROR;
- goto end;
- }
-
- /*
- * Determine if the flush had an effect by comparing the produced
- * positons before and after the flush.
- */
- status = produced_pos_before != produced_pos_after ?
- OPEN_PACKET_STATUS_OPENED :
- OPEN_PACKET_STATUS_NO_SPACE;
- if (status == OPEN_PACKET_STATUS_OPENED) {
- stream->opened_packet_in_current_trace_chunk = true;
- }
-end:
- return status;
-}
-
-static bool stream_is_rotating_to_null_chunk(
- const struct lttng_consumer_stream *stream)
-{
- bool rotating_to_null_chunk = false;
-
- if (stream->rotate_position == -1ULL) {
- /* No rotation ongoing. */
- goto end;
- }
-
- if (stream->trace_chunk == stream->chan->trace_chunk ||
- !stream->chan->trace_chunk) {
- rotating_to_null_chunk = true;
- }
-end:
- return rotating_to_null_chunk;
-}
-
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx,
bool locked_by_caller)
goto end;
}
- if (stream->read_subbuffer_ops.post_consume) {
- ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx);
- if (ret) {
- goto end;
- }
+ ret = post_consume(stream, &subbuffer, ctx);
+ if (ret) {
+ goto end;
}
/*
ERR("Stream rotation error after consuming data");
goto end;
}
+
} else if (rotation_ret < 0) {
ret = rotation_ret;
ERR("Failed to check if stream was ready to rotate after consuming data");
goto end;
}
- /*
- * TODO roll into a post_consume op as this doesn't apply to metadata
- * streams.
- */
- if (!stream->opened_packet_in_current_trace_chunk &&
- stream->trace_chunk && !stream->metadata_flag &&
- !stream_is_rotating_to_null_chunk(stream)) {
- const enum open_packet_status status = open_packet(stream);
-
- switch (status) {
- case OPEN_PACKET_STATUS_OPENED:
- DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- break;
- case OPEN_PACKET_STATUS_NO_SPACE:
- /*
- * Can't open a packet as there is no space left.
- * This means that new events were produced, resulting
- * in a packet being opened, which is what we wanted
- * anyhow.
- */
- DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk = true;
- break;
- case OPEN_PACKET_STATUS_ERROR:
- /* Logged by callee. */
- ret = -1;
- goto end;
- default:
- abort();
- }
- }
-
sleep_stream:
if (stream->read_subbuffer_ops.on_sleep) {
stream->read_subbuffer_ops.on_sleep(stream, ctx);
const bool is_local_trace = relayd_id == -1ULL;
struct consumer_relayd_sock_pair *relayd = NULL;
bool rotating_to_new_chunk = true;
+ /* Array of `struct lttng_consumer_stream *` */
+ struct lttng_dynamic_pointer_array streams_packet_to_open;
+ size_t stream_idx;
DBG("Consumer sample rotate position for channel %" PRIu64, key);
lttng_dynamic_array_init(&stream_rotation_positions,
sizeof(struct relayd_stream_rotation_position), NULL);
+ lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
rcu_read_lock();
* ensures we have at least one packet in each stream per trace
* chunk, even if no data was produced.
*/
- ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+ ret = consumer_stream_flush_buffer(
+ stream, stream->metadata_flag ? 1 : 0);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
stream->key);
* packets in this scenario and allows the tracer to
* "stamp" the beginning of the new trace chunk at the
* earliest possible point.
+ *
+ * The packet open is performed after the channel
+ * rotation to ensure that no attempt to open a packet
+ * is performed in a stream that has no active trace
+ * chunk.
*/
- const enum open_packet_status status =
- open_packet(stream);
-
- switch (status) {
- case OPEN_PACKET_STATUS_OPENED:
- DBG("Opened a packet after a rotation: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- break;
- case OPEN_PACKET_STATUS_NO_SPACE:
- /*
- * Can't open a packet as there is no space left
- * in the buffer. A new packet will be opened
- * once one has been consumed.
- */
- DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key, stream->chan->name,
- stream->chan->session_id);
- break;
- case OPEN_PACKET_STATUS_ERROR:
- /* Logged by callee. */
+ ret = lttng_dynamic_pointer_array_add_pointer(
+ &streams_packet_to_open, stream);
+ if (ret) {
+ PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
ret = -1;
goto end_unlock_stream;
- default:
- abort();
}
}
pthread_mutex_unlock(&stream->lock);
}
stream = NULL;
- pthread_mutex_unlock(&channel->lock);
- if (is_local_trace) {
- ret = 0;
- goto end;
- }
+ if (!is_local_trace) {
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end_unlock_channel;
+ }
- relayd = consumer_find_relayd(relayd_id);
- if (!relayd) {
- ERR("Failed to find relayd %" PRIu64, relayd_id);
- ret = -1;
- goto end;
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer
+ .data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_unlock_channel;
+ }
}
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
- rotating_to_new_chunk ? &next_chunk_id : NULL,
- (const struct relayd_stream_rotation_position *)
- stream_rotation_positions.buffer.data);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
- relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
- goto end;
+ for (stream_idx = 0;
+ stream_idx < lttng_dynamic_pointer_array_get_count(
+ &streams_packet_to_open);
+ stream_idx++) {
+ enum consumer_stream_open_packet_status status;
+
+ stream = lttng_dynamic_pointer_array_get_pointer(
+ &streams_packet_to_open, stream_idx);
+
+ pthread_mutex_lock(&stream->lock);
+ status = consumer_stream_open_packet(stream);
+ pthread_mutex_unlock(&stream->lock);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left
+ * in the buffer. A new packet will be opened
+ * once one has been consumed.
+ */
+ DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end_unlock_stream;
+ default:
+ abort();
+ }
}
+ pthread_mutex_unlock(&channel->lock);
ret = 0;
goto end;
end:
rcu_read_unlock();
lttng_dynamic_array_reset(&stream_rotation_positions);
+ lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
return ret;
}
{
int ret;
- ret = consumer_flush_buffer(stream, 1);
+ ret = consumer_stream_flush_buffer(stream, 1);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel clear",
stream->key);
rcu_read_lock();
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
- enum open_packet_status status;
+ enum consumer_stream_open_packet_status status;
pthread_mutex_lock(&stream->lock);
if (cds_lfht_is_node_deleted(&stream->node.node)) {
goto next;
}
- status = open_packet(stream);
+ status = consumer_stream_open_packet(stream);
switch (status) {
- case OPEN_PACKET_STATUS_OPENED:
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
", channel name = %s, session id = %" PRIu64,
stream->key, stream->chan->name,
stream->chan->session_id);
stream->opened_packet_in_current_trace_chunk = true;
break;
- case OPEN_PACKET_STATUS_NO_SPACE:
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
", channel name = %s, session id = %" PRIu64,
stream->key, stream->chan->name,
stream->chan->session_id);
break;
- case OPEN_PACKET_STATUS_ERROR:
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
/*
* Only unexpected internal errors can lead to this
* failing. Report an unknown error.