X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.cpp;fp=src%2Fbin%2Flttng-relayd%2Fviewer-stream.cpp;h=92be53e4f8b102de4f7c6b44ebe038eb90cf99e0;hp=f37d8e33040d991d7ea3ecd818508d1d9ac1f3f2;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d diff --git a/src/bin/lttng-relayd/viewer-stream.cpp b/src/bin/lttng-relayd/viewer-stream.cpp index f37d8e330..92be53e4f 100644 --- a/src/bin/lttng-relayd/viewer-stream.cpp +++ b/src/bin/lttng-relayd/viewer-stream.cpp @@ -8,17 +8,18 @@ */ #define _LGPL_SOURCE +#include "lttng-relayd.hpp" +#include "viewer-stream.hpp" + #include -#include #include +#include #include -#include -#include -#include -#include -#include "lttng-relayd.hpp" -#include "viewer-stream.hpp" +#include +#include +#include +#include static void viewer_stream_release_composite_objects(struct relay_viewer_stream *vstream) { @@ -55,8 +56,8 @@ static void viewer_stream_destroy_rcu(struct rcu_head *head) /* Relay stream's lock must be held by the caller. */ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - struct lttng_trace_chunk *trace_chunk, - enum lttng_viewer_seek seek_t) + struct lttng_trace_chunk *trace_chunk, + enum lttng_viewer_seek seek_t) { struct relay_viewer_stream *vstream = NULL; @@ -69,8 +70,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, } if (trace_chunk) { - const bool acquired_reference = lttng_trace_chunk_get( - trace_chunk); + const bool acquired_reference = lttng_trace_chunk_get(trace_chunk); LTTNG_ASSERT(acquired_reference); } @@ -81,8 +81,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, PERROR("relay viewer path_name alloc"); goto error; } - vstream->channel_name = lttng_strndup(stream->channel_name, - LTTNG_VIEWER_NAME_MAX); + vstream->channel_name = lttng_strndup(stream->channel_name, LTTNG_VIEWER_NAME_MAX); if (vstream->channel_name == NULL) { PERROR("relay viewer channel_name alloc"); goto error; @@ -112,8 +111,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, */ seq_tail = 0; } - vstream->current_tracefile_id = - tracefile_array_get_file_index_tail(stream->tfa); + vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa); vstream->index_sent_seqcount = seq_tail; break; } @@ -127,8 +125,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, * We don't need to check the head position for -1ULL since the * increment will set it to 0. */ - vstream->index_sent_seqcount = - tracefile_array_get_seq_head(stream->tfa) + 1; + vstream->index_sent_seqcount = tracefile_array_get_seq_head(stream->tfa) + 1; break; default: goto error; @@ -146,15 +143,15 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_index_file_create_from_trace_chunk_read_only( - vstream->stream_file.trace_chunk, - stream->path_name, - stream->channel_name, stream->tracefile_size, - vstream->current_tracefile_id, - lttng_to_index_major(connection_major, - connection_minor), - lttng_to_index_minor(connection_major, - connection_minor), - true, &vstream->index_file); + vstream->stream_file.trace_chunk, + stream->path_name, + stream->channel_name, + stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, connection_minor), + lttng_to_index_minor(connection_major, connection_minor), + true, + &vstream->index_file); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { vstream->index_file = NULL; @@ -174,17 +171,22 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, enum lttng_trace_chunk_status status; ret = utils_stream_file_path(stream->path_name, - stream->channel_name, stream->tracefile_size, - vstream->current_tracefile_id, NULL, file_path, - sizeof(file_path)); + stream->channel_name, + stream->tracefile_size, + vstream->current_tracefile_id, + NULL, + file_path, + sizeof(file_path)); if (ret < 0) { goto error; } - status = lttng_trace_chunk_open_fs_handle( - vstream->stream_file.trace_chunk, file_path, - O_RDONLY, 0, &vstream->stream_file.handle, - true); + status = lttng_trace_chunk_open_fs_handle(vstream->stream_file.trace_chunk, + file_path, + O_RDONLY, + 0, + &vstream->stream_file.handle, + true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error; } @@ -193,15 +195,13 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) { off_t lseek_ret; - lseek_ret = fs_handle_seek( - vstream->index_file->file, 0, SEEK_END); + lseek_ret = fs_handle_seek(vstream->index_file->file, 0, SEEK_END); if (lseek_ret < 0) { goto error; } } if (stream->is_metadata) { - rcu_assign_pointer(stream->trace->viewer_metadata_stream, - vstream); + rcu_assign_pointer(stream->trace->viewer_metadata_stream, vstream); } vstream->last_seen_rotation_count = stream->completed_rotation_count; @@ -234,8 +234,8 @@ static void viewer_stream_unpublish(struct relay_viewer_stream *vstream) static void viewer_stream_release(struct urcu_ref *ref) { - struct relay_viewer_stream *vstream = caa_container_of(ref, - struct relay_viewer_stream, ref); + struct relay_viewer_stream *vstream = + caa_container_of(ref, struct relay_viewer_stream, ref); if (vstream->stream->is_metadata) { rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL); @@ -292,7 +292,7 @@ void viewer_stream_close_files(struct relay_viewer_stream *vstream) vstream->index_file = NULL; } if (vstream->stream_file.handle) { - fs_handle_close(vstream->stream_file.handle); + fs_handle_close(vstream->stream_file.handle); vstream->stream_file.handle = NULL; } } @@ -314,8 +314,7 @@ void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream * index_sent_seqcount is already further than the tracefile * array tail position, keep its current position. */ - vstream->index_sent_seqcount = - std::max(seq_tail, vstream->index_sent_seqcount); + vstream->index_sent_seqcount = std::max(seq_tail, vstream->index_sent_seqcount); } /* @@ -331,9 +330,8 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) const struct relay_stream *stream = vstream->stream; /* Detect the last tracefile to open. */ - if (stream->index_received_seqcount - == vstream->index_sent_seqcount - && stream->trace->session->connection_closed) { + if (stream->index_received_seqcount == vstream->index_sent_seqcount && + stream->trace->session->connection_closed) { ret = 1; goto end; } @@ -347,10 +345,8 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) /* * Try to move to the next file. */ - new_id = (vstream->current_tracefile_id + 1) - % stream->tracefile_count; - if (tracefile_array_seq_in_file(stream->tfa, new_id, - vstream->index_sent_seqcount)) { + new_id = (vstream->current_tracefile_id + 1) % stream->tracefile_count; + if (tracefile_array_seq_in_file(stream->tfa, new_id, vstream->index_sent_seqcount)) { vstream->current_tracefile_id = new_id; } else { uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); @@ -364,8 +360,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) /* * We need to resync because we lag behind tail. */ - vstream->current_tracefile_id = - tracefile_array_get_file_index_tail(stream->tfa); + vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa); vstream->index_sent_seqcount = seq_tail; } viewer_stream_close_files(vstream); @@ -384,18 +379,16 @@ void print_viewer_streams(void) } rcu_read_lock(); - cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, - stream_n.node) { + cds_lfht_for_each_entry (viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { if (!viewer_stream_get(vstream)) { continue; } - DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 - " session %" PRIu64, - vstream, - vstream->ref.refcount, - vstream->stream->stream_handle, - vstream->stream->trace->id, - vstream->stream->trace->session->id); + DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 " session %" PRIu64, + vstream, + vstream->ref.refcount, + vstream->stream->stream_handle, + vstream->stream->trace->id, + vstream->stream->trace->session->id); viewer_stream_put(vstream); } rcu_read_unlock();