X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=e8f3087b11bd95e966be7e4e5c9dcf9ad1eb5a1e;hp=e4ded2b8655d17c5f270bd67cfffca65697766f8;hb=1a1a34b40ab10a195633b1ed5e2e9b42fdae0a78;hpb=234cd6367843a2106a4cb10f8fb99443208516df diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index e4ded2b86..e8f3087b1 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -81,6 +81,10 @@ static int opt_daemon, opt_background; */ #define NR_LTTNG_RELAY_READY 3 static int lttng_relay_ready = NR_LTTNG_RELAY_READY; + +/* Size of receive buffer. */ +#define RECV_DATA_BUFFER_SIZE 65536 + static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */ static pid_t child_ppid; /* Internal parent PID use with daemonize. */ @@ -162,24 +166,6 @@ static struct option long_options[] = { static const char *config_ignore_options[] = { "help", "config" }; -/* - * usage function on stderr - */ -static void usage(void) -{ - fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); - fprintf(stderr, " -h, --help Display this usage.\n"); - fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); - fprintf(stderr, " -b, --background Start as a daemon, keeping console open.\n"); - fprintf(stderr, " -C, --control-port URL Control port listening.\n"); - fprintf(stderr, " -D, --data-port URL Data port listening.\n"); - fprintf(stderr, " -L, --live-port URL Live view port listening.\n"); - fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); - fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); - fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n"); - fprintf(stderr, " -f --config Load daemon configuration file\n"); -} - /* * Take an option from the getopt output and set it in the right variable to be * used later. @@ -263,7 +249,11 @@ static int set_option(int opt, const char *arg, const char *optname) } break; case 'h': - usage(); + ret = utils_show_man_page(8, "lttng-relayd"); + if (ret) { + ERR("Cannot view man page lttng-relayd(8)"); + perror("exec"); + } exit(EXIT_FAILURE); case 'o': if (lttng_is_setuid_setgid()) { @@ -1330,6 +1320,90 @@ end_no_session: return ret; } +/* + * relay_reset_metadata: reset a metadata stream + */ +static +int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_connection *conn) +{ + int ret, send_ret; + struct relay_session *session = conn->session; + struct lttcomm_relayd_reset_metadata stream_info; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + + DBG("Reset metadata received"); + + if (!session || conn->version_check_done == 0) { + ERR("Trying to reset a metadata stream before version check"); + ret = -1; + goto end_no_session; + } + + ret = conn->sock->ops->recvmsg(conn->sock, &stream_info, + sizeof(struct lttcomm_relayd_reset_metadata), 0); + if (ret < sizeof(struct lttcomm_relayd_reset_metadata)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Relay didn't receive valid reset_metadata struct " + "size : %d", ret); + } + ret = -1; + goto end_no_session; + } + DBG("Update metadata to version %" PRIu64, be64toh(stream_info.version)); + + /* Unsupported for live sessions for now. */ + if (session->live_timer != 0) { + ret = -1; + goto end; + } + + stream = stream_get_by_id(be64toh(stream_info.stream_id)); + if (!stream) { + ret = -1; + goto end; + } + pthread_mutex_lock(&stream->lock); + if (!stream->is_metadata) { + ret = -1; + goto end_unlock; + } + + ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, + 0, 0, -1, -1, stream->stream_fd->fd, NULL, + &stream->stream_fd->fd); + if (ret < 0) { + ERR("Failed to rotate metadata file %s of channel %s", + stream->path_name, stream->channel_name); + goto end_unlock; + } + +end_unlock: + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + +end: + memset(&reply, 0, sizeof(reply)); + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (send_ret < 0) { + ERR("Relay sending reset metadata reply"); + ret = send_ret; + } + +end_no_session: + return ret; +} + /* * relay_unknown_command: send -1 if received unknown command */ @@ -2060,6 +2134,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_STREAMS_SENT: ret = relay_streams_sent(recv_hdr, conn); break; + case RELAYD_RESET_METADATA: + ret = relay_reset_metadata(recv_hdr, conn); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -2171,6 +2248,9 @@ static int relay_process_data(struct relay_connection *conn) uint32_t data_size; struct relay_session *session; bool new_stream = false, close_requested = false; + size_t chunk_size = RECV_DATA_BUFFER_SIZE; + size_t recv_off = 0; + char data_buffer[chunk_size]; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2194,36 +2274,11 @@ static int relay_process_data(struct relay_connection *conn) } session = stream->trace->session; data_size = be32toh(data_hdr.data_size); - if (data_buffer_size < data_size) { - char *tmp_data_ptr; - - tmp_data_ptr = realloc(data_buffer, data_size); - if (!tmp_data_ptr) { - ERR("Allocating data buffer"); - free(data_buffer); - ret = -1; - goto end_stream_put; - } - data_buffer = tmp_data_ptr; - data_buffer_size = data_size; - } - memset(data_buffer, 0, data_size); net_seq_num = be64toh(data_hdr.net_seq_num); DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, data_size, stream_id, net_seq_num); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret <= 0) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Socket %d error %d", conn->sock->fd, ret); - } - ret = -1; - goto end_stream_put; - } pthread_mutex_lock(&stream->lock); @@ -2269,16 +2324,33 @@ static int relay_process_data(struct relay_connection *conn) } } - /* Write data to stream output fd. */ - size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size); - if (size_ret < data_size) { - ERR("Relay error writing data to file"); - ret = -1; - goto end_stream_unlock; - } + for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) { + size_t recv_size = min(data_size - recv_off, chunk_size); - DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, - size_ret, stream->stream_handle); + ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Socket %d error %d", conn->sock->fd, ret); + } + ret = -1; + goto end_stream_unlock; + } + + /* Write data to stream output fd. */ + size_ret = lttng_write(stream->stream_fd->fd, data_buffer, + recv_size); + if (size_ret < recv_size) { + ERR("Relay error writing data to file"); + ret = -1; + goto end_stream_unlock; + } + + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, + size_ret, stream->stream_handle); + } ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size)); @@ -2307,7 +2379,6 @@ end_stream_unlock: uatomic_set(&session->new_streams, 1); pthread_mutex_unlock(&session->lock); } -end_stream_put: stream_put(stream); end: return ret; @@ -2625,7 +2696,6 @@ relay_connections_ht_error: DBG("Thread exited with error"); } DBG("Worker thread cleanup complete"); - free(data_buffer); error_testpoint: if (err) { health_error(); @@ -2708,7 +2778,6 @@ int main(int argc, char **argv) } } - /* Initialize thread health monitoring */ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); if (!health_relayd) { @@ -2723,15 +2792,6 @@ int main(int argc, char **argv) goto exit_init_data; } - /* Check if daemon is UID = 0 */ - if (!getuid()) { - if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) { - ERR("Need to be root to use ports < 1024"); - retval = -1; - goto exit_init_data; - } - } - /* Setup the thread apps communication pipe. */ if (create_relay_conn_pipe()) { retval = -1; @@ -2741,12 +2801,6 @@ int main(int argc, char **argv) /* Init relay command queue. */ cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail); - /* Set up max poll set size */ - if (lttng_poll_set_max_size()) { - retval = -1; - goto exit_init_data; - } - /* Initialize communication library */ lttcomm_init(); lttcomm_inet_init(); @@ -2779,7 +2833,7 @@ int main(int argc, char **argv) } /* Create thread to manage the client socket */ - ret = pthread_create(&health_thread, NULL, + ret = pthread_create(&health_thread, default_pthread_attr(), thread_manage_health, (void *) NULL); if (ret) { errno = ret; @@ -2789,7 +2843,7 @@ int main(int argc, char **argv) } /* Setup the dispatcher thread */ - ret = pthread_create(&dispatcher_thread, NULL, + ret = pthread_create(&dispatcher_thread, default_pthread_attr(), relay_thread_dispatcher, (void *) NULL); if (ret) { errno = ret; @@ -2799,7 +2853,7 @@ int main(int argc, char **argv) } /* Setup the worker thread */ - ret = pthread_create(&worker_thread, NULL, + ret = pthread_create(&worker_thread, default_pthread_attr(), relay_thread_worker, NULL); if (ret) { errno = ret; @@ -2809,7 +2863,7 @@ int main(int argc, char **argv) } /* Setup the listener thread */ - ret = pthread_create(&listener_thread, NULL, + ret = pthread_create(&listener_thread, default_pthread_attr(), relay_thread_listener, (void *) NULL); if (ret) { errno = ret;