X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=41c5e1d4419d740b452528410b2345156717cfbe;hb=0114db0ec2407029052eb61a0189c9b1cd64d520;hp=dbf69a0e6537f119992ca004ec34e8e25bd3b36f;hpb=8cd15f6adb2b119ef9ea231363d74818e14ecffc;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index dbf69a0e6..41c5e1d44 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -36,44 +36,45 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "backward-compatibility-group-by.h" -#include "cmd.h" -#include "connection.h" -#include "ctf-trace.h" -#include "health-relayd.h" -#include "index.h" -#include "live.h" -#include "lttng-relayd.h" -#include "session.h" -#include "sessiond-trace-chunks.h" -#include "stream.h" -#include "tcp_keep_alive.h" -#include "testpoint.h" -#include "tracefile-array.h" -#include "utils.h" -#include "version.h" -#include "viewer-stream.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "backward-compatibility-group-by.hpp" +#include "cmd.hpp" +#include "connection.hpp" +#include "ctf-trace.hpp" +#include "health-relayd.hpp" +#include "index.hpp" +#include "live.hpp" +#include "lttng-relayd.hpp" +#include "session.hpp" +#include "sessiond-trace-chunks.hpp" +#include "stream.hpp" +#include "tcp_keep_alive.hpp" +#include "testpoint.hpp" +#include "tracefile-array.hpp" +#include "utils.hpp" +#include "version.hpp" +#include "viewer-stream.hpp" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -404,7 +405,8 @@ end: * See config_entry_handler_cb comment in common/config/session-config.h for the * return value conventions. */ -static int config_entry_handler(const struct config_entry *entry, void *unused) +static int config_entry_handler(const struct config_entry *entry, + void *unused __attribute__((unused))) { int ret = 0, i; @@ -675,7 +677,8 @@ static void print_global_objects(void) print_sessions(); } -static int noop_close(void *data, int *fds) +static int noop_close(void *data __attribute__((unused)), + int *fds __attribute__((unused))) { return 0; } @@ -972,7 +975,7 @@ end: return ret; } -static int close_sock(void *data, int *in_fd) +static int close_sock(void *data, int *in_fd __attribute__((unused))) { struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -1084,7 +1087,7 @@ end: /* * This thread manages the listening for new connections on the network */ -static void *relay_thread_listener(void *data) +static void *relay_thread_listener(void *data __attribute__((unused))) { int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; @@ -1288,7 +1291,7 @@ error_sock_control: /* * This thread manages the dispatching of the requests to worker threads */ -static void *relay_thread_dispatcher(void *data) +static void *relay_thread_dispatcher(void *data __attribute__((unused))) { int err = -1; ssize_t ret; @@ -1326,7 +1329,7 @@ static void *relay_thread_dispatcher(void *data) /* Continue thread execution */ break; } - new_conn = caa_container_of(node, struct relay_connection, qnode); + new_conn = lttng::utils::container_of(node, &relay_connection::qnode); DBG("Dispatching request waiting on sock %d", new_conn->sock->fd); @@ -1375,7 +1378,8 @@ static bool session_streams_have_index(const struct relay_session *session) * * On success, send back the session id or else return a negative value. */ -static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_create_session( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1567,7 +1571,8 @@ end: /* * relay_add_stream: allocate a new stream for a session */ -static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_add_stream( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1706,7 +1711,8 @@ end_no_session: /* * relay_close_stream: close a specific stream */ -static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_close_stream( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1784,7 +1790,8 @@ end_no_session: * relay_reset_metadata: reset a metadata stream */ static -int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr, +int relay_reset_metadata( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -1883,9 +1890,10 @@ static void relay_unknown_command(struct relay_connection *conn) * relay_start: send an acknowledgment to the client to tell if we are * ready to receive data. We are ready if a session is established. */ -static int relay_start(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_start( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, - const struct lttng_buffer_view *payload) + const struct lttng_buffer_view *payload __attribute__((unused))) { int ret = 0; ssize_t send_ret; @@ -1976,7 +1984,8 @@ end: /* * relay_send_version: send relayd version number */ -static int relay_send_version(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_send_version( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2046,7 +2055,8 @@ end: /* * Check for data pending for a given stream id from the session daemon. */ -static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_data_pending( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2135,7 +2145,8 @@ end_no_session: * the control socket has been handled. So, this is why we simply return * OK here. */ -static int relay_quiescent_control(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_quiescent_control( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2275,7 +2286,8 @@ end_no_session: * * Return to the client if there is data in flight or not with a ret_code. */ -static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_end_data_pending( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2367,7 +2379,8 @@ end_no_session: * * Return 0 on success else a negative value. */ -static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_recv_index( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2455,9 +2468,10 @@ end_no_session: * * Return 0 on success else a negative value. */ -static int relay_streams_sent(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_streams_sent( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, - const struct lttng_buffer_view *payload) + const struct lttng_buffer_view *payload __attribute__((unused))) { int ret; ssize_t send_ret; @@ -2529,6 +2543,7 @@ static ssize_t relay_unpack_rotate_streams_header( sizeof(rotate_streams.stream_count)); rotate_streams = (typeof(rotate_streams)) { .stream_count = be32toh(rotate_streams.stream_count), + .new_chunk_id = LTTNG_OPTIONAL_INIT_UNSET, }; /* @@ -2619,7 +2634,7 @@ error: * session rotation feature (not the tracefile rotation feature). */ static int relay_rotate_session_streams( - const struct lttcomm_relayd_hdr *recv_hdr, + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2663,7 +2678,10 @@ static int relay_rotate_session_streams( */ next_trace_chunk = sessiond_trace_chunk_registry_get_chunk( sessiond_trace_chunk_registry, - session->sessiond_uuid, session->id, + session->sessiond_uuid, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, rotate_streams.new_chunk_id.value); if (!next_trace_chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -2754,7 +2772,8 @@ end_no_reply: /* * relay_create_trace_chunk: create a new trace chunk */ -static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_create_trace_chunk( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2885,7 +2904,9 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, published_chunk = sessiond_trace_chunk_registry_publish_chunk( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, - conn->session->id, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, chunk); if (!published_chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -2938,7 +2959,8 @@ end_no_reply: /* * relay_close_trace_chunk: close a trace chunk */ -static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_close_trace_chunk( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -2991,7 +3013,9 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, chunk = sessiond_trace_chunk_registry_get_chunk( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, - conn->session->id, + conn->session->id_sessiond.is_set ? + conn->session->id_sessiond.value : + conn->session->id, chunk_id); if (!chunk) { char uuid_str[LTTNG_UUID_STR_LEN]; @@ -3192,7 +3216,8 @@ end_no_reply: /* * relay_trace_chunk_exists: check if a trace chunk exists */ -static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_trace_chunk_exists( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -3255,7 +3280,8 @@ end_no_reply: /* * relay_get_configuration: query whether feature is available */ -static int relay_get_configuration(const struct lttcomm_relayd_hdr *recv_hdr, +static int relay_get_configuration( + const struct lttcomm_relayd_hdr *recv_hdr __attribute__((unused)), struct relay_connection *conn, const struct lttng_buffer_view *payload) { @@ -3301,86 +3327,68 @@ end_no_reply: return ret; } -#define DBG_CMD(cmd_name, conn) \ - DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd); - static int relay_process_control_command(struct relay_connection *conn, const struct lttcomm_relayd_hdr *header, const struct lttng_buffer_view *payload) { int ret = 0; + DBG3("Processing \"%s\" command for socket %i", + lttcomm_relayd_command_str((lttcomm_relayd_command) header->cmd), + conn->sock->fd); switch (header->cmd) { case RELAYD_CREATE_SESSION: - DBG_CMD("RELAYD_CREATE_SESSION", conn); ret = relay_create_session(header, conn, payload); break; case RELAYD_ADD_STREAM: - DBG_CMD("RELAYD_ADD_STREAM", conn); ret = relay_add_stream(header, conn, payload); break; case RELAYD_START_DATA: - DBG_CMD("RELAYD_START_DATA", conn); ret = relay_start(header, conn, payload); break; case RELAYD_SEND_METADATA: - DBG_CMD("RELAYD_SEND_METADATA", conn); ret = relay_recv_metadata(header, conn, payload); break; case RELAYD_VERSION: - DBG_CMD("RELAYD_VERSION", conn); ret = relay_send_version(header, conn, payload); break; case RELAYD_CLOSE_STREAM: - DBG_CMD("RELAYD_CLOSE_STREAM", conn); ret = relay_close_stream(header, conn, payload); break; case RELAYD_DATA_PENDING: - DBG_CMD("RELAYD_DATA_PENDING", conn); ret = relay_data_pending(header, conn, payload); break; case RELAYD_QUIESCENT_CONTROL: - DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn); ret = relay_quiescent_control(header, conn, payload); break; case RELAYD_BEGIN_DATA_PENDING: - DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn); ret = relay_begin_data_pending(header, conn, payload); break; case RELAYD_END_DATA_PENDING: - DBG_CMD("RELAYD_END_DATA_PENDING", conn); ret = relay_end_data_pending(header, conn, payload); break; case RELAYD_SEND_INDEX: - DBG_CMD("RELAYD_SEND_INDEX", conn); ret = relay_recv_index(header, conn, payload); break; case RELAYD_STREAMS_SENT: - DBG_CMD("RELAYD_STREAMS_SENT", conn); ret = relay_streams_sent(header, conn, payload); break; case RELAYD_RESET_METADATA: - DBG_CMD("RELAYD_RESET_METADATA", conn); ret = relay_reset_metadata(header, conn, payload); break; case RELAYD_ROTATE_STREAMS: - DBG_CMD("RELAYD_ROTATE_STREAMS", conn); ret = relay_rotate_session_streams(header, conn, payload); break; case RELAYD_CREATE_TRACE_CHUNK: - DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn); ret = relay_create_trace_chunk(header, conn, payload); break; case RELAYD_CLOSE_TRACE_CHUNK: - DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn); ret = relay_close_trace_chunk(header, conn, payload); break; case RELAYD_TRACE_CHUNK_EXISTS: - DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn); ret = relay_trace_chunk_exists(header, conn, payload); break; case RELAYD_GET_CONFIGURATION: - DBG_CMD("RELAYD_GET_CONFIGURATION", conn); ret = relay_get_configuration(header, conn, payload); break; case RELAYD_UPDATE_SYNC_INFO: @@ -3415,7 +3423,10 @@ static enum relay_connection_status relay_process_control_receive_payload( reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive command payload on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; @@ -3488,7 +3499,10 @@ static enum relay_connection_status relay_process_control_receive_header( reception_buffer->data + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive control command header on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; @@ -3528,9 +3542,9 @@ static enum relay_connection_status relay_process_control_receive_header( memcpy(&conn->protocol.ctrl.state.receive_payload.header, &header, sizeof(header)); - DBG("Done receiving control command header: fd = %i, cmd = %" PRIu32 ", cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes", - conn->sock->fd, header.cmd, header.cmd_version, - header.data_size); + DBG("Done receiving control command header: fd = %i, cmd = %s, cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes", + conn->sock->fd, lttcomm_relayd_command_str((enum lttcomm_relayd_command) header.cmd), + header.cmd_version, header.data_size); if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) { ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.", @@ -3599,7 +3613,10 @@ static enum relay_connection_status relay_process_data_receive_header( state->header_reception_buffer + state->received, state->left_to_receive, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Unable to receive data header on sock %d", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } @@ -3726,7 +3743,10 @@ static enum relay_connection_status relay_process_data_receive_payload( ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, MSG_DONTWAIT); if (ret < 0) { + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP if (errno != EAGAIN && errno != EWOULDBLOCK) { + DIAGNOSTIC_POP PERROR("Socket %d error", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } @@ -3895,7 +3915,7 @@ static void relay_thread_close_connection(struct lttng_poll_event *events, /* * This thread does the actual work */ -static void *relay_thread_worker(void *data) +static void *relay_thread_worker(void *data __attribute__((unused))) { int ret, err = -1, last_seen_data_fd = -1; uint32_t nb_fd; @@ -4223,7 +4243,7 @@ static int create_relay_conn_pipe(void) "Relayd connection pipe", relay_conn_pipe); } -static int stdio_open(void *data, int *fds) +static int stdio_open(void *data __attribute__((unused)), int *fds) { fds[0] = fileno(stdout); fds[1] = fileno(stderr); @@ -4422,7 +4442,7 @@ int main(int argc, char **argv) /* Create thread to manage the client socket */ ret = pthread_create(&health_thread, default_pthread_attr(), - thread_manage_health, (void *) NULL); + thread_manage_health_relayd, (void *) NULL); if (ret) { errno = ret; PERROR("pthread_create health");