X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=6727a547de65b2a94706a76b5f2089b5b33956a5;hp=43e6f318aef9eec57daecb9a6106f4b314c9296b;hb=eea7556c652e22165c760a37e1db48595216ee7c;hpb=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 43e6f318a..6727a547d 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -57,6 +57,7 @@ #include "utils.h" #include "lttng-relayd.h" #include "live.h" +#include "health-relayd.h" /* command line options */ char *opt_output_path; @@ -101,9 +102,6 @@ static struct relay_cmd_queue relay_cmd_queue; static char *data_buffer; static unsigned int data_buffer_size; -/* Global hash table that stores relay index object. */ -static struct lttng_ht *indexes_ht; - /* We need those values for the file/dir creation. */ static uid_t relayd_uid; static gid_t relayd_gid; @@ -111,6 +109,15 @@ static gid_t relayd_gid; /* Global relay stream hash table. */ struct lttng_ht *relay_streams_ht; +/* Global relay viewer stream hash table. */ +struct lttng_ht *viewer_streams_ht; + +/* Global hash table that stores relay index object. */ +struct lttng_ht *indexes_ht; + +/* Relayd health monitoring */ +struct health_app *health_relayd; + /* * usage function on stderr */ @@ -510,6 +517,10 @@ void *relay_thread_listener(void *data) DBG("[thread] Relay listener started"); + health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER); + + health_code_update(); + control_sock = relay_init_sock(control_uri); if (!control_sock) { goto error_sock_control; @@ -541,10 +552,14 @@ void *relay_thread_listener(void *data) } while (1) { + health_code_update(); + DBG("Listener accepting connections"); restart: + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -559,6 +574,8 @@ restart: DBG("Relay new connection received"); for (i = 0; i < nb_fd; i++) { + health_code_update(); + /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -653,8 +670,10 @@ error_sock_relay: lttcomm_destroy_sock(control_sock); error_sock_control: if (err) { - DBG("Thread exited with error"); + health_error(); + ERR("Health error occurred in %s", __func__); } + health_unregister(health_relayd); DBG("Relay listener thread cleanup complete"); stop_threads(); return NULL; @@ -666,17 +685,25 @@ error_sock_control: static void *relay_thread_dispatcher(void *data) { - int ret; + int ret, err = -1; struct cds_wfq_node *node; struct relay_command *relay_cmd = NULL; DBG("[thread] Relay dispatcher started"); + health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER); + + health_code_update(); + while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + health_code_update(); + /* Atomically prepare the queue futex */ futex_nto1_prepare(&relay_cmd_queue.futex); do { + health_code_update(); + /* Dequeue commands */ node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue); if (node == NULL) { @@ -705,10 +732,20 @@ void *relay_thread_dispatcher(void *data) } while (node != NULL); /* Futex wait on queue. Blocking call on futex() */ + health_poll_entry(); futex_nto1_wait(&relay_cmd_queue.futex); + health_poll_exit(); } + /* Normal exit, no error */ + err = 0; + error: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_relayd); DBG("Dispatch thread dying"); stop_threads(); return NULL; @@ -761,6 +798,57 @@ void deferred_free_session(struct rcu_head *head) free(session); } +/* + * Close a given stream. The stream is freed using a call RCU. + * + * RCU read side lock MUST be acquired. If NO close_stream_check() was called + * BEFORE the stream lock MUST be acquired. + */ +static void destroy_stream(struct relay_stream *stream, + struct lttng_ht *ctf_traces_ht) +{ + int delret; + struct relay_viewer_stream *vstream; + struct lttng_ht_iter iter; + + assert(stream); + + delret = close(stream->fd); + if (delret < 0) { + PERROR("close stream"); + } + + if (stream->index_fd >= 0) { + delret = close(stream->index_fd); + if (delret < 0) { + PERROR("close stream index_fd"); + } + } + + vstream = live_find_viewer_stream_by_id(stream->stream_handle); + if (vstream) { + /* + * Set the last good value into the viewer stream. This is done + * right before the stream gets deleted from the hash table. The + * lookup failure on the live thread side of a stream indicates + * that the viewer stream index received value should be used. + */ + vstream->total_index_received = stream->total_index_received; + } + + /* Cleanup index of that stream. */ + relay_index_destroy_by_stream_id(stream->stream_handle); + + iter.iter.node = &stream->stream_n.node; + delret = lttng_ht_del(relay_streams_ht, &iter); + assert(!delret); + iter.iter.node = &stream->ctf_trace_node.node; + delret = lttng_ht_del(ctf_traces_ht, &iter); + assert(!delret); + call_rcu(&stream->rcu_node, deferred_free_stream); + DBG("Closed tracefile %d from close stream", stream->fd); +} + /* * relay_delete_session: Free all memory associated with a session and * close all the FDs @@ -783,29 +871,20 @@ void relay_delete_session(struct relay_command *cmd, rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) { node = lttng_ht_iter_get_node_ulong(&iter); - if (node) { - stream = caa_container_of(node, - struct relay_stream, stream_n); - if (stream->session == cmd->session) { - ret = close(stream->fd); - if (ret < 0) { - PERROR("close stream fd on delete session"); - } - ret = lttng_ht_del(relay_streams_ht, &iter); - assert(!ret); - call_rcu(&stream->rcu_node, - deferred_free_stream); - } - /* Cleanup index of that stream. */ - relay_index_destroy_by_stream_id(stream->stream_handle, - indexes_ht); + if (!node) { + continue; + } + stream = caa_container_of(node, struct relay_stream, stream_n); + if (stream->session == cmd->session) { + destroy_stream(stream, cmd->ctf_traces_ht); } } + + /* Make this session not visible anymore. */ iter.iter.node = &cmd->session->session_n.node; ret = lttng_ht_del(sessions_ht, &iter); assert(!ret); - call_rcu(&cmd->session->rcu_node, - deferred_free_session); + call_rcu(&cmd->session->rcu_node, deferred_free_session); rcu_read_unlock(); } @@ -859,6 +938,8 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, session->id = ++last_relay_session_id; session->sock = cmd->sock; + session->minor = cmd->minor; + session->major = cmd->major; cmd->session = session; reply.session_id = htobe64(session->id); @@ -1029,14 +1110,13 @@ err_free_stream: */ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *viewer_streams_ht) + struct relay_command *cmd) { + int ret, send_ret; struct relay_session *session = cmd->session; struct lttcomm_relayd_close_stream stream_info; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; - int ret, send_ret; - struct lttng_ht_iter iter; DBG("Close stream received"); @@ -1070,42 +1150,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->close_flag = 1; if (close_stream_check(stream)) { - int delret; - struct relay_viewer_stream *vstream; - - delret = close(stream->fd); - if (delret < 0) { - PERROR("close stream"); - } - - if (stream->index_fd >= 0) { - delret = close(stream->index_fd); - if (delret < 0) { - PERROR("close stream index_fd"); - } - } - - vstream = live_find_viewer_stream_by_id(stream->stream_handle, - viewer_streams_ht); - if (vstream) { - /* - * Set the last good value into the viewer stream. This is done - * right before the stream gets deleted from the hash table. The - * lookup failure on the live thread side of a stream indicates - * that the viewer stream index received value should be used. - */ - vstream->total_index_received = stream->total_index_received; - } - - iter.iter.node = &stream->stream_n.node; - delret = lttng_ht_del(relay_streams_ht, &iter); - assert(!delret); - iter.iter.node = &stream->ctf_trace_node.node; - delret = lttng_ht_del(cmd->ctf_traces_ht, &iter); - assert(!delret); - call_rcu(&stream->rcu_node, - deferred_free_stream); - DBG("Closed tracefile %d from close stream", stream->fd); + destroy_stream(stream, cmd->ctf_traces_ht); } end_unlock: @@ -1644,7 +1689,7 @@ end_no_session: */ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *indexes_ht) + struct relay_command *cmd) { int ret, send_ret, index_created = 0; struct relay_session *session = cmd->session; @@ -1655,7 +1700,6 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, uint64_t net_seq_num; assert(cmd); - assert(indexes_ht); DBG("Relay receiving index"); @@ -1703,7 +1747,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, stream->beacon_ts_end = -1ULL; } - index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht); + index = relay_index_find(stream->stream_handle, net_seq_num); if (!index) { /* A successful creation will add the object to the HT. */ index = relay_index_create(stream->stream_handle, net_seq_num); @@ -1721,7 +1765,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, * already exist, destroy back the index created, set the data in this * object and write it on disk. */ - relay_index_add(index, indexes_ht, &wr_index); + relay_index_add(index, &wr_index); if (wr_index) { copy_index_control_data(wr_index, &index_info); free(index); @@ -1744,7 +1788,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, stream->index_fd = ret; } - ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + ret = relay_index_write(wr_index->fd, wr_index); if (ret < 0) { goto end_rcu_unlock; } @@ -1795,7 +1839,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_send_version(recv_hdr, cmd, ctx->sessions_ht); break; case RELAYD_CLOSE_STREAM: - ret = relay_close_stream(recv_hdr, cmd, ctx->viewer_streams_ht); + ret = relay_close_stream(recv_hdr, cmd); break; case RELAYD_DATA_PENDING: ret = relay_data_pending(recv_hdr, cmd); @@ -1810,7 +1854,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_end_data_pending(recv_hdr, cmd); break; case RELAYD_SEND_INDEX: - ret = relay_recv_index(recv_hdr, cmd, indexes_ht); + ret = relay_recv_index(recv_hdr, cmd); break; case RELAYD_UPDATE_SYNC_INFO: default: @@ -1824,18 +1868,98 @@ end: return ret; } +/* + * Handle index for a data stream. + * + * RCU read side lock MUST be acquired. + * + * Return 0 on success else a negative value. + */ +static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, + int rotate_index) +{ + int ret = 0, index_created = 0; + uint64_t stream_id, data_offset; + struct relay_index *index, *wr_index = NULL; + + assert(stream); + + stream_id = stream->stream_handle; + /* Get data offset because we are about to update the index. */ + data_offset = htobe64(stream->tracefile_size_current); + + /* + * Lookup for an existing index for that stream id/sequence number. If on + * exists, the control thread already received the data for it thus we need + * to write it on disk. + */ + index = relay_index_find(stream_id, net_seq_num); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream_id, net_seq_num); + if (!index) { + ret = -1; + goto error; + } + index_created = 1; + } + + if (rotate_index || stream->index_fd < 0) { + index->to_close_fd = stream->index_fd; + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + /* This will close the stream's index fd if one. */ + relay_index_free_safe(index); + goto error; + } + stream->index_fd = ret; + } + index->fd = stream->index_fd; + index->index_data.offset = data_offset; + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created and set the data. + */ + relay_index_add(index, &wr_index); + if (wr_index) { + /* Copy back data from the created index. */ + wr_index->fd = index->fd; + wr_index->to_close_fd = index->to_close_fd; + wr_index->index_data.offset = data_offset; + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; + } + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + ret = relay_index_write(wr_index->fd, wr_index); + if (ret < 0) { + goto error; + } + stream->total_index_received++; + } + +error: + return ret; +} + /* * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd, - struct lttng_ht *indexes_ht) +int relay_process_data(struct relay_command *cmd) { - int ret = 0, rotate_index = 0, index_created = 0; + int ret = 0, rotate_index = 0; struct relay_stream *stream; - struct relay_index *index, *wr_index = NULL; struct lttcomm_relayd_data_hdr data_hdr; - uint64_t stream_id, data_offset; + uint64_t stream_id; uint64_t net_seq_num; uint32_t data_size; @@ -1908,77 +2032,18 @@ int relay_process_data(struct relay_command *cmd, rotate_index = 1; } - /* Get data offset because we are about to update the index. */ - data_offset = htobe64(stream->tracefile_size_current); - /* - * Lookup for an existing index for that stream id/sequence number. If on - * exists, the control thread already received the data for it thus we need - * to write it on disk. + * Index are handled in protocol version 2.4 and above. Also, snapshot and + * index are NOT supported. */ - index = relay_index_find(stream_id, net_seq_num, indexes_ht); - if (!index) { - /* A successful creation will add the object to the HT. */ - index = relay_index_create(stream->stream_handle, net_seq_num); - if (!index) { - goto end_rcu_unlock; - } - index_created = 1; - } - - if (rotate_index || stream->index_fd < 0) { - index->to_close_fd = stream->index_fd; - ret = index_create_file(stream->path_name, stream->channel_name, - relayd_uid, relayd_gid, stream->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { - /* This will close the stream's index fd if one. */ - relay_index_free_safe(index); - goto end_rcu_unlock; - } - stream->index_fd = ret; - } - index->fd = stream->index_fd; - index->index_data.offset = data_offset; - - if (index_created) { - /* - * Try to add the relay index object to the hash table. If an object - * already exist, destroy back the index created and set the data. - */ - relay_index_add(index, indexes_ht, &wr_index); - if (wr_index) { - /* Copy back data from the created index. */ - wr_index->fd = index->fd; - wr_index->to_close_fd = index->to_close_fd; - wr_index->index_data.offset = data_offset; - free(index); - } - } else { - /* The index already exists so write it on disk. */ - wr_index = index; - } - - /* Do we have a writable ready index to write on disk. */ - if (wr_index) { - /* Starting at 2.4, create the index file if none available. */ - if (cmd->minor >= 4 && stream->index_fd < 0) { - ret = index_create_file(stream->path_name, stream->channel_name, - relayd_uid, relayd_gid, stream->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { - goto end_rcu_unlock; - } - stream->index_fd = ret; - } - - ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + if (stream->session->minor >= 4 && !stream->session->snapshot) { + ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { goto end_rcu_unlock; } - stream->total_index_received++; } + /* Write data to stream output fd. */ do { ret = write(stream->fd, data_buffer, data_size); } while (ret < 0 && errno == EINTR); @@ -2001,24 +2066,7 @@ int relay_process_data(struct relay_command *cmd, /* Check if we need to close the FD */ if (close_stream_check(stream)) { - int cret; - struct lttng_ht_iter iter; - - cret = close(stream->fd); - if (cret < 0) { - PERROR("close stream process data"); - } - - cret = close(stream->index_fd); - if (cret < 0) { - PERROR("close stream index_fd"); - } - iter.iter.node = &stream->stream_n.node; - ret = lttng_ht_del(relay_streams_ht, &iter); - assert(!ret); - call_rcu(&stream->rcu_node, - deferred_free_stream); - DBG("Closed tracefile %d after recv data", stream->fd); + destroy_stream(stream, cmd->ctf_traces_ht); } end_rcu_unlock: @@ -2130,6 +2178,10 @@ void *relay_thread_worker(void *data) rcu_register_thread(); + health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER); + + health_code_update(); + /* table of connections indexed on socket */ relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (!relay_connections_ht) { @@ -2156,9 +2208,13 @@ restart: while (1) { int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1; + health_code_update(); + /* Infinite blocking call, waiting for transmission */ DBG3("Relayd worker thread polling..."); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -2181,6 +2237,8 @@ restart: uint32_t revents = LTTNG_POLL_GETEV(&events, i); int pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -2283,6 +2341,9 @@ restart: if (last_seen_data_fd >= 0) { for (i = 0; i < nb_fd; i++) { int pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); + if (last_seen_data_fd == pollfd) { idx = i; break; @@ -2296,6 +2357,8 @@ restart: uint32_t revents = LTTNG_POLL_GETEV(&events, i); int pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(); + /* Skip the command pipe. It's handled in the first loop. */ if (pollfd == relay_cmd_pipe[0]) { continue; @@ -2320,7 +2383,7 @@ restart: continue; } - ret = relay_process_data(relay_connection, indexes_ht); + ret = relay_process_data(relay_connection); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2345,6 +2408,9 @@ restart: last_seen_data_fd = -1; } + /* Normal exit, no error */ + ret = 0; + exit: error: lttng_poll_clean(&events); @@ -2352,6 +2418,8 @@ error: /* empty the hash table and free the memory */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) { + health_code_update(); + node = lttng_ht_iter_get_node_ulong(&iter); if (node) { relay_connection = caa_container_of(node, @@ -2373,8 +2441,13 @@ relay_connections_ht_error: } DBG("Worker thread cleanup complete"); free(data_buffer); - stop_threads(); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_relayd); rcu_unregister_thread(); + stop_threads(); return NULL; } @@ -2484,11 +2557,18 @@ int main(int argc, char **argv) } /* tables of streams indexed by stream ID */ - relay_ctx->viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!relay_ctx->viewer_streams_ht) { + viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!viewer_streams_ht) { goto exit_relay_ctx_viewer_streams; } + /* Initialize thread health monitoring */ + health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES); + if (!health_relayd) { + PERROR("health_app_create error"); + goto exit_health_app_create; + } + /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL); @@ -2513,32 +2593,40 @@ int main(int argc, char **argv) goto exit_listener; } - ret = live_start_threads(live_uri, relay_ctx); + ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe); if (ret != 0) { ERR("Starting live viewer threads"); + goto exit_live; } -exit_listener: + live_stop_threads(); + +exit_live: ret = pthread_join(listener_thread, &status); if (ret != 0) { PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } -exit_worker: +exit_listener: ret = pthread_join(worker_thread, &status); if (ret != 0) { PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } -exit_dispatcher: +exit_worker: ret = pthread_join(dispatcher_thread, &status); if (ret != 0) { PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } - lttng_ht_destroy(relay_ctx->viewer_streams_ht); + +exit_dispatcher: + health_app_destroy(health_relayd); + +exit_health_app_create: + lttng_ht_destroy(viewer_streams_ht); exit_relay_ctx_viewer_streams: lttng_ht_destroy(relay_streams_ht); @@ -2550,7 +2638,6 @@ exit_relay_ctx_sessions: free(relay_ctx); exit: - live_stop_threads(); cleanup(); if (!ret) { exit(EXIT_SUCCESS);