fix: relayd: unaligned access in trace_chunk_registry_ht_key_hash
[lttng-tools.git] / src / bin / lttng-relayd / viewer-stream.cpp
index 7a66fde5bd6b8bf54d540a8aa0762e47ee9017bd..9ca54e67ea49b7be778cf054579ca54004289bcb 100644 (file)
@@ -8,20 +8,40 @@
  */
 
 #define _LGPL_SOURCE
-#include <common/common.h>
-#include <common/index/index.h>
-#include <common/compat/string.h>
-#include <common/utils.h>
-#include <sys/types.h>
-#include <sys/stat.h>
+#include "lttng-relayd.hpp"
+#include "viewer-stream.hpp"
+
+#include <common/common.hpp>
+#include <common/compat/string.hpp>
+#include <common/index/index.hpp>
+#include <common/urcu.hpp>
+#include <common/utils.hpp>
+
+#include <algorithm>
 #include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
 
-#include "lttng-relayd.h"
-#include "viewer-stream.h"
+static void viewer_stream_release_composite_objects(struct relay_viewer_stream *vstream)
+{
+       if (vstream->stream_file.handle) {
+               fs_handle_close(vstream->stream_file.handle);
+               vstream->stream_file.handle = nullptr;
+       }
+       if (vstream->index_file) {
+               lttng_index_file_put(vstream->index_file);
+               vstream->index_file = nullptr;
+       }
+       if (vstream->stream) {
+               stream_put(vstream->stream);
+               vstream->stream = nullptr;
+       }
+       lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+       vstream->stream_file.trace_chunk = nullptr;
+}
 
 static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
 {
-       lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
        free(vstream->path_name);
        free(vstream->channel_name);
        free(vstream);
@@ -30,42 +50,40 @@ static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
 static void viewer_stream_destroy_rcu(struct rcu_head *head)
 {
        struct relay_viewer_stream *vstream =
-               caa_container_of(head, struct relay_viewer_stream, rcu_node);
+               lttng::utils::container_of(head, &relay_viewer_stream::rcu_node);
 
        viewer_stream_destroy(vstream);
 }
 
 /* Relay stream's lock must be held by the caller. */
 struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
-               struct lttng_trace_chunk *trace_chunk,
-               enum lttng_viewer_seek seek_t)
+                                                struct lttng_trace_chunk *trace_chunk,
+                                                enum lttng_viewer_seek seek_t)
 {
-       struct relay_viewer_stream *vstream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
 
        ASSERT_LOCKED(stream->lock);
 
-       vstream = (relay_viewer_stream *) zmalloc(sizeof(*vstream));
+       vstream = zmalloc<relay_viewer_stream>();
        if (!vstream) {
                PERROR("relay viewer stream zmalloc");
                goto error;
        }
 
        if (trace_chunk) {
-               const bool acquired_reference = lttng_trace_chunk_get(
-                               trace_chunk);
+               const bool acquired_reference = lttng_trace_chunk_get(trace_chunk);
 
                LTTNG_ASSERT(acquired_reference);
        }
 
        vstream->stream_file.trace_chunk = trace_chunk;
        vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
-       if (vstream->path_name == NULL) {
+       if (vstream->path_name == nullptr) {
                PERROR("relay viewer path_name alloc");
                goto error;
        }
-       vstream->channel_name = lttng_strndup(stream->channel_name,
-                       LTTNG_VIEWER_NAME_MAX);
-       if (vstream->channel_name == NULL) {
+       vstream->channel_name = lttng_strndup(stream->channel_name, LTTNG_VIEWER_NAME_MAX);
+       if (vstream->channel_name == nullptr) {
                PERROR("relay viewer channel_name alloc");
                goto error;
        }
@@ -94,8 +112,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                         */
                        seq_tail = 0;
                }
-               vstream->current_tracefile_id =
-                       tracefile_array_get_file_index_tail(stream->tfa);
+               vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa);
                vstream->index_sent_seqcount = seq_tail;
                break;
        }
@@ -109,8 +126,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                 * We don't need to check the head position for -1ULL since the
                 * increment will set it to 0.
                 */
-               vstream->index_sent_seqcount =
-                               tracefile_array_get_seq_head(stream->tfa) + 1;
+               vstream->index_sent_seqcount = tracefile_array_get_seq_head(stream->tfa) + 1;
                break;
        default:
                goto error;
@@ -120,26 +136,26 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
         * If we never received an index for the current stream, delay
         * the opening of the index, otherwise open it right now.
         */
-       if (stream->index_file == NULL) {
-               vstream->index_file = NULL;
+       if (stream->index_file == nullptr) {
+               vstream->index_file = nullptr;
        } else if (vstream->stream_file.trace_chunk) {
                const uint32_t connection_major = stream->trace->session->major;
                const uint32_t connection_minor = stream->trace->session->minor;
                enum lttng_trace_chunk_status chunk_status;
 
                chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
-                               vstream->stream_file.trace_chunk,
-                               stream->path_name,
-                               stream->channel_name, stream->tracefile_size,
-                               vstream->current_tracefile_id,
-                               lttng_to_index_major(connection_major,
-                                               connection_minor),
-                               lttng_to_index_minor(connection_major,
-                                               connection_minor),
-                               true, &vstream->index_file);
+                       vstream->stream_file.trace_chunk,
+                       stream->path_name,
+                       stream->channel_name,
+                       stream->tracefile_size,
+                       vstream->current_tracefile_id,
+                       lttng_to_index_major(connection_major, connection_minor),
+                       lttng_to_index_minor(connection_major, connection_minor),
+                       true,
+                       &vstream->index_file);
                if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
-                               vstream->index_file = NULL;
+                               vstream->index_file = nullptr;
                        } else {
                                goto error;
                        }
@@ -156,17 +172,22 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                enum lttng_trace_chunk_status status;
 
                ret = utils_stream_file_path(stream->path_name,
-                               stream->channel_name, stream->tracefile_size,
-                               vstream->current_tracefile_id, NULL, file_path,
-                               sizeof(file_path));
+                                            stream->channel_name,
+                                            stream->tracefile_size,
+                                            vstream->current_tracefile_id,
+                                            nullptr,
+                                            file_path,
+                                            sizeof(file_path));
                if (ret < 0) {
                        goto error;
                }
 
-               status = lttng_trace_chunk_open_fs_handle(
-                               vstream->stream_file.trace_chunk, file_path,
-                               O_RDONLY, 0, &vstream->stream_file.handle,
-                               true);
+               status = lttng_trace_chunk_open_fs_handle(vstream->stream_file.trace_chunk,
+                                                         file_path,
+                                                         O_RDONLY,
+                                                         0,
+                                                         &vstream->stream_file.handle,
+                                                         true);
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        goto error;
                }
@@ -175,15 +196,13 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
        if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
                off_t lseek_ret;
 
-               lseek_ret = fs_handle_seek(
-                               vstream->index_file->file, 0, SEEK_END);
+               lseek_ret = fs_handle_seek(vstream->index_file->file, 0, SEEK_END);
                if (lseek_ret < 0) {
                        goto error;
                }
        }
        if (stream->is_metadata) {
-               rcu_assign_pointer(stream->trace->viewer_metadata_stream,
-                               vstream);
+               rcu_assign_pointer(stream->trace->viewer_metadata_stream, vstream);
        }
 
        vstream->last_seen_rotation_count = stream->completed_rotation_count;
@@ -197,9 +216,11 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
 
 error:
        if (vstream) {
+               /* Not using `put` since vstream is assumed to be published. */
+               viewer_stream_release_composite_objects(vstream);
                viewer_stream_destroy(vstream);
        }
-       return NULL;
+       return nullptr;
 }
 
 static void viewer_stream_unpublish(struct relay_viewer_stream *vstream)
@@ -214,29 +235,14 @@ static void viewer_stream_unpublish(struct relay_viewer_stream *vstream)
 
 static void viewer_stream_release(struct urcu_ref *ref)
 {
-       struct relay_viewer_stream *vstream = caa_container_of(ref,
-                       struct relay_viewer_stream, ref);
+       struct relay_viewer_stream *vstream =
+               caa_container_of(ref, struct relay_viewer_stream, ref);
 
        if (vstream->stream->is_metadata) {
                rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL);
        }
-
        viewer_stream_unpublish(vstream);
-
-       if (vstream->stream_file.handle) {
-               fs_handle_close(vstream->stream_file.handle);
-               vstream->stream_file.handle = NULL;
-       }
-       if (vstream->index_file) {
-               lttng_index_file_put(vstream->index_file);
-               vstream->index_file = NULL;
-       }
-       if (vstream->stream) {
-               stream_put(vstream->stream);
-               vstream->stream = NULL;
-       }
-       lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
-       vstream->stream_file.trace_chunk = NULL;
+       viewer_stream_release_composite_objects(vstream);
        call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
 }
 
@@ -255,40 +261,38 @@ struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id)
 {
        struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
-       struct relay_viewer_stream *vstream = NULL;
+       struct relay_viewer_stream *vstream = nullptr;
 
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        lttng_ht_lookup(viewer_streams_ht, &id, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (!node) {
                DBG("Relay viewer stream %" PRIu64 " not found", id);
                goto end;
        }
-       vstream = caa_container_of(node, struct relay_viewer_stream, stream_n);
+       vstream = lttng::utils::container_of(node, &relay_viewer_stream::stream_n);
        if (!viewer_stream_get(vstream)) {
-               vstream = NULL;
+               vstream = nullptr;
        }
 end:
-       rcu_read_unlock();
        return vstream;
 }
 
 void viewer_stream_put(struct relay_viewer_stream *vstream)
 {
-       rcu_read_lock();
+       lttng::urcu::read_lock_guard read_lock;
        urcu_ref_put(&vstream->ref, viewer_stream_release);
-       rcu_read_unlock();
 }
 
 void viewer_stream_close_files(struct relay_viewer_stream *vstream)
 {
        if (vstream->index_file) {
                lttng_index_file_put(vstream->index_file);
-               vstream->index_file = NULL;
+               vstream->index_file = nullptr;
        }
        if (vstream->stream_file.handle) {
-               fs_handle_close(vstream->stream_file.handle);
-               vstream->stream_file.handle = NULL;
+               fs_handle_close(vstream->stream_file.handle);
+               vstream->stream_file.handle = nullptr;
        }
 }
 
@@ -302,7 +306,14 @@ void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream
        if (seq_tail == -1ULL) {
                seq_tail = 0;
        }
-       vstream->index_sent_seqcount = seq_tail;
+
+       /*
+        * Move the index sent seqcount forward if it was lagging behind
+        * the new tail of the tracefile array. If the current
+        * index_sent_seqcount is already further than the tracefile
+        * array tail position, keep its current position.
+        */
+       vstream->index_sent_seqcount = std::max(seq_tail, vstream->index_sent_seqcount);
 }
 
 /*
@@ -318,9 +329,8 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
        const struct relay_stream *stream = vstream->stream;
 
        /* Detect the last tracefile to open. */
-       if (stream->index_received_seqcount
-                       == vstream->index_sent_seqcount
-                       && stream->trace->session->connection_closed) {
+       if (stream->index_received_seqcount == vstream->index_sent_seqcount &&
+           stream->trace->session->connection_closed) {
                ret = 1;
                goto end;
        }
@@ -334,10 +344,8 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
        /*
         * Try to move to the next file.
         */
-       new_id = (vstream->current_tracefile_id + 1)
-                       % stream->tracefile_count;
-       if (tracefile_array_seq_in_file(stream->tfa, new_id,
-                       vstream->index_sent_seqcount)) {
+       new_id = (vstream->current_tracefile_id + 1) % stream->tracefile_count;
+       if (tracefile_array_seq_in_file(stream->tfa, new_id, vstream->index_sent_seqcount)) {
                vstream->current_tracefile_id = new_id;
        } else {
                uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa);
@@ -351,8 +359,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
                /*
                 * We need to resync because we lag behind tail.
                 */
-               vstream->current_tracefile_id =
-                       tracefile_array_get_file_index_tail(stream->tfa);
+               vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa);
                vstream->index_sent_seqcount = seq_tail;
        }
        viewer_stream_close_files(vstream);
@@ -361,7 +368,7 @@ end:
        return ret;
 }
 
-void print_viewer_streams(void)
+void print_viewer_streams()
 {
        struct lttng_ht_iter iter;
        struct relay_viewer_stream *vstream;
@@ -370,20 +377,22 @@ void print_viewer_streams(void)
                return;
        }
 
-       rcu_read_lock();
-       cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
-                       stream_n.node) {
-               if (!viewer_stream_get(vstream)) {
-                       continue;
+       {
+               lttng::urcu::read_lock_guard read_lock;
+
+               cds_lfht_for_each_entry (
+                       viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) {
+                       if (!viewer_stream_get(vstream)) {
+                               continue;
+                       }
+                       DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
+                           " session %" PRIu64,
+                           vstream,
+                           vstream->ref.refcount,
+                           vstream->stream->stream_handle,
+                           vstream->stream->trace->id,
+                           vstream->stream->trace->session->id);
+                       viewer_stream_put(vstream);
                }
-               DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
-                       " session %" PRIu64,
-                       vstream,
-                       vstream->ref.refcount,
-                       vstream->stream->stream_handle,
-                       vstream->stream->trace->id,
-                       vstream->stream->trace->session->id);
-               viewer_stream_put(vstream);
        }
-       rcu_read_unlock();
 }
This page took 0.02906 seconds and 4 git commands to generate.