X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=699778d0e4ef2c144e4bcaba390387750b2f7606;hb=bb9f214082555e181b5a0dc7a7dfdc03a5b827cd;hp=786feaae83651acf86f3115b0cb0f3530552cf2a;hpb=5c7248cd5bce45bf64d563fb4e130a63bf345f11;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index 786feaae8..699778d0e 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -237,28 +238,33 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) static int check_new_streams(struct relay_connection *conn) { struct relay_session *session; - unsigned long current_val; int ret = 0; if (!conn->viewer_session) { goto end; } - rcu_read_lock(); - cds_list_for_each_entry_rcu( - session, &conn->viewer_session->session_list, viewer_session_node) + { - if (!session_get(session)) { - continue; - } - current_val = uatomic_cmpxchg(&session->new_streams, 1, 0); - ret = current_val; - session_put(session); - if (ret == 1) { - goto end; + lttng::urcu::read_lock_guard read_lock; + cds_list_for_each_entry_rcu( + session, &conn->viewer_session->session_list, viewer_session_node) + { + if (!session_get(session)) { + continue; + } + + ret = uatomic_read(&session->new_streams); + session_put(session); + if (ret == 1) { + goto end; + } } } + end: - rcu_read_unlock(); + DBG("Viewer connection has%s new streams: socket_fd = %d", + ret == 0 ? " no" : "", + conn->sock->fd); return ret; } @@ -275,63 +281,65 @@ send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { - struct ctf_trace *ctf_trace; - struct lttng_viewer_stream send_stream = {}; + cds_lfht_for_each_entry ( + viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { + struct ctf_trace *ctf_trace; + struct lttng_viewer_stream send_stream = {}; - health_code_update(); + health_code_update(); - if (!viewer_stream_get(vstream)) { - continue; - } + if (!viewer_stream_get(vstream)) { + continue; + } - pthread_mutex_lock(&vstream->stream->lock); - /* Ignore if not the same session. */ - if (vstream->stream->trace->session->id != session_id || - (!ignore_sent_flag && vstream->sent_flag)) { - pthread_mutex_unlock(&vstream->stream->lock); - viewer_stream_put(vstream); - continue; - } + pthread_mutex_lock(&vstream->stream->lock); + /* Ignore if not the same session. */ + if (vstream->stream->trace->session->id != session_id || + (!ignore_sent_flag && vstream->sent_flag)) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + continue; + } - ctf_trace = vstream->stream->trace; - send_stream.id = htobe64(vstream->stream->stream_handle); - send_stream.ctf_trace_id = htobe64(ctf_trace->id); - send_stream.metadata_flag = htobe32(vstream->stream->is_metadata); - if (lttng_strncpy(send_stream.path_name, - vstream->path_name, - sizeof(send_stream.path_name))) { - pthread_mutex_unlock(&vstream->stream->lock); - viewer_stream_put(vstream); - ret = -1; /* Error. */ - goto end_unlock; - } - if (lttng_strncpy(send_stream.channel_name, - vstream->channel_name, - sizeof(send_stream.channel_name))) { - pthread_mutex_unlock(&vstream->stream->lock); - viewer_stream_put(vstream); - ret = -1; /* Error. */ - goto end_unlock; - } + ctf_trace = vstream->stream->trace; + send_stream.id = htobe64(vstream->stream->stream_handle); + send_stream.ctf_trace_id = htobe64(ctf_trace->id); + send_stream.metadata_flag = htobe32(vstream->stream->is_metadata); + if (lttng_strncpy(send_stream.path_name, + vstream->path_name, + sizeof(send_stream.path_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end; + } + if (lttng_strncpy(send_stream.channel_name, + vstream->channel_name, + sizeof(send_stream.channel_name))) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); + ret = -1; /* Error. */ + goto end; + } - DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle); - vstream->sent_flag = true; - pthread_mutex_unlock(&vstream->stream->lock); + DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle); + vstream->sent_flag = true; + pthread_mutex_unlock(&vstream->stream->lock); - ret = send_response(sock, &send_stream, sizeof(send_stream)); - viewer_stream_put(vstream); - if (ret < 0) { - goto end_unlock; + ret = send_response(sock, &send_stream, sizeof(send_stream)); + viewer_stream_put(vstream); + if (ret < 0) { + goto end; + } } } ret = 0; -end_unlock: - rcu_read_unlock(); +end: return ret; } @@ -370,191 +378,201 @@ static int make_viewer_streams(struct relay_session *relay_session, * Create viewer streams for relay streams that are ready to be * used for a the given session id only. */ - rcu_read_lock(); - cds_lfht_for_each_entry ( - relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) { - bool trace_has_metadata_stream = false; - - health_code_update(); - - if (!ctf_trace_get(ctf_trace)) { - continue; - } + { + lttng::urcu::read_lock_guard read_lock; - /* - * Iterate over all the streams of the trace to see if we have a - * metadata stream. - */ - cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node) - { - bool is_metadata_stream; + cds_lfht_for_each_entry ( + relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) { + bool trace_has_metadata_stream = false; - pthread_mutex_lock(&relay_stream->lock); - is_metadata_stream = relay_stream->is_metadata; - pthread_mutex_unlock(&relay_stream->lock); + health_code_update(); - if (is_metadata_stream) { - trace_has_metadata_stream = true; - break; + if (!ctf_trace_get(ctf_trace)) { + continue; } - } - - relay_stream = nullptr; - - /* - * If there is no metadata stream in this trace at the moment - * and we never sent one to the viewer, skip the trace. We - * accept that the viewer will not see this trace at all. - */ - if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) { - ctf_trace_put(ctf_trace); - continue; - } - - cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node) - { - struct relay_viewer_stream *viewer_stream; - if (!stream_get(relay_stream)) { - continue; + /* + * Iterate over all the streams of the trace to see if we have a + * metadata stream. + */ + cds_list_for_each_entry_rcu( + relay_stream, &ctf_trace->stream_list, stream_node) + { + bool is_metadata_stream; + + pthread_mutex_lock(&relay_stream->lock); + is_metadata_stream = relay_stream->is_metadata; + pthread_mutex_unlock(&relay_stream->lock); + + if (is_metadata_stream) { + trace_has_metadata_stream = true; + break; + } } - pthread_mutex_lock(&relay_stream->lock); + relay_stream = nullptr; + /* - * stream published is protected by the session lock. + * If there is no metadata stream in this trace at the moment + * and we never sent one to the viewer, skip the trace. We + * accept that the viewer will not see this trace at all. */ - if (!relay_stream->published) { - goto next; + if (!trace_has_metadata_stream && + !ctf_trace->metadata_stream_sent_to_viewer) { + ctf_trace_put(ctf_trace); + continue; } - viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle); - if (!viewer_stream) { - struct lttng_trace_chunk *viewer_stream_trace_chunk = nullptr; - /* - * Save that we sent the metadata stream to the - * viewer. So that we know what trace the viewer - * is aware of. - */ - if (relay_stream->is_metadata) { - ctf_trace->metadata_stream_sent_to_viewer = true; + cds_list_for_each_entry_rcu( + relay_stream, &ctf_trace->stream_list, stream_node) + { + struct relay_viewer_stream *viewer_stream; + + if (!stream_get(relay_stream)) { + continue; } + pthread_mutex_lock(&relay_stream->lock); /* - * If a rotation is ongoing, use a copy of the - * relay stream's chunk to ensure the stream - * files exist. - * - * Otherwise, the viewer session's current trace - * chunk can be used safely. + * stream published is protected by the session lock. */ - if ((relay_stream->ongoing_rotation.is_set || - session_has_ongoing_rotation(relay_session)) && - relay_stream->trace_chunk) { - viewer_stream_trace_chunk = - lttng_trace_chunk_copy(relay_stream->trace_chunk); - if (!viewer_stream_trace_chunk) { - ret = -1; - ctf_trace_put(ctf_trace); - goto error_unlock; + if (!relay_stream->published) { + goto next; + } + viewer_stream = + viewer_stream_get_by_id(relay_stream->stream_handle); + if (!viewer_stream) { + struct lttng_trace_chunk *viewer_stream_trace_chunk = + nullptr; + + /* + * Save that we sent the metadata stream to the + * viewer. So that we know what trace the viewer + * is aware of. + */ + if (relay_stream->is_metadata) { + ctf_trace->metadata_stream_sent_to_viewer = true; } - } else { + /* - * Transition the viewer session into the newest trace chunk - * available. + * If a rotation is ongoing, use a copy of the + * relay stream's chunk to ensure the stream + * files exist. + * + * Otherwise, the viewer session's current trace + * chunk can be used safely. */ - if (!lttng_trace_chunk_ids_equal( - viewer_session->current_trace_chunk, - relay_stream->trace_chunk)) { - ret = viewer_session_set_trace_chunk_copy( - viewer_session, relay_stream->trace_chunk); - if (ret) { + if ((relay_stream->ongoing_rotation.is_set || + session_has_ongoing_rotation(relay_session)) && + relay_stream->trace_chunk) { + viewer_stream_trace_chunk = lttng_trace_chunk_copy( + relay_stream->trace_chunk); + if (!viewer_stream_trace_chunk) { ret = -1; ctf_trace_put(ctf_trace); goto error_unlock; } - } - - if (relay_stream->trace_chunk) { + } else { /* - * If the corresponding relay - * stream's trace chunk is set, - * the viewer stream will be - * created under it. - * - * Note that a relay stream can - * have a NULL output trace - * chunk (for instance, after a - * clear against a stopped - * session). + * Transition the viewer session into the newest + * trace chunk available. */ - const bool reference_acquired = - lttng_trace_chunk_get( - viewer_session->current_trace_chunk); + if (!lttng_trace_chunk_ids_equal( + viewer_session->current_trace_chunk, + relay_stream->trace_chunk)) { + ret = viewer_session_set_trace_chunk_copy( + viewer_session, + relay_stream->trace_chunk); + if (ret) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } + } - LTTNG_ASSERT(reference_acquired); - viewer_stream_trace_chunk = - viewer_session->current_trace_chunk; + if (relay_stream->trace_chunk) { + /* + * If the corresponding relay + * stream's trace chunk is set, + * the viewer stream will be + * created under it. + * + * Note that a relay stream can + * have a NULL output trace + * chunk (for instance, after a + * clear against a stopped + * session). + */ + const bool reference_acquired = + lttng_trace_chunk_get( + viewer_session + ->current_trace_chunk); + + LTTNG_ASSERT(reference_acquired); + viewer_stream_trace_chunk = + viewer_session->current_trace_chunk; + } } - } - viewer_stream = viewer_stream_create( - relay_stream, viewer_stream_trace_chunk, seek_t); - lttng_trace_chunk_put(viewer_stream_trace_chunk); - viewer_stream_trace_chunk = nullptr; - if (!viewer_stream) { - ret = -1; - ctf_trace_put(ctf_trace); - goto error_unlock; - } + viewer_stream = viewer_stream_create( + relay_stream, viewer_stream_trace_chunk, seek_t); + lttng_trace_chunk_put(viewer_stream_trace_chunk); + viewer_stream_trace_chunk = nullptr; + if (!viewer_stream) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } - if (nb_created) { - /* Update number of created stream counter. */ - (*nb_created)++; - } - /* - * Ensure a self-reference is preserved even - * after we have put our local reference. - */ - if (!viewer_stream_get(viewer_stream)) { - ERR("Unable to get self-reference on viewer stream, logic error."); - abort(); - } - } else { - if (!viewer_stream->sent_flag && nb_unsent) { - /* Update number of unsent stream counter. */ - (*nb_unsent)++; - } - } - /* Update number of total stream counter. */ - if (nb_total) { - if (relay_stream->is_metadata) { - if (!relay_stream->closed || - relay_stream->metadata_received > - viewer_stream->metadata_sent) { - (*nb_total)++; + if (nb_created) { + /* Update number of created stream counter. */ + (*nb_created)++; + } + /* + * Ensure a self-reference is preserved even + * after we have put our local reference. + */ + if (!viewer_stream_get(viewer_stream)) { + ERR("Unable to get self-reference on viewer stream, logic error."); + abort(); } } else { - if (!relay_stream->closed || - !(((int64_t) (relay_stream->prev_data_seq - - relay_stream->last_net_seq_num)) >= 0)) { - (*nb_total)++; + if (!viewer_stream->sent_flag && nb_unsent) { + /* Update number of unsent stream counter. */ + (*nb_unsent)++; + } + } + /* Update number of total stream counter. */ + if (nb_total) { + if (relay_stream->is_metadata) { + if (!relay_stream->closed || + relay_stream->metadata_received > + viewer_stream->metadata_sent) { + (*nb_total)++; + } + } else { + if (!relay_stream->closed || + !(((int64_t) (relay_stream->prev_data_seq - + relay_stream->last_net_seq_num)) >= + 0)) { + (*nb_total)++; + } } } + /* Put local reference. */ + viewer_stream_put(viewer_stream); + next: + pthread_mutex_unlock(&relay_stream->lock); + stream_put(relay_stream); } - /* Put local reference. */ - viewer_stream_put(viewer_stream); - next: - pthread_mutex_unlock(&relay_stream->lock); - stream_put(relay_stream); + relay_stream = nullptr; + ctf_trace_put(ctf_trace); } - relay_stream = nullptr; - ctf_trace_put(ctf_trace); } ret = 0; error_unlock: - rcu_read_unlock(); if (relay_stream) { pthread_mutex_unlock(&relay_stream->lock); @@ -1042,61 +1060,65 @@ static int viewer_list_sessions(struct relay_connection *conn) return -1; } - rcu_read_lock(); - cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) { - struct lttng_viewer_session *send_session; + { + lttng::urcu::read_lock_guard read_lock; - health_code_update(); + cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) { + struct lttng_viewer_session *send_session; - pthread_mutex_lock(&session->lock); - if (session->connection_closed) { - /* Skip closed session */ - goto next_session; - } + health_code_update(); + + pthread_mutex_lock(&session->lock); + if (session->connection_closed) { + /* Skip closed session */ + goto next_session; + } - if (count >= buf_count) { - struct lttng_viewer_session *newbuf; - uint32_t new_buf_count = buf_count << 1; + if (count >= buf_count) { + struct lttng_viewer_session *newbuf; + uint32_t new_buf_count = buf_count << 1; - newbuf = (lttng_viewer_session *) realloc( - send_session_buf, new_buf_count * sizeof(*send_session_buf)); - if (!newbuf) { + newbuf = (lttng_viewer_session *) realloc( + send_session_buf, + new_buf_count * sizeof(*send_session_buf)); + if (!newbuf) { + ret = -1; + goto break_loop; + } + send_session_buf = newbuf; + buf_count = new_buf_count; + } + send_session = &send_session_buf[count]; + if (lttng_strncpy(send_session->session_name, + session->session_name, + sizeof(send_session->session_name))) { ret = -1; goto break_loop; } - send_session_buf = newbuf; - buf_count = new_buf_count; - } - send_session = &send_session_buf[count]; - if (lttng_strncpy(send_session->session_name, - session->session_name, - sizeof(send_session->session_name))) { - ret = -1; - goto break_loop; - } - if (lttng_strncpy(send_session->hostname, - session->hostname, - sizeof(send_session->hostname))) { - ret = -1; - goto break_loop; - } - send_session->id = htobe64(session->id); - send_session->live_timer = htobe32(session->live_timer); - if (session->viewer_attached) { - send_session->clients = htobe32(1); - } else { - send_session->clients = htobe32(0); + if (lttng_strncpy(send_session->hostname, + session->hostname, + sizeof(send_session->hostname))) { + ret = -1; + goto break_loop; + } + send_session->id = htobe64(session->id); + send_session->live_timer = htobe32(session->live_timer); + if (session->viewer_attached) { + send_session->clients = htobe32(1); + } else { + send_session->clients = htobe32(0); + } + send_session->streams = htobe32(session->stream_count); + count++; + next_session: + pthread_mutex_unlock(&session->lock); + continue; + break_loop: + pthread_mutex_unlock(&session->lock); + break; } - send_session->streams = htobe32(session->stream_count); - count++; - next_session: - pthread_mutex_unlock(&session->lock); - continue; - break_loop: - pthread_mutex_unlock(&session->lock); - break; } - rcu_read_unlock(); + if (ret < 0) { goto end_free; } @@ -1199,6 +1221,8 @@ static int viewer_get_new_streams(struct relay_connection *conn) response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply_unlock; } + + uatomic_set(&session->new_streams, 0); send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); @@ -1642,6 +1666,7 @@ static int viewer_get_next_index(struct relay_connection *conn) bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind; uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL; enum lttng_trace_chunk_status status; + bool attached_sessions_have_new_streams = false; LTTNG_ASSERT(conn); @@ -1693,6 +1718,17 @@ static int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } + ret = check_new_streams(conn); + if (ret < 0) { + viewer_index.status = LTTNG_VIEWER_INDEX_ERR; + ERR("Error checking for new streams in the attached sessions, returning status=%s", + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); + goto send_reply; + } else if (ret == 1) { + attached_sessions_have_new_streams = true; + } + if (rstream->ongoing_rotation.is_set) { /* Rotation is ongoing, try again later. */ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; @@ -1802,6 +1838,7 @@ static int viewer_get_next_index(struct relay_connection *conn) */ goto send_reply; } + /* At this point, ret is 0 thus we will be able to read the index. */ LTTNG_ASSERT(!ret); @@ -1880,19 +1917,6 @@ static int viewer_get_next_index(struct relay_connection *conn) vstream->stream_file.handle = fs_handle; } - ret = check_new_streams(conn); - if (ret < 0) { - viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - ERR("Error checking for new streams before sending new index to stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); - goto send_reply; - } else if (ret == 1) { - viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; - } - ret = lttng_index_file_read(vstream->index_file, &packet_index); if (ret) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; @@ -1943,6 +1967,10 @@ send_reply: pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); } + if (attached_sessions_have_new_streams) { + viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; + } + viewer_index.flags = htobe32(viewer_index.flags); viewer_index.status = htobe32(viewer_index.status); health_code_update(); @@ -2109,6 +2137,7 @@ static int viewer_get_metadata(struct relay_connection *conn) struct lttng_viewer_get_metadata request; struct lttng_viewer_metadata_packet reply; struct relay_viewer_stream *vstream = nullptr; + bool dispose_of_stream = false; LTTNG_ASSERT(conn); @@ -2137,6 +2166,9 @@ static int viewer_get_metadata(struct relay_connection *conn) reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } + + pthread_mutex_lock(&vstream->stream->trace->session->lock); + pthread_mutex_lock(&vstream->stream->trace->lock); pthread_mutex_lock(&vstream->stream->lock); if (!vstream->stream->is_metadata) { ERR("Invalid metadata stream"); @@ -2145,11 +2177,7 @@ static int viewer_get_metadata(struct relay_connection *conn) if (vstream->metadata_sent >= vstream->stream->metadata_received) { /* - * The live viewers expect to receive a NO_NEW_METADATA - * status before a stream disappears, otherwise they abort the - * entire live connection when receiving an error status. - * - * Clear feature resets the metadata_sent to 0 until the + * Clear feature resets the metadata_received to 0 until the * same metadata is received again. */ reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); @@ -2157,20 +2185,7 @@ static int viewer_get_metadata(struct relay_connection *conn) * The live viewer considers a closed 0 byte metadata stream as * an error. */ - if (vstream->metadata_sent > 0) { - if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) { - /* - * Release ownership for the viewer metadata - * stream. Note that this reference is the - * viewer's reference. The vstream still exists - * until the end of the function as - * viewer_stream_get_by_id() took a reference. - */ - viewer_stream_put(vstream); - } - - vstream->stream->no_new_metadata_notified = true; - } + dispose_of_stream = vstream->metadata_sent > 0 && vstream->stream->closed; goto send_reply; } @@ -2208,6 +2223,19 @@ static int viewer_get_metadata(struct relay_connection *conn) len = vstream->stream->metadata_received - vstream->metadata_sent; if (!vstream->stream_file.trace_chunk) { + if (vstream->stream->trace->session->connection_closed) { + /* + * If the connection is closed, there is no way for the metadata stream + * to ever transition back to an active chunk. As such, signal to the viewer + * that there is no new metadata available. + * + * The stream can be disposed-of. On the next execution of this command, + * the relay daemon will reply with an error status since the stream can't + * be found. + */ + dispose_of_stream = true; + } + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); len = 0; goto send_reply; @@ -2338,6 +2366,8 @@ send_reply: health_code_update(); if (vstream) { pthread_mutex_unlock(&vstream->stream->lock); + pthread_mutex_unlock(&vstream->stream->trace->lock); + pthread_mutex_unlock(&vstream->stream->trace->session->lock); } ret = send_response(conn->sock, &reply, sizeof(reply)); if (ret < 0) { @@ -2363,7 +2393,22 @@ end_free: end: if (vstream) { viewer_stream_put(vstream); + if (dispose_of_stream) { + /* + * Trigger the destruction of the viewer stream + * by releasing its global reference. + * + * The live viewers expect to receive a NO_NEW_METADATA + * status before a stream disappears, otherwise they abort the + * entire live connection when receiving an error status. + * + * On the next query for this stream, an error will be reported to the + * client. + */ + viewer_stream_put(vstream); + } } + return ret; } @@ -2717,12 +2762,15 @@ error: (void) fd_tracker_util_poll_clean(the_fd_tracker, &events); /* Cleanup remaining connection object. */ - rcu_read_lock(); - cds_lfht_for_each_entry (viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { - health_code_update(); - connection_put(destroy_conn); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry ( + viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { + health_code_update(); + connection_put(destroy_conn); + } } - rcu_read_unlock(); error_poll_create: lttng_ht_destroy(viewer_connections_ht); viewer_connections_ht_error: