#include <sys/stat.h>
#include <stdint.h>
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/compat/fcntl.h>
-#include <common/compat/endian.h>
-#include <common/pipe.h>
-#include <common/relayd/relayd.h>
-#include <common/utils.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/index/index.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/optional.h>
-#include <common/buffer-view.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/metadata-bucket.h>
-
-#include "kernel-consumer.h"
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/common.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/compat/endian.hpp>
+#include <common/pipe.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/utils.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/index/index.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/optional.hpp>
+#include <common/buffer-view.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/consumer/metadata-bucket.hpp>
+
+#include "kernel-consumer.hpp"
extern struct lttng_consumer_global_data the_consumer_data;
extern int consumer_poll_timeout;
return ret;
}
+static void finalize_snapshot_stream(
+ struct lttng_consumer_stream *stream, uint64_t relayd_id)
+{
+ ASSERT_LOCKED(stream->lock);
+
+ if (relayd_id == (uint64_t) -1ULL) {
+ if (stream->out_fd >= 0) {
+ const int ret = close(stream->out_fd);
+
+ if (ret < 0) {
+ PERROR("Failed to close stream snapshot output file descriptor");
+ }
+
+ stream->out_fd = -1;
+ }
+ } else {
+ close_relayd_stream(stream);
+ stream->net_seq_idx = (uint64_t) -1ULL;
+ }
+
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
+}
+
/*
* Take a snapshot of all the stream of a channel
* RCU read-side lock must be held across this function to ensure existence of
static int lttng_kconsumer_snapshot_channel(
struct lttng_consumer_channel *channel,
uint64_t key, char *path, uint64_t relayd_id,
- uint64_t nb_packets_per_stream,
- struct lttng_consumer_local_data *ctx)
+ uint64_t nb_packets_per_stream)
{
int ret;
struct lttng_consumer_stream *stream;
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
- goto end_unlock;
+ goto error_finalize_stream;
}
} else {
ret = consumer_stream_create_output_files(stream,
false);
if (ret < 0) {
- goto end_unlock;
+ goto error_finalize_stream;
}
DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
stream->key);
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto end_unlock;
+ goto error_finalize_stream;
}
goto end_unlock;
}
ret = lttng_kconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking kernel snapshot");
- goto end_unlock;
+ goto error_finalize_stream;
}
ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced kernel snapshot position");
- goto end_unlock;
+ goto error_finalize_stream;
}
ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd kernel snapshot position");
- goto end_unlock;
+ goto error_finalize_stream;
}
consumed_pos = consumer_get_consume_start_pos(consumed_pos,
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
- goto end_unlock;
+ goto error_finalize_stream;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf");
- goto end_unlock;
+ goto error_finalize_stream;
}
consumed_pos += stream->max_sb_size;
}
- if (relayd_id == (uint64_t) -1ULL) {
- if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
- if (ret < 0) {
- PERROR("Kernel consumer snapshot close out_fd");
- goto end_unlock;
- }
- stream->out_fd = -1;
- }
- } else {
- close_relayd_stream(stream);
- stream->net_seq_idx = (uint64_t) -1ULL;
- }
- lttng_trace_chunk_put(stream->trace_chunk);
- stream->trace_chunk = NULL;
+ finalize_snapshot_stream(stream, relayd_id);
pthread_mutex_unlock(&stream->lock);
}
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf error path");
}
+error_finalize_stream:
+ finalize_snapshot_stream(stream, relayd_id);
end_unlock:
pthread_mutex_unlock(&stream->lock);
end:
ret = 0;
error_snapshot:
metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
- cds_list_del(&metadata_stream->send_node);
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
rcu_read_unlock();
msg.u.snapshot_channel.pathname,
msg.u.snapshot_channel.relayd_id,
msg.u.snapshot_channel
- .nb_packets_per_stream,
- ctx);
+ .nb_packets_per_stream);
if (ret_snapshot < 0) {
ERR("Snapshot channel failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
ret_rotate_channel = lttng_consumer_rotate_channel(
channel, key,
- msg.u.rotate_channel.relayd_id,
- msg.u.rotate_channel.metadata, ctx);
+ msg.u.rotate_channel.relayd_id);
if (ret_rotate_channel < 0) {
ERR("Rotate channel failed");
ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
int ret_rotate;
ret_rotate = lttng_consumer_rotate_ready_streams(
- channel, key, ctx);
+ channel, key);
if (ret_rotate < 0) {
ERR("Rotate ready streams failed");
}
case LTTNG_CONSUMER_INIT:
{
int ret_send_status;
+ lttng_uuid sessiond_uuid;
+
+ std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
+ sessiond_uuid.begin());
ret_code = lttng_consumer_init_command(ctx,
- msg.u.init.sessiond_uuid);
+ sessiond_uuid);
health_code_update();
ret_send_status = consumer_send_status_msg(sock, ret_code);
if (ret_send_status < 0) {
static
int put_next_subbuffer(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+ struct stream_subbuffer *subbuffer __attribute__((unused)))
{
const int ret = kernctl_put_next_subbuf(stream->wait_fd);
static
int signal_metadata(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx __attribute__((unused)))
{
ASSERT_LOCKED(stream->metadata_rdv_lock);
return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;