X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=4e30ac49882e5c392097b51dfedf95b0d7207686;hp=e7c5b6c52d28dfcd1fa80fdf925c8d5317f45b27;hb=f77a6ec2ae90f5d58e9389e47228c18b081cf97a;hpb=a8b66566890a353cc3a89836281541195ad69d73 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index e7c5b6c52..4e30ac498 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -62,22 +62,23 @@ #include #include -#include "version.h" +#include "backward-compatibility-group-by.h" #include "cmd.h" +#include "connection.h" #include "ctf-trace.h" +#include "health-relayd.h" #include "index.h" -#include "utils.h" -#include "lttng-relayd.h" #include "live.h" -#include "health-relayd.h" -#include "testpoint.h" -#include "viewer-stream.h" +#include "lttng-relayd.h" #include "session.h" +#include "sessiond-trace-chunks.h" #include "stream.h" -#include "connection.h" -#include "tracefile-array.h" #include "tcp_keep_alive.h" -#include "sessiond-trace-chunks.h" +#include "testpoint.h" +#include "tracefile-array.h" +#include "utils.h" +#include "version.h" +#include "viewer-stream.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -97,7 +98,7 @@ enum relay_connection_status { /* command line options */ char *opt_output_path, *opt_working_directory; -static int opt_daemon, opt_background, opt_print_version; +static int opt_daemon, opt_background, opt_print_version, opt_allow_clear = 1; enum relay_group_output_by opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_UNKNOWN; /* @@ -188,6 +189,7 @@ static struct option long_options[] = { { "working-directory", 1, 0, 'w', }, { "group-output-by-session", 0, 0, 's', }, { "group-output-by-host", 0, 0, 'p', }, + { "disallow-clear", 0, 0, 'x' }, { NULL, 0, 0, 0, }, }; @@ -354,6 +356,10 @@ static int set_option(int opt, const char *arg, const char *optname) } opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_HOST; break; + case 'x': + /* Disallow clear */ + opt_allow_clear = 0; + break; default: /* Unknown option or other error. * Error is printed by getopt, just return */ @@ -561,6 +567,19 @@ static int set_options(int argc, char **argv) if (opt_group_output_by == RELAYD_GROUP_OUTPUT_BY_UNKNOWN) { opt_group_output_by = RELAYD_GROUP_OUTPUT_BY_HOST; } + if (opt_allow_clear) { + /* Check if env variable exists. */ + const char *value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + if (value) { + ret = config_parse_value(value); + if (ret < 0) { + ERR("Invalid value for %s specified", DEFAULT_LTTNG_RELAYD_DISALLOW_CLEAR_ENV); + retval = -1; + goto exit; + } + opt_allow_clear = !ret; + } + } exit: free(optstring); @@ -1370,12 +1389,57 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, goto send_reply; } + /* + * Backward compatibility for --group-output-by-session. + * Prior to lttng 2.11, the complete path is passed by the stream. + * Starting at 2.11, lttng-relayd uses chunk. When dealing with producer + * >=2.11 the chunk is responsible for the output path. When dealing + * with producer < 2.11 the chunk output_path is the root output path + * and the stream carries the complete path (path_name). + * To support --group-output-by-session with older producer (<2.11), we + * need to craft the path based on the stream path. + */ + if (opt_group_output_by == RELAYD_GROUP_OUTPUT_BY_SESSION) { + if (conn->minor < 4) { + /* + * From 2.1 to 2.3, the session_name is not passed on + * the RELAYD_CREATE_SESSION command. The session name + * is necessary to detect the presence of a base_path + * inside the stream path. Without it we cannot perform + * a valid group-output-by-session transformation. + */ + WARN("Unable to perform a --group-by-session transformation for session %" PRIu64 + " for stream with path \"%s\" as it is produced by a peer using a protocol older than v2.4", + session->id, path_name); + } else if (conn->minor >= 4 && conn->minor < 11) { + char *group_by_session_path_name; + + assert(session->session_name[0] != '\0'); + + group_by_session_path_name = + backward_compat_group_by_session( + path_name, + session->session_name); + if (!group_by_session_path_name) { + ERR("Failed to apply group by session to stream of session %" PRIu64, + session->id); + goto send_reply; + } + + DBG("Transformed session path from \"%s\" to \"%s\" to honor per-session name grouping", + path_name, group_by_session_path_name); + + free(path_name); + path_name = group_by_session_path_name; + } + } + trace = ctf_trace_get_by_path_or_create(session, path_name); if (!trace) { goto send_reply; } - /* This stream here has one reference on the trace. */ + /* This stream here has one reference on the trace. */ pthread_mutex_lock(&last_relay_stream_id_lock); stream_handle = ++last_relay_stream_id; pthread_mutex_unlock(&last_relay_stream_id_lock); @@ -1475,7 +1539,7 @@ static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, vstream = viewer_stream_get_by_id(stream->stream_handle); if (vstream) { - if (vstream->metadata_sent == stream->metadata_received) { + if (stream->no_new_metadata_notified) { /* * Since all the metadata has been sent to the * viewer and that we have a request to close @@ -2142,6 +2206,9 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, index_info.stream_instance_id = be64toh(index_info.stream_instance_id); index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } else { + index_info.stream_instance_id = -1ULL; + index_info.packet_seq_num = -1ULL; } stream = stream_get_by_id(index_info.relay_stream_id); @@ -2286,7 +2353,7 @@ static int relay_rotate_session_streams( session->sessiond_uuid, session->id, rotate_streams.new_chunk_id.value); if (!next_trace_chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(session->sessiond_uuid, uuid_str); ERR("Unknown next trace chunk in ROTATE_STREAMS command: sessiond_uuid = {%s}, session_id = %" PRIu64 @@ -2305,7 +2372,6 @@ static int relay_rotate_session_streams( } else { chunk_id_str = chunk_id_buf; } - session->has_rotated = true; } DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"", @@ -2391,7 +2457,8 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL; enum lttng_error_code reply_code = LTTNG_OK; enum lttng_trace_chunk_status chunk_status; - struct lttng_directory_handle session_output; + struct lttng_directory_handle *session_output = NULL; + const char *new_path; if (!session || !conn->version_check_done) { ERR("Trying to create a trace chunk before version check"); @@ -2418,8 +2485,29 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, msg->creation_timestamp = be64toh(msg->creation_timestamp); msg->override_name_length = be32toh(msg->override_name_length); + if (session->current_trace_chunk && + !lttng_trace_chunk_get_name_overridden(session->current_trace_chunk)) { + chunk_status = lttng_trace_chunk_rename_path(session->current_trace_chunk, + DEFAULT_CHUNK_TMP_OLD_DIRECTORY); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to rename old chunk"); + ret = -1; + reply_code = LTTNG_ERR_UNK; + goto end; + } + } + session->ongoing_rotation = true; + if (!session->current_trace_chunk) { + if (!session->has_rotated) { + new_path = ""; + } else { + new_path = NULL; + } + } else { + new_path = DEFAULT_CHUNK_TMP_NEW_DIRECTORY; + } chunk = lttng_trace_chunk_create( - msg->chunk_id, msg->creation_timestamp); + msg->chunk_id, msg->creation_timestamp, new_path); if (!chunk) { ERR("Failed to create trace chunk in trace chunk creation command"); ret = -1; @@ -2466,14 +2554,15 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } - ret = session_init_output_directory_handle( - conn->session, &session_output); - if (ret) { + session_output = session_create_output_directory_handle( + conn->session); + if (!session_output) { reply_code = LTTNG_ERR_CREATE_DIR_FAIL; goto end; } - chunk_status = lttng_trace_chunk_set_as_owner(chunk, &session_output); - lttng_directory_handle_fini(&session_output); + chunk_status = lttng_trace_chunk_set_as_owner(chunk, session_output); + lttng_directory_handle_put(session_output); + session_output = NULL; if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { reply_code = LTTNG_ERR_UNK; ret = -1; @@ -2486,7 +2575,7 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->id, chunk); if (!published_chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); ERR("Failed to publish chunk: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, @@ -2513,6 +2602,9 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->current_trace_chunk; conn->session->current_trace_chunk = published_chunk; published_chunk = NULL; + if (!conn->session->pending_closure_trace_chunk) { + session->ongoing_rotation = false; + } end_unlock_session: pthread_mutex_unlock(&conn->session->lock); end: @@ -2529,6 +2621,7 @@ end: end_no_reply: lttng_trace_chunk_put(chunk); lttng_trace_chunk_put(published_chunk); + lttng_directory_handle_put(session_output); return ret; } @@ -2555,6 +2648,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, size_t path_length = 0; const char *chunk_name = NULL; struct lttng_dynamic_buffer reply_payload; + const char *new_path; lttng_dynamic_buffer_init(&reply_payload); @@ -2592,7 +2686,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, conn->session->id, chunk_id); if (!chunk) { - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; lttng_uuid_to_str(conn->session->sessiond_uuid, uuid_str); ERR("Failed to find chunk to close: sessiond_uuid = %s, session_id = %" PRIu64 ", chunk_id = %" PRIu64, @@ -2605,6 +2699,20 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, } pthread_mutex_lock(&session->lock); + if (close_command.is_set && + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_DELETE) { + /* + * Clear command. It is a protocol error to ask for a + * clear on a relay which does not allow it. Querying + * the configuration allows figuring out whether + * clearing is allowed before doing the clear. + */ + if (!opt_allow_clear) { + ret = -1; + reply_code = LTTNG_ERR_INVALID_PROTOCOL; + goto end_unlock_session; + } + } if (session->pending_closure_trace_chunk && session->pending_closure_trace_chunk != chunk) { ERR("Trace chunk close command for session \"%s\" does not target the trace chunk pending closure", @@ -2614,6 +2722,43 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock_session; } + if (session->current_trace_chunk && session->current_trace_chunk != chunk && + !lttng_trace_chunk_get_name_overridden(session->current_trace_chunk)) { + if (close_command.is_set && + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_DELETE && + !session->has_rotated) { + /* New chunk stays in session output directory. */ + new_path = ""; + } else { + /* Use chunk name for new chunk. */ + new_path = NULL; + } + /* Rename new chunk path. */ + chunk_status = lttng_trace_chunk_rename_path(session->current_trace_chunk, + new_path); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + session->ongoing_rotation = false; + } + if ((!close_command.is_set || + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_NO_OPERATION) && + !lttng_trace_chunk_get_name_overridden(chunk)) { + const char *old_path; + + if (!session->has_rotated) { + old_path = ""; + } else { + old_path = NULL; + } + /* We need to move back the .tmp_old_chunk to its rightful place. */ + chunk_status = lttng_trace_chunk_rename_path(chunk, old_path); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + } chunk_status = lttng_trace_chunk_set_close_timestamp( chunk, close_timestamp); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { @@ -2671,6 +2816,10 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock_session; } } + if (close_command.is_set && + close_command.value == LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED) { + session->has_rotated = true; + } DBG("Reply chunk path on close: %s", closed_trace_chunk_path); path_length = strlen(closed_trace_chunk_path) + 1; if (path_length > UINT32_MAX) { @@ -2797,6 +2946,57 @@ end_no_reply: return ret; } +/* + * relay_get_configuration: query whether feature is available + */ +static int relay_get_configuration(const struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn, + const struct lttng_buffer_view *payload) +{ + int ret = 0; + ssize_t send_ret; + struct lttcomm_relayd_get_configuration *msg; + struct lttcomm_relayd_get_configuration_reply reply = {}; + struct lttng_buffer_view header_view; + uint64_t query_flags = 0; + uint64_t result_flags = 0; + + header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg)); + if (!header_view.data) { + ERR("Failed to receive payload of chunk close command"); + ret = -1; + goto end_no_reply; + } + + /* Convert to host endianness. */ + msg = (typeof(msg)) header_view.data; + query_flags = be64toh(msg->query_flags); + + if (query_flags) { + ret = LTTNG_ERR_INVALID_PROTOCOL; + goto reply; + } + if (opt_allow_clear) { + result_flags |= LTTCOMM_RELAYD_CONFIGURATION_FLAG_CLEAR_ALLOWED; + } + ret = 0; +reply: + reply = (typeof(reply)){ + .generic.ret_code = htobe32((uint32_t) + (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)), + .relayd_configuration_flags = htobe64(result_flags), + }; + send_ret = conn->sock->ops->sendmsg( + conn->sock, &reply, sizeof(reply), 0); + if (send_ret < (ssize_t) sizeof(reply)) { + ERR("Failed to send \"get configuration\" command reply (ret = %zd)", + send_ret); + ret = -1; + } +end_no_reply: + return ret; +} + #define DBG_CMD(cmd_name, conn) \ DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); @@ -2875,6 +3075,10 @@ static int relay_process_control_command(struct relay_connection *conn, DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn); ret = relay_trace_chunk_exists(header, conn, payload); break; + case RELAYD_GET_CONFIGURATION: + DBG_CMD("RELAYD_GET_CONFIGURATION", conn); + ret = relay_get_configuration(header, conn, payload); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", header->cmd); @@ -3754,6 +3958,14 @@ int main(int argc, char **argv) goto exit_options; } + ret = fclose(stdin); + if (ret) { + PERROR("Failed to close stdin"); + goto exit_options; + } + + DBG("Clear command %s", opt_allow_clear ? "allowed" : "disallowed"); + /* Try to create directory if -o, --output is specified. */ if (opt_output_path) { if (*opt_output_path != '/') {