X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=fdfe73ccc913ee8e9674f2ac73325f0c0bc22d1c;hp=02f676df8bc0e1bf56542e2c2394a5cabb59185c;hb=2a174661a1e0ab551b41ff1cae7191688525fc1f;hpb=2f8f53af90479595d530f8f02e71dd0b9fb810ee diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 02f676df8..fdfe73ccc 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -63,6 +63,8 @@ #include "health-relayd.h" #include "testpoint.h" #include "viewer-stream.h" +#include "session.h" +#include "stream.h" /* command line options */ char *opt_output_path; @@ -109,7 +111,6 @@ static pthread_t worker_thread; static pthread_t health_thread; static uint64_t last_relay_stream_id; -static uint64_t last_relay_session_id; /* * Relay command queue. @@ -691,7 +692,6 @@ error: static int close_stream_check(struct relay_stream *stream) { - if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) { /* * We are about to close the stream so set the data pending flag to 1 @@ -705,6 +705,40 @@ int close_stream_check(struct relay_stream *stream) return 0; } +static void try_close_stream(struct relay_session *session, + struct relay_stream *stream) +{ + int ret; + struct ctf_trace *ctf_trace; + + assert(session); + assert(stream); + + if (!close_stream_check(stream)) { + /* Can't close it, not ready for that. */ + goto end; + } + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, + stream->path_name); + assert(ctf_trace); + + pthread_mutex_lock(&session->viewer_ready_lock); + ctf_trace->invalid_flag = 1; + pthread_mutex_unlock(&session->viewer_ready_lock); + + ret = stream_close(session, stream); + if (!ret) { + /* Already close thus the ctf trace is being or has been destroyed. */ + goto end; + } + + ctf_trace_try_destroy(session, ctf_trace); + +end: + return; +} + /* * This thread manages the listening for new connections on the network */ @@ -964,149 +998,51 @@ error_testpoint: return NULL; } -/* - * Get stream from stream id. - * Need to be called with RCU read-side lock held. - */ -struct relay_stream *relay_stream_find_by_id(uint64_t stream_id) -{ - struct lttng_ht_node_ulong *node; - struct lttng_ht_iter iter; - struct relay_stream *ret; - - lttng_ht_lookup(relay_streams_ht, - (void *)((unsigned long) stream_id), - &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - DBG("Relay stream %" PRIu64 " not found", stream_id); - ret = NULL; - goto end; - } - - ret = caa_container_of(node, struct relay_stream, stream_n); - -end: - return ret; -} - -static -void deferred_free_stream(struct rcu_head *head) -{ - struct relay_stream *stream = - caa_container_of(head, struct relay_stream, rcu_node); - - free(stream->path_name); - free(stream->channel_name); - free(stream); -} - -static -void deferred_free_session(struct rcu_head *head) +static void try_close_streams(struct relay_session *session) { - struct relay_session *session = - caa_container_of(head, struct relay_session, rcu_node); - free(session); -} - -/* - * Close a given stream. The stream is freed using a call RCU. - * - * RCU read side lock MUST be acquired. If NO close_stream_check() was called - * BEFORE the stream lock MUST be acquired. - */ -static void destroy_stream(struct relay_stream *stream) -{ - int delret; - struct relay_viewer_stream *vstream; + struct ctf_trace *ctf_trace; struct lttng_ht_iter iter; - assert(stream); + assert(session); - delret = close(stream->fd); - if (delret < 0) { - PERROR("close stream"); - } + pthread_mutex_lock(&session->viewer_ready_lock); + rcu_read_lock(); + cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, + node.node) { + struct relay_stream *stream; - if (stream->index_fd >= 0) { - delret = close(stream->index_fd); - if (delret < 0) { - PERROR("close stream index_fd"); + /* Close streams. */ + cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) { + stream_close(session, stream); } - } - - vstream = viewer_stream_find_by_id(stream->stream_handle); - if (vstream) { - /* - * Set the last good value into the viewer stream. This is done - * right before the stream gets deleted from the hash table. The - * lookup failure on the live thread side of a stream indicates - * that the viewer stream index received value should be used. - */ - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - vstream->total_index_received = stream->total_index_received; - vstream->tracefile_count_last = stream->tracefile_count_current; - vstream->close_write_flag = 1; - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); - } - - /* Cleanup index of that stream. */ - relay_index_destroy_by_stream_id(stream->stream_handle); - iter.iter.node = &stream->stream_n.node; - delret = lttng_ht_del(relay_streams_ht, &iter); - assert(!delret); - iter.iter.node = &stream->ctf_trace_node.node; - delret = lttng_ht_del(stream->ctf_traces_ht, &iter); - assert(!delret); - - if (stream->ctf_trace) { - ctf_trace_try_destroy(stream->ctf_trace); + ctf_trace->invalid_flag = 1; + ctf_trace_try_destroy(session, ctf_trace); } - - call_rcu(&stream->rcu_node, deferred_free_stream); - DBG("Closed tracefile %d from close stream", stream->fd); + rcu_read_unlock(); + pthread_mutex_unlock(&session->viewer_ready_lock); } /* - * relay_delete_session: Free all memory associated with a session and - * close all the FDs + * Try to destroy a session within a connection. */ static void relay_delete_session(struct relay_command *cmd, struct lttng_ht *sessions_ht) { - struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; - struct relay_stream *stream; - int ret; + assert(cmd); + assert(sessions_ht); - if (!cmd->session) { - return; - } + /* Indicate that this session can be destroyed from now on. */ + cmd->session->close_flag = 1; - DBG("Relay deleting session %" PRIu64, cmd->session->id); + try_close_streams(cmd->session); - rcu_read_lock(); - cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) { - node = lttng_ht_iter_get_node_ulong(&iter); - if (!node) { - continue; - } - stream = caa_container_of(node, struct relay_stream, stream_n); - if (stream->session == cmd->session) { - destroy_stream(stream); - cmd->session->stream_count--; - assert(cmd->session->stream_count >= 0); - } - } - - /* Make this session not visible anymore. */ - iter.iter.node = &cmd->session->session_n.node; - ret = lttng_ht_del(sessions_ht, &iter); - assert(!ret); - call_rcu(&cmd->session->rcu_node, deferred_free_session); - rcu_read_unlock(); + /* + * This will try to delete and destroy the session if no viewer is attached + * to it meaning the refcount is down to zero. + */ + session_try_destroy(sessions_ht, cmd->session); } /* @@ -1150,38 +1086,30 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, memset(&reply, 0, sizeof(reply)); - session = zmalloc(sizeof(struct relay_session)); - if (session == NULL) { - PERROR("relay session zmalloc"); + session = session_create(); + if (!session) { ret = -1; goto error; } - - session->id = ++last_relay_session_id; - session->sock = cmd->sock; session->minor = cmd->minor; session->major = cmd->major; - pthread_mutex_init(&session->viewer_ready_lock, NULL); + cmd->session_id = session->id; cmd->session = session; reply.session_id = htobe64(session->id); switch (cmd->minor) { - case 1: - case 2: - case 3: - break; - case 4: /* LTTng sessiond 2.4 */ - default: - ret = cmd_create_session_2_4(cmd, session); - break; + case 1: + case 2: + case 3: + break; + case 4: /* LTTng sessiond 2.4 */ + default: + ret = cmd_create_session_2_4(cmd, session); + break; } - lttng_ht_node_init_ulong(&session->session_n, - (unsigned long) session->id); - lttng_ht_add_unique_ulong(sessions_ht, - &session->session_n); - + lttng_ht_add_unique_u64(sessions_ht, &session->session_n); DBG("Created session %" PRIu64, session->id); error: @@ -1207,32 +1135,14 @@ error: static void set_viewer_ready_flag(struct relay_command *cmd) { - struct relay_stream_recv_handle *node, *tmp_node; + struct relay_stream *stream, *tmp_stream; pthread_mutex_lock(&cmd->session->viewer_ready_lock); - - cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) { - struct relay_stream *stream; - - rcu_read_lock(); - stream = relay_stream_find_by_id(node->id); - if (!stream) { - /* - * Stream is most probably being cleaned up by the data thread thus - * simply continue to the next one. - */ - rcu_read_unlock(); - continue; - } - + cds_list_for_each_entry_safe(stream, tmp_stream, &cmd->recv_head, + recv_list) { stream->viewer_ready = 1; - rcu_read_unlock(); - - /* Clean stream handle node. */ - cds_list_del(&node->node); - free(node); + cds_list_del(&stream->recv_list); } - pthread_mutex_unlock(&cmd->session->viewer_ready_lock); return; } @@ -1242,20 +1152,12 @@ void set_viewer_ready_flag(struct relay_command *cmd) * handle. A new node is allocated thus must be freed when the node is deleted * from the list. */ -static void queue_stream_handle(uint64_t handle, struct relay_command *cmd) +static void queue_stream(struct relay_stream *stream, struct relay_command *cmd) { - struct relay_stream_recv_handle *node; - assert(cmd); + assert(stream); - node = zmalloc(sizeof(*node)); - if (!node) { - PERROR("zmalloc queue stream handle"); - return; - } - - node->id = handle; - cds_list_add(&node->node, &cmd->recv_head); + cds_list_add(&stream->recv_list, &cmd->recv_head); } /* @@ -1265,10 +1167,11 @@ static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_command *cmd, struct lttng_ht *sessions_ht) { + int ret, send_ret; struct relay_session *session = cmd->session; struct relay_stream *stream = NULL; struct lttcomm_relayd_status_stream reply; - int ret, send_ret; + struct ctf_trace *trace; if (!session || cmd->version_check_done == 0) { ERR("Trying to add a stream before version check"); @@ -1299,10 +1202,10 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, rcu_read_lock(); stream->stream_handle = ++last_relay_stream_id; stream->prev_seq = -1ULL; - stream->session = session; + stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; - stream->ctf_trace = NULL; + lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG); @@ -1328,42 +1231,37 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); } - if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { - stream->metadata_flag = 1; - /* - * When we receive a new metadata stream, we create a new - * ctf_trace and we assign this ctf_trace to all streams with - * the same path. - * - * If later on we receive a new stream for the same ctf_trace, - * we copy the information from the first hit in the HT to the - * new stream. - */ - stream->ctf_trace = ctf_trace_create(); - if (!stream->ctf_trace) { + trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name); + if (!trace) { + trace = ctf_trace_create(stream->path_name); + if (!trace) { ret = -1; goto end; } - stream->ctf_trace->refcount++; - stream->ctf_trace->metadata_stream = stream; + ctf_trace_add(session->ctf_traces_ht, trace); + } + ctf_trace_get_ref(trace); + + if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { + stream->metadata_flag = 1; + /* Assign quick reference to the metadata stream in the trace. */ + trace->metadata_stream = stream; } - ctf_trace_assign(cmd->ctf_traces_ht, stream); - stream->ctf_traces_ht = cmd->ctf_traces_ht; /* - * Add the stream handle in the recv list of the connection. Once the end - * stream message is received, this list is emptied and streams are set - * with the viewer ready flag. + * Add the stream in the recv list of the connection. Once the end stream + * message is received, this list is emptied and streams are set with the + * viewer ready flag. */ - queue_stream_handle(stream->stream_handle, cmd); + queue_stream(stream, cmd); - lttng_ht_node_init_ulong(&stream->stream_n, - (unsigned long) stream->stream_handle); - lttng_ht_add_unique_ulong(relay_streams_ht, - &stream->stream_n); + /* + * Both in the ctf_trace object and the global stream ht since the data + * side of the relayd does not have the concept of session. + */ + lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); + cds_list_add_tail(&stream->trace_list, &trace->stream_list); - lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name); - lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node); session->stream_count++; DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, @@ -1433,7 +1331,8 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, } rcu_read_lock(); - stream = relay_stream_find_by_id(be64toh(stream_info.stream_id)); + stream = stream_find_by_id(relay_streams_ht, + be64toh(stream_info.stream_id)); if (!stream) { ret = -1; goto end_unlock; @@ -1444,9 +1343,8 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, session->stream_count--; assert(session->stream_count >= 0); - if (close_stream_check(stream)) { - destroy_stream(stream); - } + /* Check if we can close it or else the data will do it. */ + try_close_stream(session, stream); end_unlock: rcu_read_unlock(); @@ -1554,6 +1452,7 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_metadata_payload *metadata_struct; struct relay_stream *metadata_stream; uint64_t data_size, payload_size; + struct ctf_trace *ctf_trace; if (!session) { ERR("Metadata sent before version check"); @@ -1599,7 +1498,7 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer; rcu_read_lock(); - metadata_stream = relay_stream_find_by_id( + metadata_stream = stream_find_by_id(relay_streams_ht, be64toh(metadata_struct->stream_id)); if (!metadata_stream) { ret = -1; @@ -1619,7 +1518,11 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, if (ret < 0) { goto end_unlock; } - metadata_stream->ctf_trace->metadata_received += + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, + metadata_stream->path_name); + assert(ctf_trace); + ctf_trace->metadata_received += payload_size + be32toh(metadata_struct->padding_size); DBG2("Relay metadata written"); @@ -1731,7 +1634,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, last_net_seq_num = be64toh(msg.last_net_seq_num); rcu_read_lock(); - stream = relay_stream_find_by_id(stream_id); + stream = stream_find_by_id(relay_streams_ht, stream_id); if (stream == NULL) { ret = -1; goto end_unlock; @@ -1809,7 +1712,7 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - stream_n.node) { + node.node) { if (stream->stream_handle == stream_id) { stream->data_pending_check_done = 1; DBG("Relay quiescent control pending flag set to %" PRIu64, @@ -1880,8 +1783,8 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, */ rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - stream_n.node) { - if (stream->session->id == session_id) { + node.node) { + if (stream->session_id == session_id) { stream->data_pending_check_done = 0; DBG("Set begin data pending flag to stream %" PRIu64, stream->stream_handle); @@ -1951,8 +1854,8 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, /* Iterate over all streams to see if the begin data pending flag is set. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - stream_n.node) { - if (stream->session->id == session_id && + node.node) { + if (stream->session_id == session_id && !stream->data_pending_check_done) { is_data_inflight = 1; DBG("Data is still in flight for stream %" PRIu64, @@ -2017,7 +1920,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, net_seq_num = be64toh(index_info.net_seq_num); rcu_read_lock(); - stream = relay_stream_find_by_id(be64toh(index_info.relay_stream_id)); + stream = stream_find_by_id(relay_streams_ht, + be64toh(index_info.relay_stream_id)); if (!stream) { ret = -1; goto end_rcu_unlock; @@ -2296,7 +2200,7 @@ error: * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd) +int relay_process_data(struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret = 0, rotate_index = 0; ssize_t size_ret; @@ -2305,6 +2209,7 @@ int relay_process_data(struct relay_command *cmd) uint64_t stream_id; uint64_t net_seq_num; uint32_t data_size; + struct relay_session *session; ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2322,12 +2227,15 @@ int relay_process_data(struct relay_command *cmd) stream_id = be64toh(data_hdr.stream_id); rcu_read_lock(); - stream = relay_stream_find_by_id(stream_id); + stream = stream_find_by_id(relay_streams_ht, stream_id); if (!stream) { ret = -1; goto end_rcu_unlock; } + session = session_find_by_id(sessions_ht, stream->session_id); + assert(session); + data_size = be32toh(data_hdr.data_size); if (data_buffer_size < data_size) { char *tmp_data_ptr; @@ -2423,7 +2331,7 @@ int relay_process_data(struct relay_command *cmd) * Index are handled in protocol version 2.4 and above. Also, snapshot and * index are NOT supported. */ - if (stream->session->minor >= 4 && !stream->session->snapshot) { + if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { goto end_rcu_unlock; @@ -2449,10 +2357,7 @@ int relay_process_data(struct relay_command *cmd) stream->prev_seq = net_seq_num; - /* Check if we need to close the FD */ - if (close_stream_check(stream)) { - destroy_stream(stream); - } + try_close_stream(session, stream); end_rcu_unlock: rcu_read_unlock(); @@ -2492,19 +2397,6 @@ int relay_add_connection(int fd, struct lttng_poll_event *events, } CDS_INIT_LIST_HEAD(&relay_connection->recv_head); - /* - * Only used by the control side and the reference is copied inside each - * stream from that connection. Thus a destroy HT must be done after every - * stream has been destroyed. - */ - if (relay_connection->type == RELAY_CONTROL) { - relay_connection->ctf_traces_ht = lttng_ht_new(0, - LTTNG_HT_TYPE_STRING); - if (!relay_connection->ctf_traces_ht) { - goto error_read; - } - } - lttng_ht_node_init_ulong(&relay_connection->sock_n, (unsigned long) relay_connection->sock->fd); rcu_read_lock(); @@ -2542,17 +2434,16 @@ void relay_del_connection(struct lttng_ht *relay_connections_ht, assert(!ret); if (relay_connection->type == RELAY_CONTROL) { - struct relay_stream_recv_handle *node, *tmp_node; - - relay_delete_session(relay_connection, sessions_ht); - lttng_ht_destroy(relay_connection->ctf_traces_ht); + struct relay_stream *stream, *tmp_stream; /* Clean up recv list. */ - cds_list_for_each_entry_safe(node, tmp_node, - &relay_connection->recv_head, node) { - cds_list_del(&node->node); - free(node); + cds_list_for_each_entry_safe(stream, tmp_stream, + &relay_connection->recv_head, recv_list) { + cds_list_del(&stream->recv_list); } + + relay_delete_session(relay_connection, sessions_ht); + } call_rcu(&relay_connection->rcu_node, deferred_free_connection); @@ -2788,7 +2679,8 @@ restart: continue; } - ret = relay_process_data(relay_connection); + ret = relay_process_data(relay_connection, + sessions_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2963,13 +2855,13 @@ int main(int argc, char **argv) } /* tables of sessions indexed by session ID */ - relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!relay_ctx->sessions_ht) { goto exit_relay_ctx_sessions; } /* tables of streams indexed by stream ID */ - relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!relay_streams_ht) { goto exit_relay_ctx_streams; }