#include <inttypes.h>
#include <urcu/futex.h>
#include <urcu/uatomic.h>
+#include <urcu/rculist.h>
#include <unistd.h>
#include <fcntl.h>
+#include <strings.h>
#include <lttng/lttng.h>
#include <common/common.h>
#include <common/config/session-config.h>
#include <common/dynamic-buffer.h>
#include <common/buffer-view.h>
-#include <urcu/rculist.h>
+#include <common/string-utils/format.h>
+#include "version.h"
#include "cmd.h"
#include "ctf-trace.h"
#include "index.h"
#include "connection.h"
#include "tracefile-array.h"
#include "tcp_keep_alive.h"
+#include "sessiond-trace-chunks.h"
static const char *help_msg =
#ifdef LTTNG_EMBED_HELP
enum relay_connection_status {
RELAY_CONNECTION_STATUS_OK,
- /* An error occured while processing an event on the connection. */
+ /* An error occurred while processing an event on the connection. */
RELAY_CONNECTION_STATUS_ERROR,
/* Connection closed/shutdown cleanly. */
RELAY_CONNECTION_STATUS_CLOSED,
/* command line options */
char *opt_output_path;
-static int opt_daemon, opt_background;
+static int opt_daemon, opt_background, opt_print_version;
/*
* We need to wait for listener and live listener threads, as well as
/* Size of receive buffer. */
#define RECV_DATA_BUFFER_SIZE 65536
-#define FILE_COPY_BUFFER_SIZE 65536
static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */
static pid_t child_ppid; /* Internal parent PID use with daemonize. */
/* Relayd health monitoring */
struct health_app *health_relayd;
+struct sessiond_trace_chunk_registry *sessiond_trace_chunk_registry;
+
static struct option long_options[] = {
{ "control-port", 1, 0, 'C', },
{ "data-port", 1, 0, 'D', },
static const char *config_ignore_options[] = { "help", "config", "version" };
+/* Write the VERSION string, followed by a newline, to the standard output. */
+static void print_version(void)
+{
+	printf("%s\n", VERSION);
+}
+
+/*
+ * Report the relay daemon's version information through the DBG() macro.
+ *
+ * The first message always carries VERSION and VERSION_NAME; GIT_VERSION and
+ * EXTRA_VERSION_NAME are appended to it only when they are not empty strings.
+ * The extra version description and the extra patches list are each reported
+ * in their own message, and only when they are not empty.
+ */
+static void relayd_config_log(void)
+{
+	DBG("LTTng-relayd " VERSION " - " VERSION_NAME "%s%s",
+			GIT_VERSION[0] != '\0' ? " - " GIT_VERSION : "",
+			EXTRA_VERSION_NAME[0] != '\0' ? " - " EXTRA_VERSION_NAME : "");
+
+	if (EXTRA_VERSION_DESCRIPTION[0]) {
+		DBG("LTTng-relayd extra version description:\n\t" EXTRA_VERSION_DESCRIPTION "\n");
+	}
+
+	if (EXTRA_VERSION_PATCHES[0]) {
+		DBG("LTTng-relayd extra patches:\n\t" EXTRA_VERSION_PATCHES "\n");
+	}
+}
+
/*
* Take an option from the getopt output and set it in the right variable to be
* used later.
}
exit(EXIT_FAILURE);
case 'V':
- fprintf(stdout, "%s\n", VERSION);
- exit(EXIT_SUCCESS);
+ opt_print_version = 1;
+ break;
case 'o':
if (lttng_is_setuid_setgid()) {
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /*
- * No activity for this FD (poll
- * implementation).
- */
- continue;
- }
-
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
return NULL;
}
-/*
- * Set index data from the control port to a given index object.
- */
-static int set_index_control_data(struct relay_index *index,
- struct lttcomm_relayd_index *data,
- struct relay_connection *conn)
-{
- struct ctf_packet_index index_data;
-
- /*
- * The index on disk is encoded in big endian.
- */
- index_data.packet_size = htobe64(data->packet_size);
- index_data.content_size = htobe64(data->content_size);
- index_data.timestamp_begin = htobe64(data->timestamp_begin);
- index_data.timestamp_end = htobe64(data->timestamp_end);
- index_data.events_discarded = htobe64(data->events_discarded);
- index_data.stream_id = htobe64(data->stream_id);
-
- if (conn->minor >= 8) {
- index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
- index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
- }
-
- return relay_index_set_data(index, &index_data);
-}
-
static bool session_streams_have_index(const struct relay_session *session)
{
return session->minor >= 4 && !session->snapshot;
{
int ret = 0;
ssize_t send_ret;
- struct relay_session *session;
- struct lttcomm_relayd_status_session reply;
- char session_name[LTTNG_NAME_MAX];
- char hostname[LTTNG_HOST_NAME_MAX];
+ struct relay_session *session = NULL;
+ struct lttcomm_relayd_create_session_reply_2_11 reply = {};
+ char session_name[LTTNG_NAME_MAX] = {};
+ char hostname[LTTNG_HOST_NAME_MAX] = {};
uint32_t live_timer = 0;
bool snapshot = false;
+ bool session_name_contains_creation_timestamp = false;
+ /* Left nil for peers < 2.11. */
+ char base_path[LTTNG_PATH_MAX] = {};
+ lttng_uuid sessiond_uuid = {};
+ LTTNG_OPTIONAL(uint64_t) id_sessiond = {};
+ LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
+ LTTNG_OPTIONAL(time_t) creation_time = {};
+ struct lttng_dynamic_buffer reply_payload;
- memset(session_name, 0, LTTNG_NAME_MAX);
- memset(hostname, 0, LTTNG_HOST_NAME_MAX);
-
- memset(&reply, 0, sizeof(reply));
+ lttng_dynamic_buffer_init(&reply_payload);
if (conn->minor < 4) {
/* From 2.1 to 2.3 */
ret = cmd_create_session_2_4(payload, session_name,
hostname, &live_timer, &snapshot);
} else {
+ bool has_current_chunk;
+ uint64_t current_chunk_id_value;
+ time_t creation_time_value;
+ uint64_t id_sessiond_value;
+
/* From 2.11 to ... */
- ret = cmd_create_session_2_11(payload, session_name,
- hostname, &live_timer, &snapshot);
+ ret = cmd_create_session_2_11(payload, session_name, hostname,
+ base_path, &live_timer, &snapshot, &id_sessiond_value,
+ sessiond_uuid, &has_current_chunk,
+ &current_chunk_id_value, &creation_time_value,
+ &session_name_contains_creation_timestamp);
+ if (lttng_uuid_is_nil(sessiond_uuid)) {
+ /* The nil UUID is reserved for pre-2.11 clients. */
+ ERR("Illegal nil UUID announced by peer in create session command");
+ ret = -1;
+ goto send_reply;
+ }
+ LTTNG_OPTIONAL_SET(&id_sessiond, id_sessiond_value);
+ LTTNG_OPTIONAL_SET(&creation_time, creation_time_value);
+ if (has_current_chunk) {
+ LTTNG_OPTIONAL_SET(&current_chunk_id,
+ current_chunk_id_value);
+ }
}
if (ret < 0) {
goto send_reply;
}
- session = session_create(session_name, hostname, live_timer,
- snapshot, conn->major, conn->minor);
+ session = session_create(session_name, hostname, base_path, live_timer,
+ snapshot, sessiond_uuid,
+ id_sessiond.is_set ? &id_sessiond.value : NULL,
+ current_chunk_id.is_set ? &current_chunk_id.value : NULL,
+ creation_time.is_set ? &creation_time.value : NULL,
+ conn->major, conn->minor,
+ session_name_contains_creation_timestamp);
if (!session) {
ret = -1;
goto send_reply;
conn->session = session;
DBG("Created session %" PRIu64, session->id);
- reply.session_id = htobe64(session->id);
+ reply.generic.session_id = htobe64(session->id);
send_reply:
if (ret < 0) {
- reply.ret_code = htobe32(LTTNG_ERR_FATAL);
+ reply.generic.ret_code = htobe32(LTTNG_ERR_FATAL);
} else {
- reply.ret_code = htobe32(LTTNG_OK);
+ reply.generic.ret_code = htobe32(LTTNG_OK);
}
- send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
- if (send_ret < (ssize_t) sizeof(reply)) {
- ERR("Failed to send \"create session\" command reply (ret = %zd)",
- send_ret);
- ret = -1;
+ if (conn->minor < 11) {
+ /* From 2.1 to 2.10 */
+ ret = lttng_dynamic_buffer_append(&reply_payload,
+ &reply.generic, sizeof(reply.generic));
+ if (ret) {
+ ERR("Failed to append \"create session\" command reply header to payload buffer");
+ ret = -1;
+ goto end;
+ }
+ } else {
+ const uint32_t output_path_length =
+ session ? strlen(session->output_path) + 1 : 0;
+
+ reply.output_path_length = htobe32(output_path_length);
+ ret = lttng_dynamic_buffer_append(
+ &reply_payload, &reply, sizeof(reply));
+ if (ret) {
+ ERR("Failed to append \"create session\" command reply header to payload buffer");
+ goto end;
+ }
+
+ if (output_path_length) {
+ ret = lttng_dynamic_buffer_append(&reply_payload,
+ session->output_path,
+ output_path_length);
+ if (ret) {
+ ERR("Failed to append \"create session\" command reply path to payload buffer");
+ goto end;
+ }
+ }
}
+ send_ret = conn->sock->ops->sendmsg(conn->sock, reply_payload.data,
+ reply_payload.size, 0);
+ if (send_ret < (ssize_t) reply_payload.size) {
+ ERR("Failed to send \"create session\" command reply of %zu bytes (ret = %zd)",
+ reply_payload.size, send_ret);
+ ret = -1;
+ }
+end:
+ if (ret < 0 && session) {
+ session_put(session);
+ }
+ lttng_dynamic_buffer_reset(&reply_payload);
return ret;
}
pthread_mutex_unlock(&session->lock);
}
+/*
+ * Validate a channel path and normalize it in place.
+ *
+ * A path containing a "../" sequence is refused since it walks up the path
+ * hierarchy. When the path starts with a '/', that leading slash is removed
+ * by shifting the remainder of the string (terminator included) one byte to
+ * the left.
+ *
+ * channel_path must be a valid, writable, NUL-terminated string.
+ *
+ * Returns 0 on success, -1 if the path is refused.
+ */
+static int conform_channel_path(char *channel_path)
+{
+	int ret = 0;
+
+	/*
+	 * strstr() expects the string to search first and the sequence to
+	 * look for second: search the peer-provided path for "../".
+	 */
+	if (strstr(channel_path, "../")) {
+		ERR("Refusing channel path as it walks up the path hierarchy: \"%s\"",
+				channel_path);
+		ret = -1;
+		goto end;
+	}
+
+	if (*channel_path == '/') {
+		const size_t len = strlen(channel_path);
+
+		/*
+		 * Channel paths from peers prior to 2.11 are expressed as an
+		 * absolute path that is, in reality, relative to the relay
+		 * daemon's output directory. Remove the leading slash so it
+		 * is correctly interpreted as a relative path later on.
+		 *
+		 * len (and not len - 1) bytes are moved so that the
+		 * terminating NUL byte is copied too. memmove() is used since
+		 * the source and destination regions overlap.
+		 */
+		memmove(channel_path, channel_path + 1, len);
+	}
+end:
+	return ret;
+}
+
/*
* relay_add_stream: allocate a new stream for a session
*/
uint64_t stream_handle = -1ULL;
char *path_name = NULL, *channel_name = NULL;
uint64_t tracefile_size = 0, tracefile_count = 0;
- struct relay_stream_chunk_id stream_chunk_id = { 0 };
+ LTTNG_OPTIONAL(uint64_t) stream_chunk_id = {};
if (!session || !conn->version_check_done) {
ERR("Trying to add a stream before version check");
goto send_reply;
}
+ if (conform_channel_path(path_name)) {
+ goto send_reply;
+ }
+
trace = ctf_trace_get_by_path_or_create(session, path_name);
if (!trace) {
goto send_reply;
/* We pass ownership of path_name and channel_name. */
stream = stream_create(trace, stream_handle, path_name,
- channel_name, tracefile_size, tracefile_count,
- &stream_chunk_id);
+ channel_name, tracefile_size, tracefile_count);
path_name = NULL;
channel_name = NULL;
goto end_unlock;
}
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- 0, 0, -1, -1, stream->stream_fd->fd, NULL,
- &stream->stream_fd->fd);
+ ret = stream_reset_file(stream);
if (ret < 0) {
- ERR("Failed to rotate metadata file %s of channel %s",
- stream->path_name, stream->channel_name);
+ ERR("Failed to reset metadata stream %" PRIu64
+ ": stream_path = %s, channel = %s",
+ stream->stream_handle, stream->path_name,
+ stream->channel_name);
goto end_unlock;
}
-
end_unlock:
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
return ret;
}
-/*
- * Append padding to the file pointed by the file descriptor fd.
- */
-static int write_padding_to_file(int fd, uint32_t size)
-{
- ssize_t ret = 0;
- char *zeros;
-
- if (size == 0) {
- goto end;
- }
-
- zeros = zmalloc(size);
- if (zeros == NULL) {
- PERROR("zmalloc zeros for padding");
- ret = -1;
- goto end;
- }
-
- ret = lttng_write(fd, zeros, size);
- if (ret < size) {
- PERROR("write padding to file");
- }
-
- free(zeros);
-
-end:
- return ret;
-}
-
-/*
- * Close the current index file if it is open, and create a new one.
- *
- * Return 0 on success, -1 on error.
- */
-static
-int create_rotate_index_file(struct relay_stream *stream)
-{
- int ret;
- uint32_t major, minor;
-
- /* Put ref on previous index_file. */
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
- }
- major = stream->trace->session->major;
- minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream->path_name,
- stream->channel_name,
- -1, -1, stream->tracefile_size,
- tracefile_array_get_file_index_head(stream->tfa),
- lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor));
- if (!stream->index_file) {
- ret = -1;
- goto end;
- }
-
- ret = 0;
-
-end:
- return ret;
-}
-
-static
-int do_rotate_stream(struct relay_stream *stream)
-{
- int ret;
-
- /* Perform the stream rotation. */
- ret = utils_rotate_stream_file(stream->path_name,
- stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- NULL, &stream->stream_fd->fd);
- if (ret < 0) {
- ERR("Rotating stream output file");
- goto end;
- }
- stream->tracefile_size_current = 0;
-
- /* Rotate also the index if the stream is not a metadata stream. */
- if (!stream->is_metadata) {
- ret = create_rotate_index_file(stream);
- if (ret < 0) {
- ERR("Failed to rotate index file");
- goto end;
- }
- }
-
- stream->rotate_at_seq_num = -1ULL;
- stream->pos_after_last_complete_data_index = 0;
-
-end:
- return ret;
-}
-
-/*
- * If too much data has been written in a tracefile before we received the
- * rotation command, we have to move the excess data to the new tracefile and
- * perform the rotation. This can happen because the control and data
- * connections are separate, the indexes as well as the commands arrive from
- * the control connection and we have no control over the order so we could be
- * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives. We don't need
- * to update the index because its order is guaranteed with the rotation
- * command message.
- */
-static
-int rotate_truncate_stream(struct relay_stream *stream)
-{
- int ret, new_fd;
- off_t lseek_ret;
- uint64_t diff, pos = 0;
- char buf[FILE_COPY_BUFFER_SIZE];
-
- assert(!stream->is_metadata);
-
- assert(stream->tracefile_size_current >
- stream->pos_after_last_complete_data_index);
- diff = stream->tracefile_size_current -
- stream->pos_after_last_complete_data_index;
-
- /* Create the new tracefile. */
- new_fd = utils_create_stream_file(stream->path_name,
- stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
- if (new_fd < 0) {
- ERR("Failed to create new stream file at path %s for channel %s",
- stream->path_name, stream->channel_name);
- ret = -1;
- goto end;
- }
-
- /*
- * Rewind the current tracefile to the position at which the rotation
- * should have occured.
- */
- lseek_ret = lseek(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index, SEEK_SET);
- if (lseek_ret < 0) {
- PERROR("seek truncate stream");
- ret = -1;
- goto end;
- }
-
- /* Move data from the old file to the new file. */
- while (pos < diff) {
- uint64_t count, bytes_left;
- ssize_t io_ret;
-
- bytes_left = diff - pos;
- count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
- assert(count <= SIZE_MAX);
-
- io_ret = lttng_read(stream->stream_fd->fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, stream->stream_fd->fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- io_ret = lttng_write(new_fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, new_fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- pos += count;
- }
-
- /* Truncate the file to get rid of the excess data. */
- ret = ftruncate(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index);
- if (ret) {
- PERROR("ftruncate");
- goto end;
- }
-
- ret = close(stream->stream_fd->fd);
- if (ret < 0) {
- PERROR("Closing tracefile");
- goto end;
- }
-
- ret = create_rotate_index_file(stream);
- if (ret < 0) {
- ERR("Rotate stream index file");
- goto end;
- }
-
- /*
- * Update the offset and FD of all the eventual indexes created by the
- * data connection before the rotation command arrived.
- */
- ret = relay_index_switch_all_files(stream);
- if (ret < 0) {
- ERR("Failed to rotate index file");
- goto end;
- }
-
- stream->stream_fd->fd = new_fd;
- stream->tracefile_size_current = diff;
- stream->pos_after_last_complete_data_index = 0;
- stream->rotate_at_seq_num = -1ULL;
-
- ret = 0;
-
-end:
- return ret;
-}
-
-/*
- * Check if a stream should perform a rotation (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream(struct relay_stream *stream)
-{
- int ret = 0;
- uint64_t trace_seq;
-
- /* No rotation expected. */
- if (stream->rotate_at_seq_num == -1ULL) {
- goto end;
- }
-
- trace_seq = min(stream->prev_data_seq, stream->prev_index_seq);
- if (stream->prev_data_seq == -1ULL || stream->prev_index_seq == -1ULL ||
- trace_seq < stream->rotate_at_seq_num) {
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq,
- stream->prev_index_seq);
- goto end;
- } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
- /*
- * prev_data_seq is checked here since indexes and rotation
- * commands are serialized with respect to each other.
- */
- DBG("Rotation after too much data has been written in tracefile "
- "for stream %" PRIu64 ", need to truncate before "
- "rotating", stream->stream_handle);
- ret = rotate_truncate_stream(stream);
- if (ret) {
- ERR("Failed to truncate stream");
- goto end;
- }
- } else {
- if (trace_seq != stream->rotate_at_seq_num) {
- /*
- * Unexpected, protocol error/bug.
- * It could mean that we received a rotation position
- * that is in the past.
- */
- ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq,
- stream->prev_index_seq);
- ret = -1;
- goto end;
- }
- DBG("Stream %" PRIu64 " ready for rotation",
- stream->stream_handle);
- ret = do_rotate_stream(stream);
- }
-
-end:
- return ret;
-}
-
/*
* relay_recv_metadata: receive the metadata for the session.
*/
const struct lttng_buffer_view *payload)
{
int ret = 0;
- ssize_t size_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_metadata_payload metadata_payload_header;
struct relay_stream *metadata_stream;
uint64_t metadata_payload_size;
+ struct lttng_buffer_view packet_view;
if (!session) {
ERR("Metadata sent before version check");
goto end;
}
- pthread_mutex_lock(&metadata_stream->lock);
-
- size_ret = lttng_write(metadata_stream->stream_fd->fd,
- payload->data + sizeof(metadata_payload_header),
- metadata_payload_size);
- if (size_ret < metadata_payload_size) {
- ERR("Relay error writing metadata on file");
+ packet_view = lttng_buffer_view_from_view(payload,
+ sizeof(metadata_payload_header), metadata_payload_size);
+ if (!packet_view.data) {
+ ERR("Invalid metadata packet length announced by header");
ret = -1;
goto end_put;
}
- size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ pthread_mutex_lock(&metadata_stream->lock);
+ ret = stream_write(metadata_stream, &packet_view,
metadata_payload_header.padding_size);
- if (size_ret < (int64_t) metadata_payload_header.padding_size) {
+ pthread_mutex_unlock(&metadata_stream->lock);
+ if (ret){
ret = -1;
goto end_put;
}
-
- metadata_stream->metadata_received +=
- metadata_payload_size + metadata_payload_header.padding_size;
- DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
- metadata_stream->metadata_received);
-
- ret = try_rotate_stream(metadata_stream);
- if (ret < 0) {
- goto end_put;
- }
-
end_put:
- pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
end:
return ret;
ssize_t send_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_index index_info;
- struct relay_index *index;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
size_t msg_len;
ret = -1;
goto end;
}
- pthread_mutex_lock(&stream->lock);
-
- /* Live beacon handling */
- if (index_info.packet_size == 0) {
- DBG("Received live beacon for stream %" PRIu64,
- stream->stream_handle);
- /*
- * Only flag a stream inactive when it has already
- * received data and no indexes are in flight.
- */
- if (stream->index_received_seqcount > 0
- && stream->indexes_in_flight == 0) {
- stream->beacon_ts_end = index_info.timestamp_end;
- }
- ret = 0;
+ pthread_mutex_lock(&stream->lock);
+ ret = stream_add_index(stream, &index_info);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
goto end_stream_put;
- } else {
- stream->beacon_ts_end = -1ULL;
}
- if (stream->ctf_stream_id == -1ULL) {
- stream->ctf_stream_id = index_info.stream_id;
+end_stream_put:
+ stream_put(stream);
+end:
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
}
- index = relay_index_get_by_id_or_create(stream, index_info.net_seq_num);
- if (!index) {
- ret = -1;
- ERR("relay_index_get_by_id_or_create index NULL");
- goto end_stream_put;
- }
- if (set_index_control_data(index, &index_info, conn)) {
- ERR("set_index_control_data error");
- relay_index_put(index);
- ret = -1;
- goto end_stream_put;
- }
- ret = relay_index_try_flush(index);
- if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
- stream->index_received_seqcount++;
- stream->pos_after_last_complete_data_index += index->total_size;
- stream->prev_index_seq = index_info.net_seq_num;
- } else if (ret > 0) {
- /* no flush. */
- ret = 0;
- } else {
- /*
- * ret < 0
- *
- * relay_index_try_flush is responsible for the self-reference
- * put of the index object on error.
- */
- ERR("relay_index_try_flush error %d", ret);
- ret = -1;
- }
-
-end_stream_put:
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
-
-end:
-
- memset(&reply, 0, sizeof(reply));
- if (ret < 0) {
- reply.ret_code = htobe32(LTTNG_ERR_UNK);
- } else {
- reply.ret_code = htobe32(LTTNG_OK);
- }
- send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
- if (send_ret < (ssize_t) sizeof(reply)) {
- ERR("Failed to send \"recv index\" command reply (ret = %zd)", send_ret);
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"recv index\" command reply (ret = %zd)", send_ret);
ret = -1;
}
}
/*
- * relay_rotate_session_stream: rotate a stream to a new tracefile for the session
- * rotation feature (not the tracefile rotation feature).
+ * relay_rotate_session_streams: rotate streams to a new tracefile for the
+ * session rotation feature (not the tracefile rotation feature).
*/
-static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_rotate_session_streams(
+ const struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn,
const struct lttng_buffer_view *payload)
{
- int ret;
+ int ret = 0;
+ uint32_t i;
ssize_t send_ret;
+ enum lttng_error_code reply_code = LTTNG_ERR_UNK;
struct relay_session *session = conn->session;
- struct lttcomm_relayd_rotate_stream stream_info;
- struct lttcomm_relayd_generic_reply reply;
- struct relay_stream *stream;
- size_t header_len;
- size_t path_len;
- struct lttng_buffer_view new_path_view;
-
- DBG("Rotate stream received");
+ struct lttcomm_relayd_rotate_streams rotate_streams;
+ struct lttcomm_relayd_generic_reply reply = {};
+ struct relay_stream *stream = NULL;
+ const size_t header_len = sizeof(struct lttcomm_relayd_rotate_streams);
+ struct lttng_trace_chunk *next_trace_chunk = NULL;
+ struct lttng_buffer_view stream_positions;
+ char chunk_id_buf[MAX_INT_DEC_LEN(uint64_t)];
+ const char *chunk_id_str = "none";
if (!session || !conn->version_check_done) {
ERR("Trying to rotate a stream before version check");
goto end_no_reply;
}
- header_len = sizeof(struct lttcomm_relayd_rotate_stream);
-
if (payload->size < header_len) {
ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes",
header_len, payload->size);
goto end_no_reply;
}
- memcpy(&stream_info, payload->data, header_len);
+ memcpy(&rotate_streams, payload->data, header_len);
- /* Convert to host */
- stream_info.pathname_length = be32toh(stream_info.pathname_length);
- stream_info.stream_id = be64toh(stream_info.stream_id);
- stream_info.new_chunk_id = be64toh(stream_info.new_chunk_id);
- stream_info.rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+ /* Convert header to host endianness. */
+ rotate_streams = (typeof(rotate_streams)) {
+ .stream_count = be32toh(rotate_streams.stream_count),
+ .new_chunk_id = (typeof(rotate_streams.new_chunk_id)) {
+ .is_set = !!rotate_streams.new_chunk_id.is_set,
+ .value = be64toh(rotate_streams.new_chunk_id.value),
+ }
+ };
- path_len = stream_info.pathname_length;
- if (payload->size < header_len + path_len) {
- ERR("Unexpected payload size in \"relay_rotate_session_stream\" including path: expected >= %zu bytes, got %zu bytes",
- header_len + path_len, payload->size);
- ret = -1;
- goto end_no_reply;
- }
-
- /* Ensure it fits in local filename length. */
- if (path_len >= LTTNG_PATH_MAX) {
- ret = -ENAMETOOLONG;
- ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes",
- path_len, LTTNG_PATH_MAX);
- goto end;
+ if (rotate_streams.new_chunk_id.is_set) {
+ /*
+ * Retrieve the trace chunk the stream must transition to. As
+ * per the protocol, this chunk should have been created
+ * before this command is received.
+ */
+ next_trace_chunk = sessiond_trace_chunk_registry_get_chunk(
+ sessiond_trace_chunk_registry,
+ session->sessiond_uuid, session->id,
+ rotate_streams.new_chunk_id.value);
+ if (!next_trace_chunk) {
+ char uuid_str[UUID_STR_LEN];
+
+ lttng_uuid_to_str(session->sessiond_uuid, uuid_str);
+ ERR("Unknown next trace chunk in ROTATE_STREAMS command: sessiond_uuid = {%s}, session_id = %" PRIu64
+ ", trace_chunk_id = %" PRIu64,
+ uuid_str, session->id,
+ rotate_streams.new_chunk_id.value);
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+ ret = -1;
+ goto end;
+ }
+
+ ret = snprintf(chunk_id_buf, sizeof(chunk_id_buf), "%" PRIu64,
+ rotate_streams.new_chunk_id.value);
+ if (ret < 0 || ret >= sizeof(chunk_id_buf)) {
+ chunk_id_str = "formatting error";
+ } else {
+ chunk_id_str = chunk_id_buf;
+ }
+ session->has_rotated = true;
}
- new_path_view = lttng_buffer_view_from_view(payload, header_len,
- stream_info.pathname_length);
+ DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"",
+ rotate_streams.stream_count, session->session_name,
+ chunk_id_str);
- stream = stream_get_by_id(stream_info.stream_id);
- if (!stream) {
+ stream_positions = lttng_buffer_view_from_view(payload,
+ sizeof(rotate_streams), -1);
+ if (!stream_positions.data ||
+ stream_positions.size <
+ (rotate_streams.stream_count *
+ sizeof(struct lttcomm_relayd_stream_rotation_position))) {
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
ret = -1;
goto end;
}
- pthread_mutex_lock(&stream->lock);
+ for (i = 0; i < rotate_streams.stream_count; i++) {
+ struct lttcomm_relayd_stream_rotation_position *position_comm =
+ &((typeof(position_comm)) stream_positions.data)[i];
+ const struct lttcomm_relayd_stream_rotation_position pos = {
+ .stream_id = be64toh(position_comm->stream_id),
+ .rotate_at_seq_num = be64toh(
+ position_comm->rotate_at_seq_num),
+ };
- /*
- * Update the trace path (just the folder, the stream name does not
- * change).
- */
- free(stream->path_name);
- stream->path_name = create_output_path(new_path_view.data);
- if (!stream->path_name) {
- ERR("Failed to create a new output path");
- ret = -1;
- goto end_stream_unlock;
- }
- ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
- -1, -1);
- if (ret < 0) {
- ERR("relay creating output directory");
- ret = -1;
- goto end_stream_unlock;
- }
+ stream = stream_get_by_id(pos.stream_id);
+ if (!stream) {
+ reply_code = LTTNG_ERR_INVALID;
+ ret = -1;
+ goto end;
+ }
- assert(stream->current_chunk_id.is_set);
- stream->current_chunk_id.value = stream_info.new_chunk_id;
+ pthread_mutex_lock(&stream->lock);
+ ret = stream_set_pending_rotation(stream, next_trace_chunk,
+ pos.rotate_at_seq_num);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
+ reply_code = LTTNG_ERR_FILE_CREATION_ERROR;
+ goto end;
+ }
- if (stream->is_metadata) {
- /*
- * The metadata stream is sent only over the control connection
- * so we know we have all the data to perform the stream
- * rotation.
- */
- ret = do_rotate_stream(stream);
- } else {
- stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
- ret = try_rotate_stream(stream);
- }
- if (ret < 0) {
- goto end_stream_unlock;
+ stream_put(stream);
+ stream = NULL;
}
-end_stream_unlock:
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
+ reply_code = LTTNG_OK;
+ ret = 0;
end:
- memset(&reply, 0, sizeof(reply));
- if (ret < 0) {
- reply.ret_code = htobe32(LTTNG_ERR_UNK);
- } else {
- reply.ret_code = htobe32(LTTNG_OK);
+ if (stream) {
+ stream_put(stream);
}
+
+ reply.ret_code = htobe32((uint32_t) reply_code);
send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
sizeof(struct lttcomm_relayd_generic_reply), 0);
if (send_ret < (ssize_t) sizeof(reply)) {
send_ret);
ret = -1;
}
-
end_no_reply:
+ lttng_trace_chunk_put(next_trace_chunk);
return ret;
}
+
+
/*
- * relay_mkdir: Create a folder on the disk.
+ * relay_create_trace_chunk: create a new trace chunk
+ *
+ * Parses a "create trace chunk" command from the payload, creates the chunk
+ * (optionally overriding its name), publishes it to the sessiond trace chunk
+ * registry and installs it as the session's current trace chunk. The chunk
+ * that was current until then becomes the session's chunk pending closure.
+ *
+ * A generic reply carrying an lttng_error_code is sent to the peer, except
+ * when the command fails its preliminary checks (session, version, payload
+ * header), in which case no reply is sent.
+ *
+ * Returns 0 on success, a non-zero value on error.
*/
-static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn,
const struct lttng_buffer_view *payload)
{
- int ret;
- struct relay_session *session = conn->session;
- struct lttcomm_relayd_mkdir path_info_header;
- struct lttcomm_relayd_generic_reply reply;
- char *path = NULL;
- size_t header_len;
+ int ret = 0;
ssize_t send_ret;
- struct lttng_buffer_view path_view;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_create_trace_chunk *msg;
+ struct lttcomm_relayd_create_trace_chunk msg_storage;
+ struct lttcomm_relayd_generic_reply reply = {};
+ struct lttng_buffer_view header_view;
+ struct lttng_buffer_view chunk_name_view;
+ struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL;
+ enum lttng_error_code reply_code = LTTNG_OK;
+ enum lttng_trace_chunk_status chunk_status;
+ struct lttng_directory_handle session_output;
if (!session || !conn->version_check_done) {
- ERR("Trying to create a directory before version check");
+ ERR("Trying to create a trace chunk before version check");
ret = -1;
- goto end_no_session;
+ goto end_no_reply;
}
if (session->major == 2 && session->minor < 11) {
- /*
- * This client is not supposed to use this command since
- * it predates its introduction.
- */
- ERR("relay_mkdir command is unsupported before LTTng 2.11");
+ ERR("Chunk creation command is unsupported before 2.11");
ret = -1;
- goto end_no_session;
+ goto end_no_reply;
}
- header_len = sizeof(path_info_header);
- if (payload->size < header_len) {
- ERR("Unexpected payload size in \"relay_mkdir\": expected >= %zu bytes, got %zu bytes",
- header_len, payload->size);
+ header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+ if (!header_view.data) {
+ ERR("Failed to receive payload of chunk creation command");
ret = -1;
- goto end_no_session;
+ goto end_no_reply;
}
- memcpy(&path_info_header, payload->data, header_len);
+ /*
+ * Work on a private copy of the header: the payload view is const and
+ * must not be byte-swapped in place.
+ */
+ memcpy(&msg_storage, header_view.data, sizeof(msg_storage));
+ msg = &msg_storage;
+ /* Convert to host endianness. */
+ msg->chunk_id = be64toh(msg->chunk_id);
+ msg->creation_timestamp = be64toh(msg->creation_timestamp);
+ msg->override_name_length = be32toh(msg->override_name_length);
- path_info_header.length = be32toh(path_info_header.length);
-
- if (payload->size < header_len + path_info_header.length) {
- ERR("Unexpected payload size in \"relay_mkdir\" including path: expected >= %zu bytes, got %zu bytes",
- header_len + path_info_header.length, payload->size);
+ chunk = lttng_trace_chunk_create(
+ msg->chunk_id, msg->creation_timestamp);
+ if (!chunk) {
+ ERR("Failed to create trace chunk in trace chunk creation command");
ret = -1;
- goto end_no_session;
- }
-
- /* Ensure that it fits in local path length. */
- if (path_info_header.length >= LTTNG_PATH_MAX) {
- ret = -ENAMETOOLONG;
- ERR("Path name argument of mkdir command (%" PRIu32 " bytes) exceeds the maximal length allowed (%d bytes)",
- path_info_header.length, LTTNG_PATH_MAX);
+ reply_code = LTTNG_ERR_NOMEM;
goto end;
}
- path_view = lttng_buffer_view_from_view(payload, header_len,
- path_info_header.length);
+ if (msg->override_name_length) {
+ const char *name;
- path = create_output_path(path_view.data);
- if (!path) {
- ERR("Failed to create output path");
- ret = -1;
- goto end;
+ chunk_name_view = lttng_buffer_view_from_view(payload,
+ sizeof(*msg),
+ msg->override_name_length);
+ name = chunk_name_view.data;
+ /* The name must be fully received and NUL-terminated. */
+ if (!name || name[msg->override_name_length - 1]) {
+ ERR("Failed to receive payload of chunk creation command");
+ ret = -1;
+ reply_code = LTTNG_ERR_INVALID;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_override_name(
+ chunk, chunk_name_view.data);
+ switch (chunk_status) {
+ case LTTNG_TRACE_CHUNK_STATUS_OK:
+ break;
+ case LTTNG_TRACE_CHUNK_STATUS_INVALID_ARGUMENT:
+ ERR("Failed to set the name of new trace chunk in trace chunk creation command (invalid name)");
+ reply_code = LTTNG_ERR_INVALID;
+ ret = -1;
+ goto end;
+ default:
+ ERR("Failed to set the name of new trace chunk in trace chunk creation command (unknown error)");
+ reply_code = LTTNG_ERR_UNK;
+ ret = -1;
+ goto end;
+ }
}
- ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
- if (ret < 0) {
- ERR("relay creating output directory");
+ chunk_status = lttng_trace_chunk_set_credentials_current_user(chunk);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ reply_code = LTTNG_ERR_UNK;
+ ret = -1;
goto end;
}
- ret = 0;
-
-end:
- memset(&reply, 0, sizeof(reply));
- if (ret < 0) {
- reply.ret_code = htobe32(LTTNG_ERR_UNK);
- } else {
- reply.ret_code = htobe32(LTTNG_OK);
+ ret = session_init_output_directory_handle(
+ conn->session, &session_output);
+ if (ret) {
+ reply_code = LTTNG_ERR_CREATE_DIR_FAIL;
+ goto end;
}
- send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
- if (send_ret < (ssize_t) sizeof(reply)) {
- ERR("Failed to send \"mkdir\" command reply (ret = %zd)", send_ret);
+ chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output);
+ /* The local directory handle is released regardless of the outcome. */
+ lttng_directory_handle_fini(&session_output);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ reply_code = LTTNG_ERR_UNK;
ret = -1;
+ goto end;
}
-end_no_session:
- free(path);
- return ret;
-}
+ published_chunk = sessiond_trace_chunk_registry_publish_chunk(
+ sessiond_trace_chunk_registry,
+ conn->session->sessiond_uuid,
+ conn->session->id,
+ chunk);
+ if (!published_chunk) {
+ char uuid_str[UUID_STR_LEN];
-static int validate_rotate_rename_path_length(const char *path_type,
- uint32_t path_length)
-{
- int ret = 0;
+ lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+ ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ uuid_str,
+ conn->session->id,
+ msg->chunk_id);
+ ret = -1;
+ reply_code = LTTNG_ERR_NOMEM;
+ goto end;
+ }
- if (path_length > LTTNG_PATH_MAX) {
- ret = -ENAMETOOLONG;
- ERR("rotate rename \"%s\" path name length (%" PRIu32 " bytes) exceeds the allowed size of %i bytes",
- path_type, path_length, LTTNG_PATH_MAX);
- } else if (path_length == 0) {
- ret = -EINVAL;
- ERR("rotate rename \"%s\" path name has an illegal length of 0", path_type);
+ pthread_mutex_lock(&conn->session->lock);
+ if (conn->session->pending_closure_trace_chunk) {
+ /*
+ * Invalid; this means a second create_trace_chunk command was
+ * received before a close_trace_chunk.
+ */
+ ERR("Invalid trace chunk create command received; a trace chunk is already waiting for a trace chunk close command");
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+ ret = -1;
+ goto end_unlock_session;
+ }
+ /*
+ * The session's reference to the previous current chunk moves to
+ * pending_closure_trace_chunk; the published chunk's reference moves
+ * to the session, hence the local pointer is cleared.
+ */
+ conn->session->pending_closure_trace_chunk =
+ conn->session->current_trace_chunk;
+ conn->session->current_trace_chunk = published_chunk;
+ published_chunk = NULL;
+end_unlock_session:
+ pthread_mutex_unlock(&conn->session->lock);
+end:
+ reply.ret_code = htobe32((uint32_t) reply_code);
+ send_ret = conn->sock->ops->sendmsg(conn->sock,
+ &reply,
+ sizeof(struct lttcomm_relayd_generic_reply),
+ 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
}
+end_no_reply:
+ /* Release the local references; either pointer may be NULL here. */
+ lttng_trace_chunk_put(chunk);
+ lttng_trace_chunk_put(published_chunk);
return ret;
}
/*
- * relay_rotate_rename: rename the trace folder after a rotation is
- * completed. We are not closing any fd here, just moving the folder, so it
- * works even if data is still in-flight.
+ * relay_close_trace_chunk: close a trace chunk
+ *
+ * Looks up the chunk designated by the command in the sessiond trace chunk
+ * registry, records its close timestamp and optional close command, and
+ * drops the session's references to it (current and pending closure).
+ *
+ * The reply is a close_trace_chunk reply header; when the command succeeds
+ * it is followed by the NUL-terminated path of the closed chunk. No reply is
+ * sent when the preliminary checks (session, version, payload header) fail.
+ *
+ * Returns 0 on success, -1 on error.
*/
-static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn,
const struct lttng_buffer_view *payload)
{
- int ret;
+ int ret = 0, buf_ret;
ssize_t send_ret;
struct relay_session *session = conn->session;
- struct lttcomm_relayd_generic_reply reply;
- struct lttcomm_relayd_rotate_rename header;
- size_t header_len;
- size_t received_paths_size;
- char *complete_old_path = NULL, *complete_new_path = NULL;
- struct lttng_buffer_view old_path_view;
- struct lttng_buffer_view new_path_view;
+ struct lttcomm_relayd_close_trace_chunk *msg;
+ struct lttcomm_relayd_close_trace_chunk_reply reply = {};
+ struct lttng_buffer_view header_view;
+ struct lttng_trace_chunk *chunk = NULL;
+ enum lttng_error_code reply_code = LTTNG_OK;
+ enum lttng_trace_chunk_status chunk_status;
+ uint64_t chunk_id;
+ LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
+ time_t close_timestamp;
+ char closed_trace_chunk_path[LTTNG_PATH_MAX];
+ size_t path_length = 0;
+ const char *chunk_name = NULL;
+ struct lttng_dynamic_buffer reply_payload;
+
+ lttng_dynamic_buffer_init(&reply_payload);
if (!session || !conn->version_check_done) {
- ERR("Trying to rename a trace folder before version check");
+ ERR("Trying to close a trace chunk before version check");
ret = -1;
goto end_no_reply;
}
if (session->major == 2 && session->minor < 11) {
- ERR("relay_rotate_rename command is unsupported before LTTng 2.11");
+ ERR("Chunk close command is unsupported before 2.11");
ret = -1;
goto end_no_reply;
}
- header_len = sizeof(header);
- if (payload->size < header_len) {
- ERR("Unexpected payload size in \"relay_rotate_rename\": expected >= %zu bytes, got %zu bytes",
- header_len, payload->size);
+ header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+ if (!header_view.data) {
+ ERR("Failed to receive payload of chunk close command");
ret = -1;
goto end_no_reply;
}
- memcpy(&header, payload->data, header_len);
-
- header.old_path_length = be32toh(header.old_path_length);
- header.new_path_length = be32toh(header.new_path_length);
- received_paths_size = header.old_path_length + header.new_path_length;
-
- if (payload->size < header_len + received_paths_size) {
- ERR("Unexpected payload size in \"relay_rotate_rename\" including paths: expected >= %zu bytes, got %zu bytes",
- header_len, payload->size);
- ret = -1;
- goto end_no_reply;
- }
-
- /* Ensure the paths don't exceed their allowed size. */
- ret = validate_rotate_rename_path_length("old", header.old_path_length);
- if (ret) {
+ /* Convert to host endianness. */
+ msg = (typeof(msg)) header_view.data;
+ chunk_id = be64toh(msg->chunk_id);
+ close_timestamp = (time_t) be64toh(msg->close_timestamp);
+ close_command = (typeof(close_command)){
+ .value = be32toh(msg->close_command.value),
+ .is_set = msg->close_command.is_set,
+ };
+
+ chunk = sessiond_trace_chunk_registry_get_chunk(
+ sessiond_trace_chunk_registry,
+ conn->session->sessiond_uuid,
+ conn->session->id,
+ chunk_id);
+ if (!chunk) {
+ char uuid_str[UUID_STR_LEN];
+
+ lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str);
+ /* Log the host-endian id, not the raw network-order field. */
+ ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ uuid_str,
+ conn->session->id,
+ chunk_id);
+ ret = -1;
+ reply_code = LTTNG_ERR_NOMEM;
goto end;
}
- ret = validate_rotate_rename_path_length("new", header.new_path_length);
- if (ret) {
- goto end;
- }
-
- old_path_view = lttng_buffer_view_from_view(payload, header_len,
- header.old_path_length);
- new_path_view = lttng_buffer_view_from_view(payload,
- header_len + header.old_path_length,
- header.new_path_length);
- /* Validate that both paths received are NULL terminated. */
- if (old_path_view.data[old_path_view.size - 1] != '\0') {
- ERR("relay_rotate_rename command's \"old\" path is invalid (not NULL terminated)");
+ pthread_mutex_lock(&session->lock);
+ if (session->pending_closure_trace_chunk &&
+ session->pending_closure_trace_chunk != chunk) {
+ ERR("Trace chunk close command for session \"%s\" does not target the trace chunk pending closure",
+ session->session_name);
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
ret = -1;
- goto end;
+ goto end_unlock_session;
}
- if (new_path_view.data[new_path_view.size - 1] != '\0') {
- ERR("relay_rotate_rename command's \"new\" path is invalid (not NULL terminated)");
+
+ chunk_status = lttng_trace_chunk_set_close_timestamp(
+ chunk, close_timestamp);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk close timestamp");
ret = -1;
- goto end;
+ reply_code = LTTNG_ERR_UNK;
+ goto end_unlock_session;
}
- complete_old_path = create_output_path(old_path_view.data);
- if (!complete_old_path) {
- ERR("Failed to build old output path in rotate_rename command");
+ if (close_command.is_set) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, close_command.value);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ reply_code = LTTNG_ERR_INVALID;
+ goto end_unlock_session;
+ }
+ }
+ chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, NULL);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to get chunk name");
ret = -1;
- goto end;
+ reply_code = LTTNG_ERR_UNK;
+ goto end_unlock_session;
}
-
- complete_new_path = create_output_path(new_path_view.data);
- if (!complete_new_path) {
- ERR("Failed to build new output path in rotate_rename command");
+ /*
+ * Build the path reported to the peer: the session output path itself
+ * when the session has neither rotated nor is a snapshot session, a
+ * per-chunk sub-directory otherwise.
+ */
+ if (!session->has_rotated && !session->snapshot) {
+ ret = lttng_strncpy(closed_trace_chunk_path,
+ session->output_path,
+ sizeof(closed_trace_chunk_path));
+ if (ret) {
+ ERR("Failed to send trace chunk path: path length of %zu bytes exceeds the maximal allowed length of %zu bytes",
+ strlen(session->output_path),
+ sizeof(closed_trace_chunk_path));
+ reply_code = LTTNG_ERR_NOMEM;
+ ret = -1;
+ goto end_unlock_session;
+ }
+ } else {
+ if (session->snapshot) {
+ ret = snprintf(closed_trace_chunk_path,
+ sizeof(closed_trace_chunk_path),
+ "%s/%s", session->output_path,
+ chunk_name);
+ } else {
+ ret = snprintf(closed_trace_chunk_path,
+ sizeof(closed_trace_chunk_path),
+ "%s/" DEFAULT_ARCHIVED_TRACE_CHUNKS_DIRECTORY
+ "/%s",
+ session->output_path, chunk_name);
+ }
+ /*
+ * snprintf() returns the length the full string would have
+ * had: any value >= the buffer size means truncation.
+ */
+ if (ret < 0 || (size_t) ret >= sizeof(closed_trace_chunk_path)) {
+ ERR("Failed to format closed trace chunk resulting path");
+ reply_code = ret < 0 ? LTTNG_ERR_UNK : LTTNG_ERR_NOMEM;
+ ret = -1;
+ goto end_unlock_session;
+ }
+ }
+ DBG("Reply chunk path on close: %s", closed_trace_chunk_path);
+ path_length = strlen(closed_trace_chunk_path) + 1;
+ if (path_length > UINT32_MAX) {
+ ERR("Closed trace chunk path exceeds the maximal length allowed by the protocol");
ret = -1;
- goto end;
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+ goto end_unlock_session;
}
- ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
- -1, -1);
- if (ret < 0) {
- ERR("Failed to mkdir() rotate_rename's \"new\" output directory at \"%s\"",
- complete_new_path);
- goto end;
+ if (session->current_trace_chunk == chunk) {
+ /*
+ * After a trace chunk close command, no new streams
+ * referencing the chunk may be created. Hence, on the
+ * event that no new trace chunk have been created for
+ * the session, the reference to the current trace chunk
+ * is released in order to allow it to be reclaimed when
+ * the last stream releases its reference to it.
+ */
+ lttng_trace_chunk_put(session->current_trace_chunk);
+ session->current_trace_chunk = NULL;
}
+ lttng_trace_chunk_put(session->pending_closure_trace_chunk);
+ session->pending_closure_trace_chunk = NULL;
+end_unlock_session:
+ pthread_mutex_unlock(&session->lock);
- /*
- * If a domain has not yet created its channel, the domain-specific
- * folder might not exist, but this is not an error.
- */
- ret = rename(complete_old_path, complete_new_path);
- if (ret < 0 && errno != ENOENT) {
- PERROR("Renaming chunk in rotate_rename command from \"%s\" to \"%s\"",
- complete_old_path, complete_new_path);
- goto end;
+end:
+ reply.generic.ret_code = htobe32((uint32_t) reply_code);
+ reply.path_length = htobe32((uint32_t) path_length);
+ buf_ret = lttng_dynamic_buffer_append(
+ &reply_payload, &reply, sizeof(reply));
+ if (buf_ret) {
+ ERR("Failed to append \"close trace chunk\" command reply header to payload buffer");
+ /* No reply can be sent: report the failure to the caller. */
+ ret = -1;
+ goto end_no_reply;
}
- ret = 0;
-end:
- memset(&reply, 0, sizeof(reply));
- if (ret < 0) {
- reply.ret_code = htobe32(LTTNG_ERR_UNK);
- } else {
- reply.ret_code = htobe32(LTTNG_OK);
+ /* The path only follows the header when the command succeeded. */
+ if (reply_code == LTTNG_OK) {
+ buf_ret = lttng_dynamic_buffer_append(&reply_payload,
+ closed_trace_chunk_path, path_length);
+ if (buf_ret) {
+ ERR("Failed to append \"close trace chunk\" command reply path to payload buffer");
+ ret = -1;
+ goto end_no_reply;
+ }
}
- send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
- sizeof(reply), 0);
- if (send_ret < sizeof(reply)) {
- ERR("Failed to send \"rotate rename\" command reply (ret = %zd)",
- send_ret);
+
+ send_ret = conn->sock->ops->sendmsg(conn->sock,
+ reply_payload.data,
+ reply_payload.size,
+ 0);
+ /*
+ * Compare as signed: a negative send_ret would otherwise be converted
+ * to a huge unsigned value and the error would go unnoticed.
+ */
+ if (send_ret < (ssize_t) reply_payload.size) {
+ ERR("Failed to send \"close trace chunk\" command reply of %zu bytes (ret = %zd)",
+ reply_payload.size, send_ret);
ret = -1;
+ goto end_no_reply;
}
-
end_no_reply:
- free(complete_old_path);
- free(complete_new_path);
+ lttng_trace_chunk_put(chunk);
+ lttng_dynamic_buffer_reset(&reply_payload);
return ret;
}
/*
- * Check if all the streams in the session have completed the last rotation.
- * The chunk_id value is used to distinguish the cases where a stream was
- * closed on the consumerd before the rotation started but it still active on
- * the relayd, and the case where a stream appeared on the consumerd/relayd
- * after the last rotation started (in that case, it is already writing in the
- * new chunk folder).
+ * relay_trace_chunk_exists: check if a trace chunk exists
+ *
+ * Queries the sessiond trace chunk registry for the chunk id carried by the
+ * command and replies with the result. No reply is sent when the preliminary
+ * checks (session, version, payload header) fail.
+ *
+ * Returns 0 on success, a non-zero value on error.
*/
-static
-int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn,
const struct lttng_buffer_view *payload)
{
- struct relay_session *session = conn->session;
- struct lttcomm_relayd_rotate_pending msg;
- struct lttcomm_relayd_rotate_pending_reply reply;
- struct lttng_ht_iter iter;
- struct relay_stream *stream;
int ret = 0;
ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_trace_chunk_exists *msg;
+ struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
+ struct lttng_buffer_view header_view;
uint64_t chunk_id;
- bool rotate_pending = false;
-
- DBG("Rotate pending command received");
+ bool chunk_exists = false;
if (!session || !conn->version_check_done) {
- ERR("Trying to check for data before version check");
+ ERR("Trying to check for the existence of a trace chunk before version check");
ret = -1;
goto end_no_reply;
}
if (session->major == 2 && session->minor < 11) {
- ERR("Unsupported feature before 2.11");
+ ERR("Chunk existence check is unsupported before 2.11");
ret = -1;
goto end_no_reply;
}
- if (payload->size < sizeof(msg)) {
- ERR("Unexpected payload size in \"relay_rotate_pending\": expected >= %zu bytes, got %zu bytes",
- sizeof(msg), payload->size);
+ header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+ if (!header_view.data) {
+ ERR("Failed to receive payload of chunk existence check command");
ret = -1;
goto end_no_reply;
}
- memcpy(&msg, payload->data, sizeof(msg));
-
- chunk_id = be64toh(msg.chunk_id);
-
- DBG("Evaluating rotate pending for session \"%s\" and chunk id %" PRIu64,
- session->session_name, chunk_id);
+ /* Convert to host endianness. */
+ msg = (typeof(msg)) header_view.data;
+ chunk_id = be64toh(msg->chunk_id);
+ ret = sessiond_trace_chunk_registry_chunk_exists(
+ sessiond_trace_chunk_registry,
+ conn->session->sessiond_uuid,
+ conn->session->id,
+ chunk_id, &chunk_exists);
/*
- * Iterate over all the streams in the session and check if they are
- * still waiting for data to perform their rotation.
+ * If ret is not 0, send the reply and report the error to the caller.
+ * It is a protocol (or internal) error and the session/connection
+ * should be torn down.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- node.node) {
- if (!stream_get(stream)) {
- continue;
- }
- if (stream->trace->session != session) {
- stream_put(stream);
- continue;
- }
- pthread_mutex_lock(&stream->lock);
- if (stream->rotate_at_seq_num != -1ULL) {
- /* We have not yet performed the rotation. */
- rotate_pending = true;
- DBG("Stream %" PRIu64 " is still rotating",
- stream->stream_handle);
- } else if (stream->current_chunk_id.value < chunk_id) {
- /*
- * Stream closed on the consumer but still active on the
- * relay.
- */
- rotate_pending = true;
- DBG("Stream %" PRIu64 " did not exist on the consumer "
- "when the last rotation started, but is"
- "still waiting for data before getting"
- "closed",
- stream->stream_handle);
- }
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
- if (rotate_pending) {
- goto send_reply;
- }
- }
-
-send_reply:
- rcu_read_unlock();
- memset(&reply, 0, sizeof(reply));
- reply.generic.ret_code = htobe32((uint32_t) LTTNG_OK);
- reply.is_pending = (uint8_t) !!rotate_pending;
- send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
- sizeof(reply), 0);
+ reply = (typeof(reply)){
+ .generic.ret_code = htobe32((uint32_t)
+ (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)),
+ .trace_chunk_exists = ret == 0 ? chunk_exists : 0,
+ };
+ send_ret = conn->sock->ops->sendmsg(
+ conn->sock, &reply, sizeof(reply), 0);
if (send_ret < (ssize_t) sizeof(reply)) {
- ERR("Failed to send \"rotate pending\" command reply (ret = %zd)",
+ ERR("Failed to send \"trace chunk exists\" command reply (ret = %zd)",
send_ret);
ret = -1;
}
-
end_no_reply:
return ret;
}
DBG_CMD("RELAYD_RESET_METADATA", conn);
ret = relay_reset_metadata(header, conn, payload);
break;
- case RELAYD_ROTATE_STREAM:
- DBG_CMD("RELAYD_ROTATE_STREAM", conn);
- ret = relay_rotate_session_stream(header, conn, payload);
+ case RELAYD_ROTATE_STREAMS:
+ DBG_CMD("RELAYD_ROTATE_STREAMS", conn);
+ ret = relay_rotate_session_streams(header, conn, payload);
break;
- case RELAYD_ROTATE_RENAME:
- DBG_CMD("RELAYD_ROTATE_RENAME", conn);
- ret = relay_rotate_rename(header, conn, payload);
+ case RELAYD_CREATE_TRACE_CHUNK:
+ DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
+ ret = relay_create_trace_chunk(header, conn, payload);
break;
- case RELAYD_ROTATE_PENDING:
- DBG_CMD("RELAYD_ROTATE_PENDING", conn);
- ret = relay_rotate_pending(header, conn, payload);
+ case RELAYD_CLOSE_TRACE_CHUNK:
+ DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
+ ret = relay_close_trace_chunk(header, conn, payload);
break;
- case RELAYD_MKDIR:
- DBG_CMD("RELAYD_MKDIR", conn);
- ret = relay_mkdir(header, conn, payload);
+ case RELAYD_TRACE_CHUNK_EXISTS:
+ DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn);
+ ret = relay_trace_chunk_exists(header, conn, payload);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
return status;
}
-/*
- * Handle index for a data stream.
- *
- * Called with the stream lock held.
- *
- * Return 0 on success else a negative value.
- */
-static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
- bool rotate_index, bool *flushed, uint64_t total_size)
-{
- int ret = 0;
- uint64_t data_offset;
- struct relay_index *index;
-
- /* Get data offset because we are about to update the index. */
- data_offset = htobe64(stream->tracefile_size_current);
-
- DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
- stream->stream_handle, net_seq_num, stream->tracefile_size_current);
-
- /*
- * Lookup for an existing index for that stream id/sequence
- * number. If it exists, the control thread has already received the
- * data for it, thus we need to write it to disk.
- */
- index = relay_index_get_by_id_or_create(stream, net_seq_num);
- if (!index) {
- ret = -1;
- goto end;
- }
-
- if (rotate_index || !stream->index_file) {
- ret = create_rotate_index_file(stream);
- if (ret < 0) {
- ERR("Failed to rotate index");
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- index = NULL;
- goto end;
- }
- }
-
- if (relay_index_set_file(index, stream->index_file, data_offset)) {
- ret = -1;
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- index = NULL;
- goto end;
- }
-
- ret = relay_index_try_flush(index);
- if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
- stream->index_received_seqcount++;
- *flushed = true;
- } else if (ret > 0) {
- index->total_size = total_size;
- /* No flush. */
- ret = 0;
- } else {
- /*
- * ret < 0
- *
- * relay_index_try_flush is responsible for the self-reference
- * put of the index object on error.
- */
- ERR("relay_index_try_flush error %d", ret);
- ret = -1;
- }
-end:
- return ret;
-}
-
static enum relay_connection_status relay_process_data_receive_header(
struct relay_connection *conn)
{
}
pthread_mutex_lock(&stream->lock);
-
- /* Check if a rotation is needed. */
- if (stream->tracefile_size > 0 &&
- (stream->tracefile_size_current + header.data_size) >
- stream->tracefile_size) {
- uint64_t old_id, new_id;
-
- old_id = tracefile_array_get_file_index_head(stream->tfa);
- tracefile_array_file_rotate(stream->tfa);
-
- /* new_id is updated by utils_rotate_stream_file. */
- new_id = old_id;
-
- ret = utils_rotate_stream_file(stream->path_name,
- stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- &new_id, &stream->stream_fd->fd);
- if (ret < 0) {
- ERR("Failed to rotate stream output file");
- status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
- }
-
- /*
- * Reset current size because we just performed a stream
- * rotation.
- */
- stream->tracefile_size_current = 0;
- conn->protocol.data.state.receive_payload.rotate_index = true;
+ /* Prepare stream for the reception of a new packet. */
+ ret = stream_init_packet(stream, header.data_size,
+ &conn->protocol.data.state.receive_payload.rotate_index);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
+ ERR("Failed to rotate stream output file");
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
}
end_stream_unlock:
- pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
return status;
* - the on-stack data buffer
*/
while (left_to_receive > 0 && !partial_recv) {
- ssize_t write_ret;
size_t recv_size = min(left_to_receive, chunk_size);
+ struct lttng_buffer_view packet_chunk;
ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
recv_size, MSG_DONTWAIT);
* consumed.
*/
partial_recv = true;
+ recv_size = ret;
}
- recv_size = ret;
+ packet_chunk = lttng_buffer_view_init(data_buffer,
+ 0, recv_size);
+ assert(packet_chunk.data);
- /* Write data to stream output fd. */
- write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
- recv_size);
- if (write_ret < (ssize_t) recv_size) {
+ ret = stream_write(stream, &packet_chunk, 0);
+ if (ret) {
ERR("Relay error writing data to file");
status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
left_to_receive -= recv_size;
state->received += recv_size;
state->left_to_receive = left_to_receive;
-
- DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
- write_ret, stream->stream_handle);
}
if (state->left_to_receive > 0) {
goto end_stream_unlock;
}
- ret = write_padding_to_file(stream->stream_fd->fd,
- state->header.padding_size);
- if ((int64_t) ret < (int64_t) state->header.padding_size) {
- ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
- stream->stream_handle,
- state->header.net_seq_num, ret);
+ ret = stream_write(stream, NULL, state->header.padding_size);
+ if (ret) {
status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
-
if (session_streams_have_index(session)) {
- ret = handle_index_data(stream, state->header.net_seq_num,
- state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
+ ret = stream_update_index(stream, state->header.net_seq_num,
+ state->rotate_index, &index_flushed,
+ state->header.data_size + state->header.padding_size);
if (ret < 0) {
- ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ ERR("Failed to update index: stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
}
}
- stream->tracefile_size_current += state->header.data_size +
- state->header.padding_size;
-
if (stream->prev_data_seq == -1ULL) {
new_stream = true;
}
- if (index_flushed) {
- stream->pos_after_last_complete_data_index =
- stream->tracefile_size_current;
- stream->prev_index_seq = state->header.net_seq_num;
- }
- stream->prev_data_seq = state->header.net_seq_num;
+ ret = stream_complete_packet(stream, state->header.data_size +
+ state->header.padding_size, state->header.net_seq_num,
+ index_flushed);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
/*
* Resetting the protocol state (to RECEIVE_HEADER) will trash the
connection_reset_protocol_state(conn);
state = NULL;
- ret = try_rotate_stream(stream);
- if (ret < 0) {
- status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
- }
-
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
health_code_update();
- if (!revents) {
- /*
- * No activity for this FD (poll
- * implementation).
- */
- continue;
- }
-
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
if (ret < 0) {
goto error;
}
- lttng_poll_add(&events, conn->sock->fd,
+ ret = lttng_poll_add(&events,
+ conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
+ if (ret) {
+ ERR("Failed to add new connection file descriptor to poll set");
+ goto error;
+ }
connection_ht_add(relay_connections_ht, conn);
DBG("Connection socket %d added", conn->sock->fd);
} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
goto exit_options;
}
+ relayd_config_log();
+
+ if (opt_print_version) {
+ print_version();
+ retval = 0;
+ goto exit_options;
+ }
+
/* Try to create directory if -o, --output is specified. */
if (opt_output_path) {
if (*opt_output_path != '/') {
}
}
+ sessiond_trace_chunk_registry = sessiond_trace_chunk_registry_create();
+ if (!sessiond_trace_chunk_registry) {
+ ERR("Failed to initialize session daemon trace chunk registry");
+ retval = -1;
+ goto exit_sessiond_trace_chunk_registry;
+ }
+
/* Initialize thread health monitoring */
health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
if (!health_relayd) {
exit_init_data:
health_app_destroy(health_relayd);
+ sessiond_trace_chunk_registry_destroy(sessiond_trace_chunk_registry);
exit_health_app_create:
+exit_sessiond_trace_chunk_registry:
exit_options:
/*
* Wait for all pending call_rcu work to complete before tearing