Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
index bc7893475c13c34e816427d1d358740655c53bb3..187f5bec14a919c5f500e7ad7fe2d5e908695c14 100644 (file)
@@ -44,6 +44,7 @@
 #include <lttng/rotate-internal.hpp>
 #include <lttng/trigger/trigger.h>
 
+#include <fcntl.h>
 #include <inttypes.h>
 #include <memory>
 #include <signal.h>
@@ -67,9 +68,10 @@ struct ls::rotation_thread_timer_queue {
 
 namespace {
 struct rotation_thread_job {
-       using uptr = std::unique_ptr<
-               rotation_thread_job,
-               lttng::details::create_unique_class<rotation_thread_job, lttng::free>>;
+       using uptr =
+               std::unique_ptr<rotation_thread_job,
+                               lttng::memory::create_deleter_class<rotation_thread_job,
+                                                                   lttng::memory::free>::deleter>;
 
        enum ls::rotation_thread_job_type type;
        struct ltt_session *session;
@@ -110,38 +112,39 @@ end:
        return exists;
 }
 
-void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
+void check_session_rotation_pending_on_consumers(const ltt_session::locked_ref& session,
+                                                bool& _rotation_completed)
 {
        int ret = 0;
-       struct consumer_socket *socket;
-       struct cds_lfht_iter iter;
        enum consumer_trace_chunk_exists_status exists_status;
        uint64_t relayd_id;
        bool chunk_exists_on_peer = false;
        enum lttng_trace_chunk_status chunk_status;
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
-       LTTNG_ASSERT(session.chunk_being_archived);
+       LTTNG_ASSERT(session->chunk_being_archived);
 
        /*
         * Check for a local pending rotation on all consumers (32-bit
         * user space, 64-bit user space, and kernel).
         */
-       if (!session.ust_session) {
+       if (!session->ust_session) {
                goto skip_ust;
        }
 
-       cds_lfht_for_each_entry (
-               session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
-               relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+       for (auto *socket : lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                               decltype(consumer_socket::node),
+                                                               &consumer_socket::node>(
+                    *session->ust_session->consumer->socks->ht)) {
+               relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session.ust_session->consumer->net_seq_index;
+                       session->ust_session->consumer->net_seq_index;
 
-               lttng::pthread::lock_guard socket_lock(*socket->lock);
+               const lttng::pthread::lock_guard socket_lock(*socket->lock);
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session.id,
-                                                 session.chunk_being_archived,
+                                                 session->id,
+                                                 session->chunk_being_archived,
                                                  &exists_status);
                if (ret) {
                        ERR("Error occurred while checking rotation status on consumer daemon");
@@ -155,22 +158,24 @@ void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _ro
        }
 
 skip_ust:
-       if (!session.kernel_session) {
+       if (!session->kernel_session) {
                goto skip_kernel;
        }
 
-       cds_lfht_for_each_entry (
-               session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
-               lttng::pthread::lock_guard socket_lock(*socket->lock);
+       for (auto *socket : lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                               decltype(consumer_socket::node),
+                                                               &consumer_socket::node>(
+                    *session->kernel_session->consumer->socks->ht)) {
+               const lttng::pthread::lock_guard socket_lock(*socket->lock);
 
-               relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+               relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session.kernel_session->consumer->net_seq_index;
+                       session->kernel_session->consumer->net_seq_index;
 
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session.id,
-                                                 session.chunk_being_archived,
+                                                 session->id,
+                                                 session->chunk_being_archived,
                                                  &exists_status);
                if (ret) {
                        ERR("Error occurred while checking rotation status on consumer daemon");
@@ -188,20 +193,20 @@ end:
        if (!chunk_exists_on_peer) {
                uint64_t chunk_being_archived_id;
 
-               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
+               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
                DBG("Rotation of trace archive %" PRIu64
                    " of session \"%s\" is complete on all consumers",
                    chunk_being_archived_id,
-                   session.name);
+                   session->name);
        }
 
        _rotation_completed = !chunk_exists_on_peer;
        if (ret) {
                ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
                if (ret) {
-                       ERR("Failed to reset rotation state of session \"%s\"", session.name);
+                       ERR("Failed to reset rotation state of session \"%s\"", session->name);
                }
        }
 }
@@ -211,7 +216,7 @@ end:
  * Should only return non-zero in the event of a fatal error. Doing so will
  * shutdown the thread.
  */
-int check_session_rotation_pending(ltt_session& session,
+int check_session_rotation_pending(const ltt_session::locked_ref& session,
                                   notification_thread_handle& notification_thread_handle)
 {
        int ret;
@@ -221,17 +226,17 @@ int check_session_rotation_pending(ltt_session& session,
        const char *archived_chunk_name;
        uint64_t chunk_being_archived_id;
 
-       if (!session.chunk_being_archived) {
+       if (!session->chunk_being_archived) {
                ret = 0;
                goto end;
        }
 
        chunk_status =
-               lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
+               lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
        DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
-           session.name,
+           session->name,
            chunk_being_archived_id);
 
        /*
@@ -248,7 +253,7 @@ int check_session_rotation_pending(ltt_session& session,
        }
 
        check_session_rotation_pending_on_consumers(session, rotation_completed);
-       if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+       if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
                goto check_ongoing_rotation;
        }
 
@@ -257,41 +262,41 @@ int check_session_rotation_pending(ltt_session& session,
         * rotations can start now.
         */
        chunk_status = lttng_trace_chunk_get_name(
-               session.chunk_being_archived, &archived_chunk_name, nullptr);
+               session->chunk_being_archived, &archived_chunk_name, nullptr);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
-       free(session.last_archived_chunk_name);
-       session.last_archived_chunk_name = strdup(archived_chunk_name);
-       if (!session.last_archived_chunk_name) {
+       free(session->last_archived_chunk_name);
+       session->last_archived_chunk_name = strdup(archived_chunk_name);
+       if (!session->last_archived_chunk_name) {
                PERROR("Failed to duplicate archived chunk name");
        }
 
        session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
 
-       if (!session.quiet_rotation) {
-               location = session_get_trace_archive_location(&session);
+       if (!session->quiet_rotation) {
+               location = session_get_trace_archive_location(session);
                ret = notification_thread_command_session_rotation_completed(
                        &notification_thread_handle,
-                       session.id,
-                       session.last_archived_chunk_id.value,
+                       session->id,
+                       session->last_archived_chunk_id.value,
                        location);
                lttng_trace_archive_location_put(location);
                if (ret != LTTNG_OK) {
                        ERR("Failed to notify notification thread of completed rotation for session %s",
-                           session.name);
+                           session->name);
                }
        }
 
        ret = 0;
 check_ongoing_rotation:
-       if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
-               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
+       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
                DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
                    chunk_being_archived_id,
-                   session.name);
-               ret = timer_session_rotation_pending_check_start(&session,
+                   session->name);
+               ret = timer_session_rotation_pending_check_start(session,
                                                                 DEFAULT_ROTATE_PENDING_TIMER);
                if (ret) {
                        ERR("Failed to re-enable rotation pending timer");
@@ -305,42 +310,46 @@ end:
 }
 
 /* Call with the session and session_list locks held. */
-int launch_session_rotation(ltt_session& session)
+void launch_session_rotation(const ltt_session::locked_ref& session)
 {
        int ret;
-       struct lttng_rotate_session_return rotation_return;
 
-       DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
+       DBG_FMT("Launching scheduled time-based rotation: session_name='{}'", session->name);
 
        ASSERT_SESSION_LIST_LOCKED();
-       ASSERT_LOCKED(session.lock);
-
-       ret = cmd_rotate_session(&session,
-                                &rotation_return,
-                                false,
-                                LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
-       if (ret == LTTNG_OK) {
-               DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
-                   session.name);
+
+       ret = cmd_rotate_session(
+               session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+       if (ret != LTTNG_OK) {
+               LTTNG_THROW_CTL(fmt::format("Failed to launch session rotation: session_name={}",
+                                           session->name),
+                               static_cast<lttng_error_code>(ret));
        } else {
                /* Don't consider errors as fatal. */
-               DBG("Scheduled time-based rotation aborted for session %s: %s",
-                   session.name,
-                   lttng_strerror(ret));
+               DBG_FMT("Scheduled time-based rotation aborted session_name=`{}`, error='{}'",
+                       session->name,
+                       lttng_strerror(ret));
        }
-
-       return 0;
 }
 
 int run_job(const rotation_thread_job& job,
-           ltt_session& session,
+           const ltt_session::locked_ref& session,
            notification_thread_handle& notification_thread_handle)
 {
-       int ret;
+       int ret = 0;
 
        switch (job.type) {
        case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
-               ret = launch_session_rotation(session);
+               try {
+                       launch_session_rotation(session);
+                       DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
+                           session->name);
+               } catch (const lttng::ctl::error& ctl_ex) {
+                       /* Don't consider errors as fatal. */
+                       DBG("Scheduled time-based rotation aborted for session %s: %s",
+                           session->name,
+                           lttng_strerror(ctl_ex.code()));
+               }
                break;
        case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
                ret = check_session_rotation_pending(session, notification_thread_handle);
@@ -384,7 +393,7 @@ void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue
        lttng_pipe_destroy(queue->event_pipe);
 
        {
-               lttng::pthread::lock_guard queue_lock(queue->lock);
+               const lttng::pthread::lock_guard queue_lock(queue->lock);
 
                LTTNG_ASSERT(cds_list_empty(&queue->list));
        }
@@ -395,8 +404,8 @@ void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue
 
 ls::rotation_thread::rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
                                     notification_thread_handle& notification_thread_handle) :
-       _rotation_timer_queue{ rotation_timer_queue },
-       _notification_thread_handle{ notification_thread_handle }
+       _rotation_timer_queue(rotation_timer_queue),
+       _notification_thread_handle(notification_thread_handle)
 {
        _quit_pipe.reset([]() {
                auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
@@ -465,7 +474,7 @@ void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
        const char dummy = '!';
        struct rotation_thread_job *job = nullptr;
        const char *job_type_str = get_job_type_str(job_type);
-       lttng::pthread::lock_guard queue_lock(queue->lock);
+       const lttng::pthread::lock_guard queue_lock(queue->lock);
 
        if (timer_job_exists(queue, job_type, session)) {
                /*
@@ -526,7 +535,7 @@ void ls::rotation_thread::_handle_job_queue()
 
                {
                        /* Take the queue lock only to pop an element from the list. */
-                       lttng::pthread::lock_guard rotation_timer_queue_lock(
+                       const lttng::pthread::lock_guard rotation_timer_queue_lock(
                                _rotation_timer_queue.lock);
                        if (cds_list_empty(&_rotation_timer_queue.list)) {
                                break;
@@ -537,15 +546,13 @@ void ls::rotation_thread::_handle_job_queue()
                        cds_list_del(&job->head);
                }
 
-               session_lock_list();
-               const auto unlock_list =
-                       lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+               const auto list_lock = lttng::sessiond::lock_session_list();
 
-               /* locked_ptr will unlock the session and release the ref held by the job. */
+               /* locked_ref will unlock the session and release the ref held by the job. */
                session_lock(job->session);
-               auto session = ltt_session::locked_ptr(job->session);
+               auto session = ltt_session::make_locked_ref(*job->session);
 
-               if (run_job(*job, *session, _notification_thread_handle)) {
+               if (run_job(*job, session, _notification_thread_handle)) {
                        return;
                }
        }
@@ -583,19 +590,48 @@ void ls::rotation_thread::_handle_notification(const lttng_notification& notific
                condition_session_name,
                consumed);
 
-       session_lock_list();
-       const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+       /*
+        * Mind the order of the declaration of list_lock vs session:
+        * the session list lock must always be released _after_ the release of
+        * a session's reference (the destruction of a ref/locked_ref) to ensure
+        * since the reference's release may unpublish the session from the list of
+        * sessions.
+        */
+       const auto list_lock = lttng::sessiond::lock_session_list();
+       try {
+               const auto session = ltt_session::find_locked_session(condition_session_name);
+
+               if (!lttng_trigger_is_equal(session->rotate_trigger,
+                                           lttng_notification_get_const_trigger(&notification))) {
+                       DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
+                       return;
+               }
 
-       ltt_session::locked_ptr session{ [&condition_session_name]() {
-               auto raw_session_ptr = session_find_by_name(condition_session_name);
+               unsubscribe_session_consumed_size_rotation(*session);
 
-               if (raw_session_ptr) {
-                       session_lock(raw_session_ptr);
+               ret = cmd_rotate_session(
+                       session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               if (ret != LTTNG_OK) {
+                       switch (ret) {
+                       case LTTNG_OK:
+                               break;
+                       case -LTTNG_ERR_ROTATION_PENDING:
+                               DBG("Rotate already pending, subscribe to the next threshold value");
+                               break;
+                       case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+                               DBG("Rotation already happened since last stop, subscribe to the next threshold value");
+                               break;
+                       case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+                               DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
+                               break;
+                       default:
+                               LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
+                                               static_cast<lttng_error_code>(-ret));
+                       }
                }
 
-               return raw_session_ptr;
-       }() };
-       if (!session) {
+               subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
+       } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
                DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
                        lttng_condition_type_str(condition_type),
                        condition_session_name);
@@ -605,35 +641,6 @@ void ls::rotation_thread::_handle_notification(const lttng_notification& notific
                 */
                return;
        }
-
-       if (!lttng_trigger_is_equal(session->rotate_trigger,
-                                   lttng_notification_get_const_trigger(&notification))) {
-               DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
-               return;
-       }
-
-       unsubscribe_session_consumed_size_rotation(*session);
-
-       ret = cmd_rotate_session(
-               session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
-       switch (ret) {
-       case LTTNG_OK:
-               break;
-       case -LTTNG_ERR_ROTATION_PENDING:
-               DBG("Rotate already pending, subscribe to the next threshold value");
-               break;
-       case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
-               DBG("Rotation already happened since last stop, subscribe to the next threshold value");
-               break;
-       case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
-               DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
-               break;
-       default:
-               LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
-                               static_cast<lttng_error_code>(-ret));
-       }
-
-       subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
 }
 
 void ls::rotation_thread::_handle_notification_channel_activity()
@@ -751,8 +758,8 @@ void ls::rotation_thread::_run()
                        DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
 
                        if (revents & LPOLLERR) {
-                               LTTNG_THROW_ERROR(
-                                       fmt::format("Polling returned an error on fd: fd={}", fd));
+                               LTTNG_THROW_ERROR(lttng::format(
+                                       "Polling returned an error on fd: fd={}", fd));
                        }
 
                        if (fd == _notification_channel->socket ||
@@ -788,7 +795,7 @@ void ls::rotation_thread::_run()
 
                                        if (lttng_read(fd, &buf, 1) != 1) {
                                                LTTNG_THROW_POSIX(
-                                                       fmt::format(
+                                                       lttng::format(
                                                                "Failed to read from wakeup pipe: fd={}",
                                                                fd),
                                                        errno);
@@ -837,7 +844,7 @@ void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session&
                .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
        };
 
-       ASSERT_LOCKED(session.lock);
+       ASSERT_LOCKED(session._lock);
 
        auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
                lttng_condition_session_consumed_size_create());
@@ -848,14 +855,14 @@ void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session&
        auto condition_status =
                lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
        if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-               LTTNG_THROW_ERROR(fmt::format(
+               LTTNG_THROW_ERROR(lttng::format(
                        "Could not set session consumed size condition threshold: size={}", size));
        }
 
        condition_status = lttng_condition_session_consumed_size_set_session_name(
                rotate_condition.get(), session.name);
        if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-               LTTNG_THROW_ERROR(fmt::format(
+               LTTNG_THROW_ERROR(lttng::format(
                        "Could not set session consumed size condition session name: name=`{}`",
                        session.name));
        }
@@ -894,7 +901,7 @@ void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session&
                &_notification_thread_handle, trigger.get(), true);
        if (register_ret != LTTNG_OK) {
                LTTNG_THROW_CTL(
-                       fmt::format(
+                       lttng::format(
                                "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
                                session.name,
                                size),
@@ -918,7 +925,7 @@ void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session
                _notification_channel.get(),
                lttng_trigger_get_const_condition(session.rotate_trigger));
        if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               LTTNG_THROW_ERROR(fmt::format(
+               LTTNG_THROW_ERROR(lttng::format(
                        "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
                        session.name,
                        static_cast<int>(unsubscribe_status)));
@@ -934,7 +941,7 @@ void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session
                &_notification_thread_handle, session.rotate_trigger);
        if (unregister_status != LTTNG_OK) {
                LTTNG_THROW_CTL(
-                       fmt::format(
+                       lttng::format(
                                "Failed to unregister trigger for automatic size-based rotation: session_name{}",
                                session.name),
                        unregister_status);
This page took 0.035218 seconds and 4 git commands to generate.