X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=fb290bacc1a4345e12850d475fa10c720115f77f;hp=93b08fcaa4a32cddfb4deef66682e8a1b8ae6135;hb=f48c25b760239f20a6e82f3839e04f82d07bdeea;hpb=f263b7fd113e51d0737554e8232b8669e142a260 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 93b08fcaa..fb290bacc 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -18,6 +18,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -233,6 +234,11 @@ int set_option(int opt, const char *arg, const char *optname) break; case 'g': tracing_group_name = strdup(arg); + if (tracing_group_name == NULL) { + ret = -errno; + PERROR("strdup"); + goto end; + } tracing_group_name_override = 1; break; case 'h': @@ -251,7 +257,10 @@ int set_option(int opt, const char *arg, const char *optname) if (arg) { lttng_opt_verbose = config_parse_value(arg); } else { - lttng_opt_verbose += 1; + /* Only 3 level of verbosity (-vvv). */ + if (lttng_opt_verbose < 3) { + lttng_opt_verbose += 1; + } } break; default: @@ -327,7 +336,7 @@ end: static int set_options(int argc, char **argv) { - int c, ret = 0, option_index = 0; + int c, ret = 0, option_index = 0, retval = 0; int orig_optopt = optopt, orig_optind = optind; char *default_address, *optstring; const char *config_path = NULL; @@ -335,7 +344,7 @@ int set_options(int argc, char **argv) optstring = utils_generate_optstring(long_options, sizeof(long_options) / sizeof(struct option)); if (!optstring) { - ret = -ENOMEM; + retval = -ENOMEM; goto exit; } @@ -344,7 +353,7 @@ int set_options(int argc, char **argv) while ((c = getopt_long(argc, argv, optstring, long_options, &option_index)) != -1) { if (c == '?') { - ret = -EINVAL; + retval = -EINVAL; goto exit; } else if (c != 'f') { continue; @@ -361,8 +370,8 @@ int set_options(int argc, char **argv) if (ret) { if (ret > 0) { ERR("Invalid configuration option at line %i", ret); - ret = -1; } + retval = -1; goto exit; } @@ -377,6 +386,7 @@ int set_options(int argc, char **argv) ret = set_option(c, optarg, long_options[option_index].name); if (ret < 0) { + retval = -1; goto exit; } } @@ -388,6 +398,7 @@ int set_options(int argc, char **argv) DEFAULT_NETWORK_CONTROL_PORT); if (ret < 0) { PERROR("asprintf default data address"); + retval = -1; goto exit; } @@ -395,6 +406,7 @@ int set_options(int argc, char **argv) free(default_address); if (ret < 0) { ERR("Invalid control URI specified"); + retval = -1; goto exit; } } @@ -404,6 +416,7 @@ int set_options(int argc, char **argv) DEFAULT_NETWORK_DATA_PORT); if (ret < 0) { PERROR("asprintf default data address"); + retval = -1; goto exit; } @@ -411,6 +424,7 @@ int set_options(int argc, char **argv) free(default_address); if (ret < 0) { ERR("Invalid data URI specified"); + retval = -1; goto exit; } } @@ -420,6 +434,7 @@ int set_options(int argc, char **argv) DEFAULT_NETWORK_VIEWER_PORT); if (ret < 0) { PERROR("asprintf default viewer control address"); + retval = -1; goto exit; } @@ -427,23 +442,32 @@ int set_options(int argc, char **argv) free(default_address); if (ret < 0) { ERR("Invalid viewer control URI specified"); + retval = -1; goto exit; } } exit: free(optstring); - return ret; + return retval; } /* * Cleanup the daemon */ static -void cleanup(void) +void relayd_cleanup(struct relay_local_data *relay_ctx) { DBG("Cleaning up"); + if (viewer_streams_ht) + lttng_ht_destroy(viewer_streams_ht); + if (relay_streams_ht) + lttng_ht_destroy(relay_streams_ht); + if (relay_ctx && relay_ctx->sessions_ht) + lttng_ht_destroy(relay_ctx->sessions_ht); + free(relay_ctx); + /* free the dynamically allocated opt_output_path */ free(opt_output_path); @@ -470,41 +494,55 @@ int notify_thread_pipe(int wpipe) ret = lttng_write(wpipe, "!", 1); if (ret < 1) { PERROR("write poll pipe"); + goto end; } - + ret = 0; +end: return ret; } -static void notify_health_quit_pipe(int *pipe) +static +int notify_health_quit_pipe(int *pipe) { ssize_t ret; ret = lttng_write(pipe[1], "4", 1); if (ret < 1) { PERROR("write relay health quit"); + goto end; } + ret = 0; +end: + return ret; } /* - * Stop all threads by closing the thread quit pipe. + * Stop all relayd and relayd-live threads. */ -static -void stop_threads(void) +int lttng_relay_stop_threads(void) { - int ret; + int retval = 0; /* Stopping all threads */ DBG("Terminating all threads"); - ret = notify_thread_pipe(thread_quit_pipe[1]); - if (ret < 0) { + if (notify_thread_pipe(thread_quit_pipe[1])) { ERR("write error on thread quit pipe"); + retval = -1; } - notify_health_quit_pipe(health_quit_pipe); + if (notify_health_quit_pipe(health_quit_pipe)) { + ERR("write error on health quit pipe"); + } /* Dispatch thread */ CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&relay_conn_queue.futex); + + if (relayd_live_stop()) { + ERR("Error stopping live threads"); + retval = -1; + } + return retval; } /* @@ -522,11 +560,15 @@ void sighandler(int sig) return; case SIGINT: DBG("SIGINT caught"); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } break; case SIGTERM: DBG("SIGTERM caught"); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } break; case SIGUSR1: CMM_STORE_SHARED(recv_child_signal, 1); @@ -826,6 +868,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -879,11 +926,12 @@ restart: new_conn->sock = newsock; /* Enqueue request for the dispatcher thread. */ - cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode); + cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail, + &new_conn->qnode); /* * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfq_enqueue. + * the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); } @@ -918,7 +966,7 @@ error_sock_control: } health_unregister(health_relayd); DBG("Relay listener thread cleanup complete"); - stop_threads(); + lttng_relay_stop_threads(); return NULL; } @@ -930,7 +978,7 @@ void *relay_thread_dispatcher(void *data) { int err = -1; ssize_t ret; - struct cds_wfq_node *node; + struct cds_wfcq_node *node; struct relay_connection *new_conn = NULL; DBG("[thread] Relay dispatcher started"); @@ -953,7 +1001,8 @@ void *relay_thread_dispatcher(void *data) health_code_update(); /* Dequeue commands */ - node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue); + node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head, + &relay_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the relay command queue"); /* Continue thread execution */ @@ -993,7 +1042,7 @@ error_testpoint: } health_unregister(health_relayd); DBG("Dispatch thread dying"); - stop_threads(); + lttng_relay_stop_threads(); return NULL; } @@ -1202,6 +1251,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; + stream->ctf_stream_id = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); @@ -1339,7 +1389,6 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); stream->close_flag = 1; session->stream_count--; - assert(session->stream_count >= 0); /* Check if we can close it or else the data will do it. */ try_close_stream(session, stream); @@ -1938,9 +1987,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); /* - * Only flag a stream inactive when it has already received data. + * Only flag a stream inactive when it has already received data + * and no indexes are in flight. */ - if (stream->total_index_received > 0) { + if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); } ret = 0; @@ -1957,9 +2007,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } index_created = 1; + stream->indexes_in_flight++; } copy_index_control_data(index, &index_info); + if (stream->ctf_stream_id == -1ULL) { + stream->ctf_stream_id = be64toh(index_info.stream_id); + } if (index_created) { /* @@ -1984,6 +2038,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } end_rcu_unlock: @@ -2147,6 +2203,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } index_created = 1; + stream->indexes_in_flight++; } if (rotate_index || stream->index_fd < 0) { @@ -2189,6 +2246,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } error: @@ -2300,7 +2359,7 @@ int relay_process_data(struct relay_connection *conn) pthread_mutex_lock(&vstream->overwrite_lock); vstream->abort_flag = 1; pthread_mutex_unlock(&vstream->overwrite_lock); - DBG("Streaming side setting abort_flag on stream %s_%lu\n", + DBG("Streaming side setting abort_flag on stream %s_%" PRIu64 "\n", stream->channel_name, new_id); } else if (vstream->tracefile_count_current == stream->tracefile_count_current) { @@ -2317,7 +2376,6 @@ int relay_process_data(struct relay_connection *conn) stream->tracefile_size, stream->tracefile_count, relayd_uid, relayd_gid, stream->fd, &(stream->tracefile_count_current), &stream->fd); - stream->total_index_received = 0; pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (ret < 0) { ERR("Rotating stream output file"); @@ -2412,6 +2470,7 @@ void *relay_thread_worker(void *data) struct lttcomm_relayd_hdr recv_hdr; struct relay_local_data *relay_ctx = (struct relay_local_data *) data; struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; + struct relay_index *index; DBG("[thread] Relay worker started"); @@ -2482,6 +2541,11 @@ restart: health_code_update(); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -2584,45 +2648,49 @@ restart: health_code_update(); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Skip the command pipe. It's handled in the first loop. */ if (pollfd == relay_conn_pipe[0]) { continue; } - if (revents) { - rcu_read_lock(); - conn = connection_find_by_sock(relay_connections_ht, pollfd); - if (!conn) { - /* Skip it. Might be removed before. */ + rcu_read_lock(); + conn = connection_find_by_sock(relay_connections_ht, pollfd); + if (!conn) { + /* Skip it. Might be removed before. */ + rcu_read_unlock(); + continue; + } + + if (revents & LPOLLIN) { + if (conn->type != RELAY_DATA) { rcu_read_unlock(); continue; } - if (revents & LPOLLIN) { - if (conn->type != RELAY_DATA) { - continue; - } - - ret = relay_process_data(conn); - /* Connection closed */ - if (ret < 0) { - cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, conn); - DBG("Data connection closed with %d", pollfd); - /* - * Every goto restart call sets the last seen fd where - * here we don't really care since we gracefully - * continue the loop after the connection is deleted. - */ - } else { - /* Keep last seen port. */ - last_seen_data_fd = pollfd; - rcu_read_unlock(); - goto restart; - } + ret = relay_process_data(conn); + /* Connection closed */ + if (ret < 0) { + cleanup_connection_pollfd(&events, pollfd); + destroy_connection(relay_connections_ht, conn); + DBG("Data connection closed with %d", pollfd); + /* + * Every goto restart call sets the last seen fd where + * here we don't really care since we gracefully + * continue the loop after the connection is deleted. + */ + } else { + /* Keep last seen port. */ + last_seen_data_fd = pollfd; + rcu_read_unlock(); + goto restart; } - rcu_read_unlock(); } + rcu_read_unlock(); } last_seen_data_fd = -1; } @@ -2643,6 +2711,14 @@ error: } rcu_read_unlock(); error_poll_create: + rcu_read_lock(); + cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, + index_n.node) { + health_code_update(); + relay_index_delete(index); + relay_index_free_safe(index); + } + rcu_read_unlock(); lttng_ht_destroy(indexes_ht); indexes_ht_error: lttng_ht_destroy(relay_connections_ht); @@ -2661,7 +2737,7 @@ error_testpoint: } health_unregister(health_relayd); rcu_unregister_thread(); - stop_threads(); + lttng_relay_stop_threads(); return NULL; } @@ -2683,31 +2759,35 @@ static int create_relay_conn_pipe(void) */ int main(int argc, char **argv) { - int ret = 0; + int ret = 0, retval = 0; void *status; - struct relay_local_data *relay_ctx; + struct relay_local_data *relay_ctx = NULL; /* Parse arguments */ progname = argv[0]; - if ((ret = set_options(argc, argv)) < 0) { - goto exit; + if (set_options(argc, argv)) { + retval = -1; + goto exit_options; } - if ((ret = set_signal_handler()) < 0) { - goto exit; + if (set_signal_handler()) { + retval = -1; + goto exit_options; } /* Try to create directory if -o, --output is specified. */ if (opt_output_path) { if (*opt_output_path != '/') { ERR("Please specify an absolute path for -o, --output PATH"); - goto exit; + retval = -1; + goto exit_options; } ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG); if (ret < 0) { ERR("Unable to create %s", opt_output_path); - goto exit; + retval = -1; + goto exit_options; } } @@ -2718,7 +2798,8 @@ int main(int argc, char **argv) ret = lttng_daemonize(&child_ppid, &recv_child_signal, !opt_background); if (ret < 0) { - goto exit; + retval = -1; + goto exit_options; } /* @@ -2731,9 +2812,19 @@ int main(int argc, char **argv) } } + + /* Initialize thread health monitoring */ + health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); + if (!health_relayd) { + PERROR("health_app_create error"); + retval = -1; + goto exit_health_app_create; + } + /* Create thread quit pipe */ - if ((ret = init_thread_quit_pipe()) < 0) { - goto error; + if (init_thread_quit_pipe()) { + retval = -1; + goto exit_init_data; } /* We need those values for the file/dir creation. */ @@ -2744,21 +2835,25 @@ int main(int argc, char **argv) if (relayd_uid == 0) { if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) { ERR("Need to be root to use ports < 1024"); - ret = -1; - goto exit; + retval = -1; + goto exit_init_data; } } /* Setup the thread apps communication pipe. */ - if ((ret = create_relay_conn_pipe()) < 0) { - goto exit; + if (create_relay_conn_pipe()) { + retval = -1; + goto exit_init_data; } /* Init relay command queue. */ - cds_wfq_init(&relay_conn_queue.queue); + cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail); /* Set up max poll set size */ - lttng_poll_set_max_size(); + if (lttng_poll_set_max_size()) { + retval = -1; + goto exit_init_data; + } /* Initialize communication library */ lttcomm_init(); @@ -2767,134 +2862,139 @@ int main(int argc, char **argv) relay_ctx = zmalloc(sizeof(struct relay_local_data)); if (!relay_ctx) { PERROR("relay_ctx"); - goto exit; + retval = -1; + goto exit_init_data; } /* tables of sessions indexed by session ID */ relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!relay_ctx->sessions_ht) { - goto exit_relay_ctx_sessions; + retval = -1; + goto exit_init_data; } /* tables of streams indexed by stream ID */ relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!relay_streams_ht) { - goto exit_relay_ctx_streams; + retval = -1; + goto exit_init_data; } /* tables of streams indexed by stream ID */ viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!viewer_streams_ht) { - goto exit_relay_ctx_viewer_streams; - } - - /* Initialize thread health monitoring */ - health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); - if (!health_relayd) { - PERROR("health_app_create error"); - goto exit_health_app_create; + retval = -1; + goto exit_init_data; } ret = utils_create_pipe(health_quit_pipe); - if (ret < 0) { - goto error_health_pipe; + if (ret) { + retval = -1; + goto exit_health_quit_pipe; } /* Create thread to manage the client socket */ ret = pthread_create(&health_thread, NULL, thread_manage_health, (void *) NULL); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create health"); - goto health_error; + retval = -1; + goto exit_health_thread; } /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create dispatcher"); - goto exit_dispatcher; + retval = -1; + goto exit_dispatcher_thread; } /* Setup the worker thread */ ret = pthread_create(&worker_thread, NULL, relay_thread_worker, (void *) relay_ctx); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create worker"); - goto exit_worker; + retval = -1; + goto exit_worker_thread; } /* Setup the listener thread */ ret = pthread_create(&listener_thread, NULL, relay_thread_listener, (void *) NULL); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create listener"); - goto exit_listener; + retval = -1; + goto exit_listener_thread; } - ret = live_start_threads(live_uri, relay_ctx); - if (ret != 0) { + ret = relayd_live_create(live_uri, relay_ctx); + if (ret) { ERR("Starting live viewer threads"); + retval = -1; goto exit_live; } + /* + * This is where we start awaiting program completion (e.g. through + * signal that asks threads to teardown). + */ + + ret = relayd_live_join(); + if (ret) { + retval = -1; + } exit_live: + ret = pthread_join(listener_thread, &status); - if (ret != 0) { - PERROR("pthread_join"); - goto error; /* join error, exit without cleanup */ + if (ret) { + errno = ret; + PERROR("pthread_join listener_thread"); + retval = -1; } -exit_listener: +exit_listener_thread: ret = pthread_join(worker_thread, &status); - if (ret != 0) { - PERROR("pthread_join"); - goto error; /* join error, exit without cleanup */ + if (ret) { + errno = ret; + PERROR("pthread_join worker_thread"); + retval = -1; } -exit_worker: +exit_worker_thread: ret = pthread_join(dispatcher_thread, &status); - if (ret != 0) { - PERROR("pthread_join"); - goto error; /* join error, exit without cleanup */ + if (ret) { + errno = ret; + PERROR("pthread_join dispatcher_thread"); + retval = -1; } +exit_dispatcher_thread: -exit_dispatcher: ret = pthread_join(health_thread, &status); - if (ret != 0) { - PERROR("pthread_join health thread"); - goto error; /* join error, exit without cleanup */ + if (ret) { + errno = ret; + PERROR("pthread_join health_thread"); + retval = -1; } +exit_health_thread: - /* - * Stop live threads only after joining other threads. - */ - live_stop_threads(); - -health_error: utils_close_pipe(health_quit_pipe); +exit_health_quit_pipe: -error_health_pipe: +exit_init_data: health_app_destroy(health_relayd); - exit_health_app_create: - lttng_ht_destroy(viewer_streams_ht); - -exit_relay_ctx_viewer_streams: - lttng_ht_destroy(relay_streams_ht); - -exit_relay_ctx_streams: - lttng_ht_destroy(relay_ctx->sessions_ht); +exit_options: + relayd_cleanup(relay_ctx); -exit_relay_ctx_sessions: - free(relay_ctx); - -exit: - cleanup(); - if (!ret) { + if (!retval) { exit(EXIT_SUCCESS); + } else { + exit(EXIT_FAILURE); } - -error: - exit(EXIT_FAILURE); }