X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=61f206fdf3c863041523ad6d92808d65a3033f33;hp=dcaaaa8dc0ec2c284eb211ebde21ab7234745b31;hb=f056029cc220cf6f218a29d48d9720f6603302bf;hpb=03e431550191df8609f921c7b4054c57ee4644d8 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index dcaaaa8dc..61f206fdf 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -18,7 +18,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -40,7 +39,6 @@ #include #include #include -#include #include #include @@ -56,7 +54,7 @@ #include #include #include -#include +#include #include #include "cmd.h" @@ -72,6 +70,15 @@ #include "stream.h" #include "connection.h" #include "tracefile-array.h" +#include "tcp_keep_alive.h" + +static const char *help_msg = +#ifdef LTTNG_EMBED_HELP +#include +#else +NULL +#endif +; /* command line options */ char *opt_output_path; @@ -83,6 +90,10 @@ static int opt_daemon, opt_background; */ #define NR_LTTNG_RELAY_READY 3 static int lttng_relay_ready = NR_LTTNG_RELAY_READY; + +/* Size of receive buffer. */ +#define RECV_DATA_BUFFER_SIZE 65536 + static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */ static pid_t child_ppid; /* Internal parent PID use with daemonize. */ @@ -159,28 +170,11 @@ static struct option long_options[] = { { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, { "config", 1, 0, 'f' }, + { "version", 0, 0, 'V' }, { NULL, 0, 0, 0, }, }; -static const char *config_ignore_options[] = { "help", "config" }; - -/* - * usage function on stderr - */ -static void usage(void) -{ - fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); - fprintf(stderr, " -h, --help Display this usage.\n"); - fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); - fprintf(stderr, " -b, --background Start as a daemon, keeping console open.\n"); - fprintf(stderr, " -C, --control-port URL Control port listening.\n"); - fprintf(stderr, " -D, --data-port URL Data port listening.\n"); - fprintf(stderr, " -L, --live-port URL Live view port listening.\n"); - fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); - fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); - fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n"); - fprintf(stderr, " -f --config Load daemon configuration file\n"); -} +static const char *config_ignore_options[] = { "help", "config", "version" }; /* * Take an option from the getopt output and set it in the right variable to be @@ -265,8 +259,15 @@ static int set_option(int opt, const char *arg, const char *optname) } break; case 'h': - usage(); + ret = utils_show_help(8, "lttng-relayd", help_msg); + if (ret) { + ERR("Cannot show --help for `lttng-relayd`"); + perror("exec"); + } exit(EXIT_FAILURE); + case 'V': + fprintf(stdout, "%s\n", VERSION); + exit(EXIT_SUCCESS); case 'o': if (lttng_is_setuid_setgid()) { WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.", @@ -307,7 +308,7 @@ end: /* * config_entry_handler_cb used to handle options read from a config file. - * See config_entry_handler_cb comment in common/config/config.h for the + * See config_entry_handler_cb comment in common/config/session-config.h for the * return value conventions. */ static int config_entry_handler(const struct config_entry *entry, void *unused) @@ -594,9 +595,6 @@ int lttng_relay_stop_threads(void) static void sighandler(int sig) { switch (sig) { - case SIGPIPE: - DBG("SIGPIPE caught"); - return; case SIGINT: DBG("SIGINT caught"); if (lttng_relay_stop_threads()) { @@ -632,9 +630,10 @@ static int set_signal_handler(void) return ret; } - sa.sa_handler = sighandler; sa.sa_mask = sigset; sa.sa_flags = 0; + + sa.sa_handler = sighandler; if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; @@ -645,12 +644,13 @@ static int set_signal_handler(void) return ret; } - if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { + if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; } - if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) { + sa.sa_handler = SIG_IGN; + if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { PERROR("sigaction"); return ret; } @@ -900,6 +900,15 @@ restart: lttcomm_destroy_sock(newsock); goto error; } + + ret = socket_apply_keep_alive_config(newsock->fd); + if (ret < 0) { + ERR("Failed to apply TCP keep-alive configuration on socket (%i)", + newsock->fd); + lttcomm_destroy_sock(newsock); + goto error; + } + new_conn = connection_create(newsock, type); if (!new_conn) { lttcomm_destroy_sock(newsock); @@ -978,12 +987,16 @@ static void *relay_thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_conn_queue.futex); + if (CMM_LOAD_SHARED(dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -1038,7 +1051,8 @@ error_testpoint: * Set index data from the control port to a given index object. */ static int set_index_control_data(struct relay_index *index, - struct lttcomm_relayd_index *data) + struct lttcomm_relayd_index *data, + struct relay_connection *conn) { struct ctf_packet_index index_data; @@ -1054,6 +1068,12 @@ static int set_index_control_data(struct relay_index *index, index_data.timestamp_end = data->timestamp_end; index_data.events_discarded = data->events_discarded; index_data.stream_id = data->stream_id; + + if (conn->minor >= 8) { + index->index_data.stream_instance_id = data->stream_instance_id; + index->index_data.packet_seq_num = data->packet_seq_num; + } + return relay_index_set_data(index, &index_data); } @@ -1068,13 +1088,13 @@ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, int ret = 0, send_ret; struct relay_session *session; struct lttcomm_relayd_status_session reply; - char session_name[NAME_MAX]; - char hostname[HOST_NAME_MAX]; + char session_name[LTTNG_NAME_MAX]; + char hostname[LTTNG_HOST_NAME_MAX]; uint32_t live_timer = 0; bool snapshot = false; - memset(session_name, 0, NAME_MAX); - memset(hostname, 0, HOST_NAME_MAX); + memset(session_name, 0, LTTNG_NAME_MAX); + memset(hostname, 0, LTTNG_HOST_NAME_MAX); memset(&reply, 0, sizeof(reply)); @@ -1325,6 +1345,90 @@ end_no_session: return ret; } +/* + * relay_reset_metadata: reset a metadata stream + */ +static +int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret, send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_reset_metadata stream_info; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + + DBG("Reset metadata received"); + + if (!session || conn->version_check_done == 0) { + ERR("Trying to reset a metadata stream before version check"); + ret = -1; + goto end_no_session; + } + + ret = conn->sock->ops->recvmsg(conn->sock, &stream_info, + sizeof(struct lttcomm_relayd_reset_metadata), 0); + if (ret < sizeof(struct lttcomm_relayd_reset_metadata)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Relay didn't receive valid reset_metadata struct " + "size : %d", ret); + } + ret = -1; + goto end_no_session; + } + DBG("Update metadata to version %" PRIu64, be64toh(stream_info.version)); + + /* Unsupported for live sessions for now. */ + if (session->live_timer != 0) { + ret = -1; + goto end; + } + + stream = stream_get_by_id(be64toh(stream_info.stream_id)); + if (!stream) { + ret = -1; + goto end; + } + pthread_mutex_lock(&stream->lock); + if (!stream->is_metadata) { + ret = -1; + goto end_unlock; + } + + ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, + 0, 0, -1, -1, stream->stream_fd->fd, NULL, + &stream->stream_fd->fd); + if (ret < 0) { + ERR("Failed to rotate metadata file %s of channel %s", + stream->path_name, stream->channel_name); + goto end_unlock; + } + +end_unlock: + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + +end: + memset(&reply, 0, sizeof(reply)); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (send_ret < 0) { + ERR("Relay sending reset metadata reply"); + ret = send_ret; + } + +end_no_session: + return ret; +} + /* * relay_unknown_command: send -1 if received unknown command */ @@ -1864,6 +1968,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; uint64_t net_seq_num; + size_t msg_len; assert(conn); @@ -1875,9 +1980,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } + msg_len = lttcomm_relayd_index_len( + lttng_to_index_major(conn->major, conn->minor), + lttng_to_index_minor(conn->major, conn->minor)); ret = conn->sock->ops->recvmsg(conn->sock, &index_info, - sizeof(index_info), 0); - if (ret < sizeof(index_info)) { + msg_len, 0); + if (ret < msg_len) { if (ret == 0) { /* Orderly shutdown. Not necessary to print an error. */ DBG("Socket %d did an orderly shutdown", conn->sock->fd); @@ -1927,7 +2035,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, ERR("relay_index_get_by_id_or_create index NULL"); goto end_stream_put; } - if (set_index_control_data(index, &index_info)) { + if (set_index_control_data(index, &index_info, conn)) { ERR("set_index_control_data error"); relay_index_put(index); ret = -1; @@ -2055,6 +2163,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_STREAMS_SENT: ret = relay_streams_sent(recv_hdr, conn); break; + case RELAYD_RESET_METADATA: + ret = relay_reset_metadata(recv_hdr, conn); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -2084,8 +2195,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* Get data offset because we are about to update the index. */ data_offset = htobe64(stream->tracefile_size_current); - DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64, - stream->stream_handle, stream->tracefile_size_current); + DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64, + stream->stream_handle, net_seq_num, stream->tracefile_size_current); /* * Lookup for an existing index for that stream id/sequence @@ -2098,41 +2209,36 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto end; } - if (rotate_index || !stream->index_fd) { - int fd; + if (rotate_index || !stream->index_file) { + uint32_t major, minor; - /* Put ref on previous index_fd. */ - if (stream->index_fd) { - stream_fd_put(stream->index_fd); - stream->index_fd = NULL; + /* Put ref on previous index_file. */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; } - - fd = index_create_file(stream->path_name, stream->channel_name, + major = stream->trace->session->major; + minor = stream->trace->session->minor; + stream->index_file = lttng_index_file_create(stream->path_name, + stream->channel_name, -1, -1, stream->tracefile_size, - tracefile_array_get_file_index_head(stream->tfa)); - if (fd < 0) { + tracefile_array_get_file_index_head(stream->tfa), + lttng_to_index_major(major, minor), + lttng_to_index_minor(major, minor)); + if (!stream->index_file) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); - goto end; - } - stream->index_fd = stream_fd_create(fd); - if (!stream->index_fd) { - ret = -1; - if (close(fd)) { - PERROR("Error closing FD %d", fd); - } - /* Put self-ref for this index due to error. */ - relay_index_put(index); - /* Will put the local ref. */ + index = NULL; goto end; } } - if (relay_index_set_fd(index, stream->index_fd, data_offset)) { + if (relay_index_set_file(index, stream->index_file, data_offset)) { ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; goto end; } @@ -2146,6 +2252,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } else { /* Put self-ref for this index due to error. */ relay_index_put(index); + index = NULL; ret = -1; } end: @@ -2166,6 +2273,9 @@ static int relay_process_data(struct relay_connection *conn) uint32_t data_size; struct relay_session *session; bool new_stream = false, close_requested = false; + size_t chunk_size = RECV_DATA_BUFFER_SIZE; + size_t recv_off = 0; + char data_buffer[chunk_size]; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2183,39 +2293,17 @@ static int relay_process_data(struct relay_connection *conn) stream_id = be64toh(data_hdr.stream_id); stream = stream_get_by_id(stream_id); if (!stream) { + ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id); ret = -1; goto end; } session = stream->trace->session; data_size = be32toh(data_hdr.data_size); - if (data_buffer_size < data_size) { - char *tmp_data_ptr; - - tmp_data_ptr = realloc(data_buffer, data_size); - if (!tmp_data_ptr) { - ERR("Allocating data buffer"); - free(data_buffer); - ret = -1; - goto end_stream_put; - } - data_buffer = tmp_data_ptr; - data_buffer_size = data_size; - } - memset(data_buffer, 0, data_size); net_seq_num = be64toh(data_hdr.net_seq_num); DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, data_size, stream_id, net_seq_num); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret <= 0) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } - ret = -1; - goto end_stream_put; - } pthread_mutex_lock(&stream->lock); @@ -2255,24 +2343,45 @@ static int relay_process_data(struct relay_connection *conn) if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { + ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } } - /* Write data to stream output fd. */ - size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size); - if (size_ret < data_size) { - ERR("Relay error writing data to file"); - ret = -1; - goto end_stream_unlock; - } + for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) { + size_t recv_size = min(data_size - recv_off, chunk_size); + + ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Socket %d error %d", conn->sock->fd, ret); + } + ret = -1; + goto end_stream_unlock; + } + + /* Write data to stream output fd. */ + size_ret = lttng_write(stream->stream_fd->fd, data_buffer, + recv_size); + if (size_ret < recv_size) { + ERR("Relay error writing data to file"); + ret = -1; + goto end_stream_unlock; + } - DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, - size_ret, stream->stream_handle); + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, + size_ret, stream->stream_handle); + } ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { + ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", + stream->stream_handle, net_seq_num, ret); goto end_stream_unlock; } stream->tracefile_size_current += @@ -2295,7 +2404,6 @@ end_stream_unlock: uatomic_set(&session->new_streams, 1); pthread_mutex_unlock(&session->lock); } -end_stream_put: stream_put(stream); end: return ret; @@ -2594,6 +2702,11 @@ error: destroy_conn, sock_n.node) { health_code_update(); + + if (session_abort(destroy_conn->session)) { + assert(0); + } + /* * No need to grab another ref, because we own * destroy_conn. @@ -2613,7 +2726,6 @@ relay_connections_ht_error: DBG("Thread exited with error"); } DBG("Worker thread cleanup complete"); - free(data_buffer); error_testpoint: if (err) { health_error(); @@ -2696,7 +2808,6 @@ int main(int argc, char **argv) } } - /* Initialize thread health monitoring */ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); if (!health_relayd) { @@ -2711,15 +2822,6 @@ int main(int argc, char **argv) goto exit_init_data; } - /* Check if daemon is UID = 0 */ - if (!getuid()) { - if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) { - ERR("Need to be root to use ports < 1024"); - retval = -1; - goto exit_init_data; - } - } - /* Setup the thread apps communication pipe. */ if (create_relay_conn_pipe()) { retval = -1; @@ -2729,12 +2831,6 @@ int main(int argc, char **argv) /* Init relay command queue. */ cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail); - /* Set up max poll set size */ - if (lttng_poll_set_max_size()) { - retval = -1; - goto exit_init_data; - } - /* Initialize communication library */ lttcomm_init(); lttcomm_inet_init(); @@ -2767,7 +2863,7 @@ int main(int argc, char **argv) } /* Create thread to manage the client socket */ - ret = pthread_create(&health_thread, NULL, + ret = pthread_create(&health_thread, default_pthread_attr(), thread_manage_health, (void *) NULL); if (ret) { errno = ret; @@ -2777,7 +2873,7 @@ int main(int argc, char **argv) } /* Setup the dispatcher thread */ - ret = pthread_create(&dispatcher_thread, NULL, + ret = pthread_create(&dispatcher_thread, default_pthread_attr(), relay_thread_dispatcher, (void *) NULL); if (ret) { errno = ret; @@ -2787,7 +2883,7 @@ int main(int argc, char **argv) } /* Setup the worker thread */ - ret = pthread_create(&worker_thread, NULL, + ret = pthread_create(&worker_thread, default_pthread_attr(), relay_thread_worker, NULL); if (ret) { errno = ret; @@ -2797,7 +2893,7 @@ int main(int argc, char **argv) } /* Setup the listener thread */ - ret = pthread_create(&listener_thread, NULL, + ret = pthread_create(&listener_thread, default_pthread_attr(), relay_thread_listener, (void *) NULL); if (ret) { errno = ret; @@ -2863,6 +2959,12 @@ exit_init_data: health_app_destroy(health_relayd); exit_health_app_create: exit_options: + /* + * Wait for all pending call_rcu work to complete before tearing + * down data structures. call_rcu worker may be trying to + * perform lookups in those structures. + */ + rcu_barrier(); relayd_cleanup(); /* Ensure all prior call_rcu are done. */