X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=f943488be5b94309efd332196bd52e876d2eea1a;hb=43c5442ea57aca20701fd49fba1e3456d48cc7b2;hp=e7c57f32e2457595b31d20709837c5b7727741be;hpb=f385ae0a8f5c34cc33ca57cdb412393f90f9c0a8;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index e7c57f32e..f943488be 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -68,6 +68,8 @@ static struct lttng_uri *live_uri; const char *progname; +const char *tracing_group_name = DEFAULT_TRACING_GROUP; + /* * Quit pipe for all threads. This permits a single cancellation point * for all threads when receiving an event on the pipe. @@ -86,6 +88,7 @@ static int dispatch_thread_exit; static pthread_t listener_thread; static pthread_t dispatcher_thread; static pthread_t worker_thread; +static pthread_t health_thread; static uint64_t last_relay_stream_id; static uint64_t last_relay_session_id; @@ -116,7 +119,7 @@ struct lttng_ht *viewer_streams_ht; struct lttng_ht *indexes_ht; /* Relayd health monitoring */ -static struct health_app *health_relayd; +struct health_app *health_relayd; /* * usage function on stderr @@ -131,6 +134,7 @@ void usage(void) fprintf(stderr, " -D, --data-port URL Data port listening.\n"); fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); + fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n"); } static @@ -144,6 +148,7 @@ int parse_args(int argc, char **argv) { "control-port", 1, 0, 'C', }, { "data-port", 1, 0, 'D', }, { "daemonize", 0, 0, 'd', }, + { "group", 1, 0, 'g', }, { "help", 0, 0, 'h', }, { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, @@ -152,7 +157,7 @@ int parse_args(int argc, char **argv) while (1) { int option_index = 0; - c = getopt_long(argc, argv, "dhv" "C:D:o:", + c = getopt_long(argc, argv, "dhv" "C:D:o:g:", long_options, &option_index); if (c == -1) { break; @@ -188,6 +193,9 @@ int parse_args(int argc, char **argv) case 'd': opt_daemon = 1; break; + case 'g': + tracing_group_name = optarg; + break; case 'h': usage(); exit(EXIT_FAILURE); @@ -297,6 +305,18 @@ int notify_thread_pipe(int wpipe) return ret; } +static void notify_health_quit_pipe(int *pipe) +{ + int ret; + + do { + ret = write(pipe[1], "4", 1); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != 1) { + PERROR("write relay health quit"); + } +} + /* * Stop all threads by closing the thread quit pipe. */ @@ -312,6 +332,8 @@ void stop_threads(void) ERR("write error on thread quit pipe"); } + notify_health_quit_pipe(health_quit_pipe); + /* Dispatch thread */ CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&relay_cmd_queue.futex); @@ -804,8 +826,7 @@ void deferred_free_session(struct rcu_head *head) * RCU read side lock MUST be acquired. If NO close_stream_check() was called * BEFORE the stream lock MUST be acquired. */ -static void destroy_stream(struct relay_stream *stream, - struct lttng_ht *ctf_traces_ht) +static void destroy_stream(struct relay_stream *stream) { int delret; struct relay_viewer_stream *vstream; @@ -843,7 +864,7 @@ static void destroy_stream(struct relay_stream *stream, delret = lttng_ht_del(relay_streams_ht, &iter); assert(!delret); iter.iter.node = &stream->ctf_trace_node.node; - delret = lttng_ht_del(ctf_traces_ht, &iter); + delret = lttng_ht_del(stream->ctf_traces_ht, &iter); assert(!delret); call_rcu(&stream->rcu_node, deferred_free_stream); DBG("Closed tracefile %d from close stream", stream->fd); @@ -876,7 +897,7 @@ void relay_delete_session(struct relay_command *cmd, } stream = caa_container_of(node, struct relay_stream, stream_n); if (stream->session == cmd->session) { - destroy_stream(stream, cmd->ctf_traces_ht); + destroy_stream(stream); } } @@ -1064,6 +1085,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->ctf_trace->metadata_stream = stream; } ctf_trace_assign(cmd->ctf_traces_ht, stream); + stream->ctf_traces_ht = cmd->ctf_traces_ht; lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); @@ -1150,7 +1172,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->close_flag = 1; if (close_stream_check(stream)) { - destroy_stream(stream, cmd->ctf_traces_ht); + destroy_stream(stream); } end_unlock: @@ -2066,7 +2088,7 @@ int relay_process_data(struct relay_command *cmd) /* Check if we need to close the FD */ if (close_stream_check(stream)) { - destroy_stream(stream, cmd->ctf_traces_ht); + destroy_stream(stream); } end_rcu_unlock: @@ -2108,9 +2130,17 @@ int relay_add_connection(int fd, struct lttng_poll_event *events, goto error_read; } - relay_connection->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING); - if (!relay_connection->ctf_traces_ht) { - goto error_read; + /* + * Only used by the control side and the reference is copied inside each + * stream from that connection. Thus a destroy HT must be done after every + * stream has been destroyed. + */ + if (relay_connection->type == RELAY_CONTROL) { + relay_connection->ctf_traces_ht = lttng_ht_new(0, + LTTNG_HT_TYPE_STRING); + if (!relay_connection->ctf_traces_ht) { + goto error_read; + } } lttng_ht_node_init_ulong(&relay_connection->sock_n, @@ -2135,7 +2165,6 @@ void deferred_free_connection(struct rcu_head *head) struct relay_command *relay_connection = caa_container_of(head, struct relay_command, rcu_node); - lttng_ht_destroy(relay_connection->ctf_traces_ht); lttcomm_destroy_sock(relay_connection->sock); free(relay_connection); } @@ -2149,12 +2178,13 @@ void relay_del_connection(struct lttng_ht *relay_connections_ht, ret = lttng_ht_del(relay_connections_ht, iter); assert(!ret); + if (relay_connection->type == RELAY_CONTROL) { relay_delete_session(relay_connection, sessions_ht); + lttng_ht_destroy(relay_connection->ctf_traces_ht); } - call_rcu(&relay_connection->rcu_node, - deferred_free_connection); + call_rcu(&relay_connection->rcu_node, deferred_free_connection); } /* @@ -2569,6 +2599,19 @@ int main(int argc, char **argv) goto exit_health_app_create; } + ret = utils_create_pipe(health_quit_pipe); + if (ret < 0) { + goto error_health_pipe; + } + + /* Create thread to manage the client socket */ + ret = pthread_create(&health_thread, NULL, + thread_manage_health, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create health"); + goto health_error; + } + /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL); @@ -2599,8 +2642,6 @@ int main(int argc, char **argv) goto exit_live; } - live_stop_threads(); - exit_live: ret = pthread_join(listener_thread, &status); if (ret != 0) { @@ -2623,6 +2664,21 @@ exit_worker: } exit_dispatcher: + ret = pthread_join(health_thread, &status); + if (ret != 0) { + PERROR("pthread_join health thread"); + goto error; /* join error, exit without cleanup */ + } + + /* + * Stop live threads only after joining other threads. + */ + live_stop_threads(); + +health_error: + utils_close_pipe(health_quit_pipe); + +error_health_pipe: health_app_destroy(health_relayd); exit_health_app_create: