X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.cpp;h=a0935231d039280598ad538dfcaaa2f8189d3c27;hb=HEAD;hp=60c90bc111a44975a6b7c255879132d77810fde9;hpb=cd9adb8b829564212158943a0d279bb35322ab30;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/stream.cpp b/src/bin/lttng-relayd/stream.cpp index 60c90bc11..a0935231d 100644 --- a/src/bin/lttng-relayd/stream.cpp +++ b/src/bin/lttng-relayd/stream.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -47,7 +48,7 @@ struct relay_stream *stream_get_by_id(uint64_t stream_id) struct lttng_ht_iter iter; struct relay_stream *stream = nullptr; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(relay_streams_ht, &stream_id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { @@ -59,7 +60,6 @@ struct relay_stream *stream_get_by_id(uint64_t stream_id) stream = nullptr; } end: - rcu_read_unlock(); return stream; } @@ -807,14 +807,13 @@ static void stream_release(struct urcu_ref *ref) void stream_put(struct relay_stream *stream) { - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(stream->ref.refcount != 0); /* * Wait until we have processed all the stream packets before * actually putting our last stream reference. */ urcu_ref_put(&stream->ref, stream_release); - rcu_read_unlock(); } int stream_set_pending_rotation(struct relay_stream *stream, @@ -1110,9 +1109,6 @@ int stream_write(struct relay_stream *stream, recv_len = packet ? packet->size : 0; recv_len += padding_len; stream->metadata_received += recv_len; - if (recv_len) { - stream->no_new_metadata_notified = false; - } } DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu", @@ -1315,18 +1311,20 @@ static void print_stream_indexes(struct relay_stream *stream) struct lttng_ht_iter iter; struct relay_index *index; - rcu_read_lock(); - cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { - DBG("index %p net_seq_num %" PRIu64 " refcount %ld" - " stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64, - index, - index->index_n.key, - stream->ref.refcount, - index->stream->stream_handle, - index->stream->trace->id, - index->stream->trace->session->id); - } - rcu_read_unlock(); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { + DBG("index %p net_seq_num %" PRIu64 " refcount %ld" + " stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64, + index, + index->index_n.key, + stream->ref.refcount, + index->stream->stream_handle, + index->stream->trace->id, + index->stream->trace->session->id); + } + } } int stream_reset_file(struct relay_stream *stream) @@ -1369,19 +1367,23 @@ void print_relay_streams() return; } - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + if (!stream_get(stream)) { + continue; + } + + DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + stream, + stream->ref.refcount, + stream->stream_handle, + stream->trace->id, + stream->trace->session->id); + print_stream_indexes(stream); + stream_put(stream); } - DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64, - stream, - stream->ref.refcount, - stream->stream_handle, - stream->trace->id, - stream->trace->session->id); - print_stream_indexes(stream); - stream_put(stream); } - rcu_read_unlock(); }