X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fbin%2Flttng-relayd%2Fstream.cpp;h=fbecdc55804acdc151732a17188b13fa1b969186;hb=HEAD;hp=9df6e52fb52204abfa3bce423503394b72a41b35;hpb=28ab034a2c3582d07d3423d2d746731f87d3969f;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/stream.cpp b/src/bin/lttng-relayd/stream.cpp index 9df6e52fb..a0935231d 100644 --- a/src/bin/lttng-relayd/stream.cpp +++ b/src/bin/lttng-relayd/stream.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -45,9 +46,9 @@ struct relay_stream *stream_get_by_id(uint64_t stream_id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct relay_stream *stream = NULL; + struct relay_stream *stream = nullptr; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(relay_streams_ht, &stream_id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { @@ -56,10 +57,9 @@ struct relay_stream *stream_get_by_id(uint64_t stream_id) } stream = lttng::utils::container_of(node, &relay_stream::node); if (!stream_get(stream)) { - stream = NULL; + stream = nullptr; } end: - rcu_read_unlock(); return stream; } @@ -93,7 +93,7 @@ static int stream_create_data_output_file_from_trace_chunk(struct relay_stream * stream->channel_name, stream->tracefile_size, stream->tracefile_current_index, - NULL, + nullptr, stream_path, sizeof(stream_path)); if (ret < 0) { @@ -145,7 +145,7 @@ static int stream_rotate_data_file(struct relay_stream *stream) if (stream->file) { fs_handle_close(stream->file); - stream->file = NULL; + stream->file = nullptr; } stream->tracefile_wrapped_around = false; @@ -203,8 +203,8 @@ static int rotate_truncate_stream(struct relay_stream *stream) off_t lseek_ret, previous_stream_copy_origin; uint64_t copy_bytes_left, misplaced_data_size; bool acquired_reference; - struct fs_handle *previous_stream_file = NULL; - struct lttng_trace_chunk *previous_chunk = NULL; + struct fs_handle *previous_stream_file = nullptr; + struct lttng_trace_chunk *previous_chunk = nullptr; if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) { ERR("Protocol error encoutered in %s(): stream rotation " @@ -236,7 +236,7 @@ static int rotate_truncate_stream(struct relay_stream *stream) */ LTTNG_ASSERT(stream->file); previous_stream_file = stream->file; - stream->file = NULL; + stream->file = nullptr; LTTNG_ASSERT(!stream->is_metadata); LTTNG_ASSERT(stream->tracefile_size_current > stream->pos_after_last_complete_data_index); @@ -417,7 +417,7 @@ static int create_index_file(struct relay_stream *stream, struct lttng_trace_chu { int ret; uint32_t major, minor; - char *index_subpath = NULL; + char *index_subpath = nullptr; enum lttng_trace_chunk_status status; ASSERT_LOCKED(stream->lock); @@ -425,7 +425,7 @@ static int create_index_file(struct relay_stream *stream, struct lttng_trace_chu /* Put ref on previous index_file. */ if (stream->index_file) { lttng_index_file_put(stream->index_file); - stream->index_file = NULL; + stream->index_file = nullptr; } major = stream->trace->session->major; minor = stream->trace->session->minor; @@ -516,7 +516,7 @@ static int try_rotate_stream_index(struct relay_stream *stream) DBG("Rotating stream %" PRIu64 " index file", stream->stream_handle); if (stream->index_file) { lttng_index_file_put(stream->index_file); - stream->index_file = NULL; + stream->index_file = nullptr; } stream->ongoing_rotation.value.index_rotated = true; @@ -556,7 +556,7 @@ static int stream_set_trace_chunk(struct relay_stream *stream, struct lttng_trac if (stream->file) { fs_handle_close(stream->file); - stream->file = NULL; + stream->file = nullptr; } ret = stream_create_data_output_file_from_trace_chunk(stream, chunk, false, &stream->file); end: @@ -574,13 +574,13 @@ struct relay_stream *stream_create(struct ctf_trace *trace, uint64_t tracefile_count) { int ret; - struct relay_stream *stream = NULL; + struct relay_stream *stream = nullptr; struct relay_session *session = trace->session; bool acquired_reference = false; struct lttng_trace_chunk *current_trace_chunk; stream = zmalloc(); - if (stream == NULL) { + if (stream == nullptr) { PERROR("relay stream zmalloc"); goto error_no_alloc; } @@ -596,7 +596,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream->channel_name = channel_name; stream->beacon_ts_end = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); - pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->lock, nullptr); urcu_ref_init(&stream->ref); ctf_trace_get(trace); stream->trace = trace; @@ -665,10 +665,10 @@ end: if (ret) { if (stream->file) { fs_handle_close(stream->file); - stream->file = NULL; + stream->file = nullptr; } stream_put(stream); - stream = NULL; + stream = nullptr; } if (acquired_reference) { lttng_trace_chunk_put(current_trace_chunk); @@ -682,7 +682,7 @@ error_no_alloc: */ free(path_name); free(channel_name); - return NULL; + return nullptr; } /* @@ -788,33 +788,32 @@ static void stream_release(struct urcu_ref *ref) if (stream->file) { fs_handle_close(stream->file); - stream->file = NULL; + stream->file = nullptr; } if (stream->index_file) { lttng_index_file_put(stream->index_file); - stream->index_file = NULL; + stream->index_file = nullptr; } if (stream->trace) { ctf_trace_put(stream->trace); - stream->trace = NULL; + stream->trace = nullptr; } stream_complete_rotation(stream); lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; + stream->trace_chunk = nullptr; call_rcu(&stream->rcu_node, stream_destroy_rcu); } void stream_put(struct relay_stream *stream) { - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(stream->ref.refcount != 0); /* * Wait until we have processed all the stream packets before * actually putting our last stream reference. */ urcu_ref_put(&stream->ref, stream_release); - rcu_read_unlock(); } int stream_set_pending_rotation(struct relay_stream *stream, @@ -972,14 +971,14 @@ void try_stream_close(struct relay_stream *stream) /* Put stream fd before put chunk. */ if (stream->file) { fs_handle_close(stream->file); - stream->file = NULL; + stream->file = nullptr; } if (stream->index_file) { lttng_index_file_put(stream->index_file); - stream->index_file = NULL; + stream->index_file = nullptr; } lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; + stream->trace_chunk = nullptr; pthread_mutex_unlock(&stream->lock); DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle); stream_put(stream); @@ -1029,7 +1028,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size, bool *fi if (stream->file) { fs_handle_close(stream->file); - stream->file = NULL; + stream->file = nullptr; } ret = stream_create_data_output_file_from_trace_chunk( stream, stream->trace_chunk, false, &stream->file); @@ -1110,9 +1109,6 @@ int stream_write(struct relay_stream *stream, recv_len = packet ? packet->size : 0; recv_len += padding_len; stream->metadata_received += recv_len; - if (recv_len) { - stream->no_new_metadata_notified = false; - } } DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu", @@ -1169,7 +1165,7 @@ int stream_update_index(struct relay_stream *stream, stream->stream_handle); /* Put self-ref for this index due to error. */ relay_index_put(index); - index = NULL; + index = nullptr; goto end; } } @@ -1178,7 +1174,7 @@ int stream_update_index(struct relay_stream *stream, ret = -1; /* Put self-ref for this index due to error. */ relay_index_put(index); - index = NULL; + index = nullptr; goto end; } @@ -1315,18 +1311,20 @@ static void print_stream_indexes(struct relay_stream *stream) struct lttng_ht_iter iter; struct relay_index *index; - rcu_read_lock(); - cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { - DBG("index %p net_seq_num %" PRIu64 " refcount %ld" - " stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64, - index, - index->index_n.key, - stream->ref.refcount, - index->stream->stream_handle, - index->stream->trace->id, - index->stream->trace->session->id); - } - rcu_read_unlock(); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (stream->indexes_ht->ht, &iter.iter, index, index_n.node) { + DBG("index %p net_seq_num %" PRIu64 " refcount %ld" + " stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64, + index, + index->index_n.key, + stream->ref.refcount, + index->stream->stream_handle, + index->stream->trace->id, + index->stream->trace->session->id); + } + } } int stream_reset_file(struct relay_stream *stream) @@ -1342,7 +1340,7 @@ int stream_reset_file(struct relay_stream *stream) stream->channel_name, stream->stream_handle); } - stream->file = NULL; + stream->file = nullptr; } DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64, @@ -1360,7 +1358,7 @@ int stream_reset_file(struct relay_stream *stream) stream, stream->trace_chunk, true, &stream->file); } -void print_relay_streams(void) +void print_relay_streams() { struct lttng_ht_iter iter; struct relay_stream *stream; @@ -1369,19 +1367,23 @@ void print_relay_streams(void) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { + if (!stream_get(stream)) { + continue; + } + + DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + stream, + stream->ref.refcount, + stream->stream_handle, + stream->trace->id, + stream->trace->session->id); + print_stream_indexes(stream); + stream_put(stream); } - DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64, - stream, - stream->ref.refcount, - stream->stream_handle, - stream->trace->id, - stream->trace->session->id); - print_stream_indexes(stream); - stream_put(stream); } - rcu_read_unlock(); }