X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=e3eb9d72ef9c59a68ddc561abe9a01bf41559d90;hb=HEAD;hp=e7598974de380e57ce76636d80b4d9474e3ed769;hpb=cd9adb8b829564212158943a0d279bb35322ab30;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index e7598974d..ed5ec9d8f 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -25,6 +25,7 @@ #include "tracefile-array.hpp" #include "utils.hpp" #include "version.hpp" +#include "viewer-session.hpp" #include "viewer-stream.hpp" #include @@ -42,10 +43,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include @@ -167,6 +170,9 @@ struct lttng_ht *viewer_streams_ht; /* Global relay sessions hash table. */ struct lttng_ht *sessions_ht; +/* Global viewer sessions hash table. */ +struct lttng_ht *viewer_sessions_ht; + /* Relayd health monitoring */ struct health_app *health_relayd; @@ -490,7 +496,7 @@ static int config_entry_handler(const struct config_entry *entry, for (i = 0; i < (sizeof(long_options) / sizeof(struct option)) - 1; i++) { /* Ignore if entry name is not fully matched. */ - if (strcmp(entry->name, long_options[i].name)) { + if (strcmp(entry->name, long_options[i].name) != 0) { continue; } @@ -772,11 +778,13 @@ static void relayd_cleanup() if (viewer_streams_ht) lttng_ht_destroy(viewer_streams_ht); + if (viewer_sessions_ht) { + lttng_ht_destroy(viewer_sessions_ht); + } if (relay_streams_ht) lttng_ht_destroy(relay_streams_ht); if (sessions_ht) lttng_ht_destroy(sessions_ht); - free(opt_output_path); free(opt_working_directory); @@ -1320,7 +1328,13 @@ static void *relay_thread_dispatcher(void *data __attribute__((unused))) * the data will be read at some point in time * or wait to the end of the world :) */ - ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn)); + ret = lttng_write( + relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT + sizeof + used + on a + pointer. + */ if (ret < 0) { PERROR("write connection pipe"); connection_put(new_conn); @@ -1504,27 +1518,79 @@ end: */ static void publish_connection_local_streams(struct relay_connection *conn) { - struct relay_stream *stream; struct relay_session *session = conn->session; + unsigned int created = 0; + bool closed = false; + + LTTNG_ASSERT(viewer_sessions_ht); /* * We publish all streams belonging to a session atomically wrt * session lock. */ - pthread_mutex_lock(&session->lock); - rcu_read_lock(); - cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node) - { + const lttng::pthread::lock_guard session_lock(session->lock); + + for (auto *stream : + lttng::urcu::rcu_list_iteration_adapter( + session->recv_list)) { stream_publish(stream); } - rcu_read_unlock(); /* * Inform the viewer that there are new streams in the session. */ - if (session->viewer_attached) { - uatomic_set(&session->new_streams, 1); + if (!session->viewer_attached) { + goto unlock; + } + + /* + * Create viewer_streams for all the newly published streams for this relay session. + * This searches through all known viewer sessions and finds those that are + * attached to this connection's relay session. This is done so that the newer + * viewer streams will hold a reference on any relay streams that already exist, + * but may be unpublished between now and the next GET_NEW_STREAMS from the + * attached live viewer. + */ + for (auto *viewer_session : + lttng::urcu::lfht_iteration_adapter( + *viewer_sessions_ht->ht)) { + for (auto *session_iter : + lttng::urcu::rcu_list_iteration_adapter( + viewer_session->session_list)) { + if (session != session_iter) { + continue; + } + const int ret = make_viewer_streams(session, + viewer_session, + LTTNG_VIEWER_SEEK_BEGINNING, + nullptr, + nullptr, + &created, + &closed); + if (ret == 0) { + DBG("Created %d new viewer streams during publication of relay streams for relay session %" PRIu64, + created, + session->id); + } else if (ret < 0) { + /* + * Warning, since the creation of the + * streams will be retried when the viewer + * next sends the GET_NEW_STREAMS again. + */ + WARN("Failed to create new viewer streams during publication of relay streams for relay session %" PRIu64 + ", ret=%d, created=%d, closed=%d", + session->id, + ret, + created, + closed); + } + } } +unlock: + uatomic_set(&session->new_streams, 1); pthread_mutex_unlock(&session->lock); } @@ -2187,10 +2253,8 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, { int ret; ssize_t send_ret; - struct lttng_ht_iter iter; struct lttcomm_relayd_begin_data_pending msg; struct lttcomm_relayd_generic_reply reply; - struct relay_stream *stream; LTTNG_ASSERT(recv_hdr); LTTNG_ASSERT(conn); @@ -2219,11 +2283,14 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, * to iterate over all streams to find the one associated with * the right session_id. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*relay_streams_ht->ht)) { if (!stream_get(stream)) { continue; } + if (stream->trace->session->id == msg.session_id) { pthread_mutex_lock(&stream->lock); stream->data_pending_check_done = false; @@ -2231,9 +2298,9 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, DBG("Set begin data pending flag to stream %" PRIu64, stream->stream_handle); } + stream_put(stream); } - rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); /* All good, send back reply. */ @@ -2266,10 +2333,8 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at { int ret; ssize_t send_ret; - struct lttng_ht_iter iter; struct lttcomm_relayd_end_data_pending msg; struct lttcomm_relayd_generic_reply reply; - struct relay_stream *stream; uint32_t is_data_inflight = 0; DBG("End data pending command"); @@ -2294,15 +2359,19 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at * Iterate over all streams to see if the begin data pending * flag is set. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*relay_streams_ht->ht)) { if (!stream_get(stream)) { continue; } + if (stream->trace->session->id != msg.session_id) { stream_put(stream); continue; } + pthread_mutex_lock(&stream->lock); if (!stream->data_pending_check_done) { uint64_t stream_seq; @@ -2317,6 +2386,7 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at } else { stream_seq = stream->prev_data_seq; } + if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { is_data_inflight = 1; @@ -2327,10 +2397,10 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at break; } } + pthread_mutex_unlock(&stream->lock); stream_put(stream); } - rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); /* All good, send back reply. */ @@ -3883,8 +3953,6 @@ static void *relay_thread_worker(void *data __attribute__((unused))) uint32_t nb_fd; struct lttng_poll_event events; struct lttng_ht *relay_connections_ht; - struct lttng_ht_iter iter; - struct relay_connection *destroy_conn = nullptr; DBG("[thread] Relay worker started"); @@ -3961,7 +4029,10 @@ restart: if (revents & LPOLLIN) { struct relay_connection *conn; - ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn)); + ret = lttng_read(relay_conn_pipe[0], + &conn, + sizeof(conn)); /* NOLINT sizeof used on a + pointer. */ if (ret < 0) { goto error; } @@ -4059,7 +4130,7 @@ restart: if (last_seen_data_fd >= 0) { for (i = 0; i < nb_fd; i++) { - int pollfd = LTTNG_POLL_GETFD(&events, i); + const int pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); @@ -4073,8 +4144,8 @@ restart: /* Process data connection. */ for (i = idx + 1; i < nb_fd; i++) { /* Fetch the poll data. */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); + const uint32_t revents = LTTNG_POLL_GETEV(&events, i); + const int pollfd = LTTNG_POLL_GETFD(&events, i); struct relay_connection *data_conn; health_code_update(); @@ -4153,8 +4224,11 @@ restart: exit: error: /* Cleanup remaining connection object. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { + for (auto *destroy_conn : + lttng::urcu::lfht_iteration_adapter( + *relay_connections_ht->ht)) { health_code_update(); session_abort(destroy_conn->session); @@ -4165,7 +4239,6 @@ error: */ relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn); } - rcu_read_unlock(); (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); error_poll_create: @@ -4380,6 +4453,13 @@ int main(int argc, char **argv) goto exit_options; } + /* tables of viewer sessions indexed by session ID */ + viewer_sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!viewer_sessions_ht) { + retval = -1; + goto exit_options; + } + /* tables of streams indexed by stream ID */ viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!viewer_streams_ht) {