X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.cpp;fp=src%2Fbin%2Flttng-relayd%2Fmain.cpp;h=e3eb9d72ef9c59a68ddc561abe9a01bf41559d90;hp=9793e277808975728d5f6965e12f8648de35a9ff;hb=56047f5a23df5c2c583a102b8015bbec5a7da9f1;hpb=66cefebdc240cbae0bc79594305f509b0779fa98 diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index 9793e2778..e3eb9d72e 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include @@ -1517,12 +1518,11 @@ static void publish_connection_local_streams(struct relay_connection *conn) * session lock. */ pthread_mutex_lock(&session->lock); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node) { stream_publish(stream); } - rcu_read_unlock(); /* * Inform the viewer that there are new streams in the session. @@ -2224,21 +2224,25 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, * to iterate over all streams to find the one associated with * the right session_id. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; - } - if (stream->trace->session->id == msg.session_id) { - pthread_mutex_lock(&stream->lock); - stream->data_pending_check_done = false; - pthread_mutex_unlock(&stream->lock); - DBG("Set begin data pending flag to stream %" PRIu64, - stream->stream_handle); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + if (!stream_get(stream)) { + continue; + } + + if (stream->trace->session->id == msg.session_id) { + pthread_mutex_lock(&stream->lock); + stream->data_pending_check_done = false; + pthread_mutex_unlock(&stream->lock); + DBG("Set begin data pending flag to stream %" PRIu64, + stream->stream_handle); + } + + stream_put(stream); } - stream_put(stream); } - rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); /* All good, send back reply. */ @@ -2299,43 +2303,49 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at * Iterate over all streams to see if the begin data pending * flag is set. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; - } - if (stream->trace->session->id != msg.session_id) { - stream_put(stream); - continue; - } - pthread_mutex_lock(&stream->lock); - if (!stream->data_pending_check_done) { - uint64_t stream_seq; + { + lttng::urcu::read_lock_guard read_lock; - if (session_streams_have_index(conn->session)) { - /* - * Ensure that both the index and stream data have been - * flushed up to the requested point. - */ - stream_seq = - std::min(stream->prev_data_seq, stream->prev_index_seq); - } else { - stream_seq = stream->prev_data_seq; + cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + if (!stream_get(stream)) { + continue; } - if (!stream->closed || - !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { - is_data_inflight = 1; - DBG("Data is still in flight for stream %" PRIu64, - stream->stream_handle); - pthread_mutex_unlock(&stream->lock); + + if (stream->trace->session->id != msg.session_id) { stream_put(stream); - break; + continue; + } + + pthread_mutex_lock(&stream->lock); + if (!stream->data_pending_check_done) { + uint64_t stream_seq; + + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = std::min(stream->prev_data_seq, + stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; + } + + if (!stream->closed || + !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { + is_data_inflight = 1; + DBG("Data is still in flight for stream %" PRIu64, + stream->stream_handle); + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + break; + } } + + pthread_mutex_unlock(&stream->lock); + stream_put(stream); } - pthread_mutex_unlock(&stream->lock); - stream_put(stream); } - rcu_read_unlock(); memset(&reply, 0, sizeof(reply)); /* All good, send back reply. */ @@ -4161,19 +4171,23 @@ restart: exit: error: /* Cleanup remaining connection object. */ - rcu_read_lock(); - cds_lfht_for_each_entry (relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { - health_code_update(); + { + lttng::urcu::read_lock_guard read_lock; - session_abort(destroy_conn->session); + cds_lfht_for_each_entry ( + relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { + health_code_update(); - /* - * No need to grab another ref, because we own - * destroy_conn. - */ - relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn); + session_abort(destroy_conn->session); + + /* + * No need to grab another ref, because we own + * destroy_conn. + */ + relay_thread_close_connection( + &events, destroy_conn->sock->fd, destroy_conn); + } } - rcu_read_unlock(); (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); error_poll_create: