X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.cpp;h=2790903adb7962d706eeb8bb0e660b7229c17d26;hp=f37d8e33040d991d7ea3ecd818508d1d9ac1f3f2;hb=HEAD;hpb=0114db0ec2407029052eb61a0189c9b1cd64d520 diff --git a/src/bin/lttng-relayd/viewer-stream.cpp b/src/bin/lttng-relayd/viewer-stream.cpp index f37d8e330..9ca54e67e 100644 --- a/src/bin/lttng-relayd/viewer-stream.cpp +++ b/src/bin/lttng-relayd/viewer-stream.cpp @@ -8,34 +8,36 @@ */ #define _LGPL_SOURCE +#include "lttng-relayd.hpp" +#include "viewer-stream.hpp" + #include -#include #include +#include +#include #include -#include -#include -#include -#include -#include "lttng-relayd.hpp" -#include "viewer-stream.hpp" +#include +#include +#include +#include static void viewer_stream_release_composite_objects(struct relay_viewer_stream *vstream) { if (vstream->stream_file.handle) { fs_handle_close(vstream->stream_file.handle); - vstream->stream_file.handle = NULL; + vstream->stream_file.handle = nullptr; } if (vstream->index_file) { lttng_index_file_put(vstream->index_file); - vstream->index_file = NULL; + vstream->index_file = nullptr; } if (vstream->stream) { stream_put(vstream->stream); - vstream->stream = NULL; + vstream->stream = nullptr; } lttng_trace_chunk_put(vstream->stream_file.trace_chunk); - vstream->stream_file.trace_chunk = NULL; + vstream->stream_file.trace_chunk = nullptr; } static void viewer_stream_destroy(struct relay_viewer_stream *vstream) @@ -55,10 +57,10 @@ static void viewer_stream_destroy_rcu(struct rcu_head *head) /* Relay stream's lock must be held by the caller. */ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - struct lttng_trace_chunk *trace_chunk, - enum lttng_viewer_seek seek_t) + struct lttng_trace_chunk *trace_chunk, + enum lttng_viewer_seek seek_t) { - struct relay_viewer_stream *vstream = NULL; + struct relay_viewer_stream *vstream = nullptr; ASSERT_LOCKED(stream->lock); @@ -69,21 +71,19 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, } if (trace_chunk) { - const bool acquired_reference = lttng_trace_chunk_get( - trace_chunk); + const bool acquired_reference = lttng_trace_chunk_get(trace_chunk); LTTNG_ASSERT(acquired_reference); } vstream->stream_file.trace_chunk = trace_chunk; vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); - if (vstream->path_name == NULL) { + if (vstream->path_name == nullptr) { PERROR("relay viewer path_name alloc"); goto error; } - vstream->channel_name = lttng_strndup(stream->channel_name, - LTTNG_VIEWER_NAME_MAX); - if (vstream->channel_name == NULL) { + vstream->channel_name = lttng_strndup(stream->channel_name, LTTNG_VIEWER_NAME_MAX); + if (vstream->channel_name == nullptr) { PERROR("relay viewer channel_name alloc"); goto error; } @@ -112,8 +112,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, */ seq_tail = 0; } - vstream->current_tracefile_id = - tracefile_array_get_file_index_tail(stream->tfa); + vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa); vstream->index_sent_seqcount = seq_tail; break; } @@ -127,8 +126,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, * We don't need to check the head position for -1ULL since the * increment will set it to 0. */ - vstream->index_sent_seqcount = - tracefile_array_get_seq_head(stream->tfa) + 1; + vstream->index_sent_seqcount = tracefile_array_get_seq_head(stream->tfa) + 1; break; default: goto error; @@ -138,26 +136,26 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, * If we never received an index for the current stream, delay * the opening of the index, otherwise open it right now. */ - if (stream->index_file == NULL) { - vstream->index_file = NULL; + if (stream->index_file == nullptr) { + vstream->index_file = nullptr; } else if (vstream->stream_file.trace_chunk) { const uint32_t connection_major = stream->trace->session->major; const uint32_t connection_minor = stream->trace->session->minor; enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_index_file_create_from_trace_chunk_read_only( - vstream->stream_file.trace_chunk, - stream->path_name, - stream->channel_name, stream->tracefile_size, - vstream->current_tracefile_id, - lttng_to_index_major(connection_major, - connection_minor), - lttng_to_index_minor(connection_major, - connection_minor), - true, &vstream->index_file); + vstream->stream_file.trace_chunk, + stream->path_name, + stream->channel_name, + stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, connection_minor), + lttng_to_index_minor(connection_major, connection_minor), + true, + &vstream->index_file); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { - vstream->index_file = NULL; + vstream->index_file = nullptr; } else { goto error; } @@ -174,17 +172,22 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, enum lttng_trace_chunk_status status; ret = utils_stream_file_path(stream->path_name, - stream->channel_name, stream->tracefile_size, - vstream->current_tracefile_id, NULL, file_path, - sizeof(file_path)); + stream->channel_name, + stream->tracefile_size, + vstream->current_tracefile_id, + nullptr, + file_path, + sizeof(file_path)); if (ret < 0) { goto error; } - status = lttng_trace_chunk_open_fs_handle( - vstream->stream_file.trace_chunk, file_path, - O_RDONLY, 0, &vstream->stream_file.handle, - true); + status = lttng_trace_chunk_open_fs_handle(vstream->stream_file.trace_chunk, + file_path, + O_RDONLY, + 0, + &vstream->stream_file.handle, + true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error; } @@ -193,15 +196,13 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) { off_t lseek_ret; - lseek_ret = fs_handle_seek( - vstream->index_file->file, 0, SEEK_END); + lseek_ret = fs_handle_seek(vstream->index_file->file, 0, SEEK_END); if (lseek_ret < 0) { goto error; } } if (stream->is_metadata) { - rcu_assign_pointer(stream->trace->viewer_metadata_stream, - vstream); + rcu_assign_pointer(stream->trace->viewer_metadata_stream, vstream); } vstream->last_seen_rotation_count = stream->completed_rotation_count; @@ -219,7 +220,7 @@ error: viewer_stream_release_composite_objects(vstream); viewer_stream_destroy(vstream); } - return NULL; + return nullptr; } static void viewer_stream_unpublish(struct relay_viewer_stream *vstream) @@ -234,8 +235,8 @@ static void viewer_stream_unpublish(struct relay_viewer_stream *vstream) static void viewer_stream_release(struct urcu_ref *ref) { - struct relay_viewer_stream *vstream = caa_container_of(ref, - struct relay_viewer_stream, ref); + struct relay_viewer_stream *vstream = + caa_container_of(ref, struct relay_viewer_stream, ref); if (vstream->stream->is_metadata) { rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL); @@ -260,9 +261,9 @@ struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct relay_viewer_stream *vstream = NULL; + struct relay_viewer_stream *vstream = nullptr; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(viewer_streams_ht, &id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { @@ -271,29 +272,27 @@ struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id) } vstream = lttng::utils::container_of(node, &relay_viewer_stream::stream_n); if (!viewer_stream_get(vstream)) { - vstream = NULL; + vstream = nullptr; } end: - rcu_read_unlock(); return vstream; } void viewer_stream_put(struct relay_viewer_stream *vstream) { - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; urcu_ref_put(&vstream->ref, viewer_stream_release); - rcu_read_unlock(); } void viewer_stream_close_files(struct relay_viewer_stream *vstream) { if (vstream->index_file) { lttng_index_file_put(vstream->index_file); - vstream->index_file = NULL; + vstream->index_file = nullptr; } if (vstream->stream_file.handle) { - fs_handle_close(vstream->stream_file.handle); - vstream->stream_file.handle = NULL; + fs_handle_close(vstream->stream_file.handle); + vstream->stream_file.handle = nullptr; } } @@ -314,8 +313,7 @@ void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream * index_sent_seqcount is already further than the tracefile * array tail position, keep its current position. */ - vstream->index_sent_seqcount = - std::max(seq_tail, vstream->index_sent_seqcount); + vstream->index_sent_seqcount = std::max(seq_tail, vstream->index_sent_seqcount); } /* @@ -331,9 +329,8 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) const struct relay_stream *stream = vstream->stream; /* Detect the last tracefile to open. */ - if (stream->index_received_seqcount - == vstream->index_sent_seqcount - && stream->trace->session->connection_closed) { + if (stream->index_received_seqcount == vstream->index_sent_seqcount && + stream->trace->session->connection_closed) { ret = 1; goto end; } @@ -347,10 +344,8 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) /* * Try to move to the next file. */ - new_id = (vstream->current_tracefile_id + 1) - % stream->tracefile_count; - if (tracefile_array_seq_in_file(stream->tfa, new_id, - vstream->index_sent_seqcount)) { + new_id = (vstream->current_tracefile_id + 1) % stream->tracefile_count; + if (tracefile_array_seq_in_file(stream->tfa, new_id, vstream->index_sent_seqcount)) { vstream->current_tracefile_id = new_id; } else { uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); @@ -364,8 +359,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) /* * We need to resync because we lag behind tail. */ - vstream->current_tracefile_id = - tracefile_array_get_file_index_tail(stream->tfa); + vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa); vstream->index_sent_seqcount = seq_tail; } viewer_stream_close_files(vstream); @@ -374,7 +368,7 @@ end: return ret; } -void print_viewer_streams(void) +void print_viewer_streams() { struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; @@ -383,20 +377,22 @@ void print_viewer_streams(void) return; } - rcu_read_lock(); - cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, - stream_n.node) { - if (!viewer_stream_get(vstream)) { - continue; + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry ( + viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { + if (!viewer_stream_get(vstream)) { + continue; + } + DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + vstream, + vstream->ref.refcount, + vstream->stream->stream_handle, + vstream->stream->trace->id, + vstream->stream->trace->session->id); + viewer_stream_put(vstream); } - DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 - " session %" PRIu64, - vstream, - vstream->ref.refcount, - vstream->stream->stream_handle, - vstream->stream->trace->id, - vstream->stream->trace->session->id); - viewer_stream_put(vstream); } - rcu_read_unlock(); }