return ret;
}
-static void finalize_snapshot_stream(
- struct lttng_consumer_stream *stream, uint64_t relayd_id)
-{
- ASSERT_LOCKED(stream->lock);
-
- if (relayd_id == (uint64_t) -1ULL) {
- if (stream->out_fd >= 0) {
- const int ret = close(stream->out_fd);
-
- if (ret < 0) {
- PERROR("Failed to close stream snapshot output file descriptor");
- }
-
- stream->out_fd = -1;
- }
- } else {
- close_relayd_stream(stream);
- stream->net_seq_idx = (uint64_t) -1ULL;
- }
-
- lttng_trace_chunk_put(stream->trace_chunk);
- stream->trace_chunk = NULL;
-}
-
/*
* Take a snapshot of all the stream of a channel
* RCU read-side lock must be held across this function to ensure existence of
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
} else {
ret = consumer_stream_create_output_files(stream,
false);
if (ret < 0) {
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
stream->key);
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
goto end_unlock;
}
ret = lttng_kconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking kernel snapshot");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced kernel snapshot position");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd kernel snapshot position");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
consumed_pos = consumer_get_consume_start_pos(consumed_pos,
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
consumed_pos += stream->max_sb_size;
}
- finalize_snapshot_stream(stream, relayd_id);
+ consumer_stream_close_output(stream);
pthread_mutex_unlock(&stream->lock);
}
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf error path");
}
-error_finalize_stream:
- finalize_snapshot_stream(stream, relayd_id);
+error_close_stream_output:
+ consumer_stream_close_output(stream);
end_unlock:
pthread_mutex_unlock(&stream->lock);
end: