#include <lttng/rotate-internal.hpp>
#include <lttng/trigger/trigger.h>
+#include <fcntl.h>
#include <inttypes.h>
#include <memory>
#include <signal.h>
namespace {
struct rotation_thread_job {
- using uptr = std::unique_ptr<
- rotation_thread_job,
- lttng::details::create_unique_class<rotation_thread_job, lttng::free>>;
+ using uptr =
+ std::unique_ptr<rotation_thread_job,
+ lttng::memory::create_deleter_class<rotation_thread_job,
+ lttng::memory::free>::deleter>;
enum ls::rotation_thread_job_type type;
struct ltt_session *session;
return exists;
}
-void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
+void check_session_rotation_pending_on_consumers(const ltt_session::locked_ref& session,
+ bool& _rotation_completed)
{
int ret = 0;
- struct consumer_socket *socket;
- struct cds_lfht_iter iter;
enum consumer_trace_chunk_exists_status exists_status;
uint64_t relayd_id;
bool chunk_exists_on_peer = false;
enum lttng_trace_chunk_status chunk_status;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
- LTTNG_ASSERT(session.chunk_being_archived);
+ LTTNG_ASSERT(session->chunk_being_archived);
/*
* Check for a local pending rotation on all consumers (32-bit
* user space, 64-bit user space, and kernel).
*/
- if (!session.ust_session) {
+ if (!session->ust_session) {
goto skip_ust;
}
- cds_lfht_for_each_entry (
- session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
- relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+ for (auto *socket : lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(
+ *session->ust_session->consumer->socks->ht)) {
+ relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
-1ULL :
- session.ust_session->consumer->net_seq_index;
+ session->ust_session->consumer->net_seq_index;
- lttng::pthread::lock_guard socket_lock(*socket->lock);
+ const lttng::pthread::lock_guard socket_lock(*socket->lock);
ret = consumer_trace_chunk_exists(socket,
relayd_id,
- session.id,
- session.chunk_being_archived,
+ session->id,
+ session->chunk_being_archived,
&exists_status);
if (ret) {
ERR("Error occurred while checking rotation status on consumer daemon");
}
skip_ust:
- if (!session.kernel_session) {
+ if (!session->kernel_session) {
goto skip_kernel;
}
- cds_lfht_for_each_entry (
- session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
- lttng::pthread::lock_guard socket_lock(*socket->lock);
+ for (auto *socket : lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(
+ *session->kernel_session->consumer->socks->ht)) {
+ const lttng::pthread::lock_guard socket_lock(*socket->lock);
- relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+ relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
-1ULL :
- session.kernel_session->consumer->net_seq_index;
+ session->kernel_session->consumer->net_seq_index;
ret = consumer_trace_chunk_exists(socket,
relayd_id,
- session.id,
- session.chunk_being_archived,
+ session->id,
+ session->chunk_being_archived,
&exists_status);
if (ret) {
ERR("Error occurred while checking rotation status on consumer daemon");
if (!chunk_exists_on_peer) {
uint64_t chunk_being_archived_id;
- chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
+ chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
&chunk_being_archived_id);
LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
DBG("Rotation of trace archive %" PRIu64
" of session \"%s\" is complete on all consumers",
chunk_being_archived_id,
- session.name);
+ session->name);
}
_rotation_completed = !chunk_exists_on_peer;
if (ret) {
ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
if (ret) {
- ERR("Failed to reset rotation state of session \"%s\"", session.name);
+ ERR("Failed to reset rotation state of session \"%s\"", session->name);
}
}
}
* Should only return non-zero in the event of a fatal error. Doing so will
* shutdown the thread.
*/
-int check_session_rotation_pending(ltt_session& session,
+int check_session_rotation_pending(const ltt_session::locked_ref& session,
notification_thread_handle& notification_thread_handle)
{
int ret;
const char *archived_chunk_name;
uint64_t chunk_being_archived_id;
- if (!session.chunk_being_archived) {
+ if (!session->chunk_being_archived) {
ret = 0;
goto end;
}
chunk_status =
- lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
+ lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
- session.name,
+ session->name,
chunk_being_archived_id);
/*
}
check_session_rotation_pending_on_consumers(session, rotation_completed);
- if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+ if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
goto check_ongoing_rotation;
}
* rotations can start now.
*/
chunk_status = lttng_trace_chunk_get_name(
- session.chunk_being_archived, &archived_chunk_name, nullptr);
+ session->chunk_being_archived, &archived_chunk_name, nullptr);
LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
- free(session.last_archived_chunk_name);
- session.last_archived_chunk_name = strdup(archived_chunk_name);
- if (!session.last_archived_chunk_name) {
+ free(session->last_archived_chunk_name);
+ session->last_archived_chunk_name = strdup(archived_chunk_name);
+ if (!session->last_archived_chunk_name) {
PERROR("Failed to duplicate archived chunk name");
}
session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
- if (!session.quiet_rotation) {
- location = session_get_trace_archive_location(&session);
+ if (!session->quiet_rotation) {
+ location = session_get_trace_archive_location(session);
ret = notification_thread_command_session_rotation_completed(
¬ification_thread_handle,
- session.id,
- session.last_archived_chunk_id.value,
+ session->id,
+ session->last_archived_chunk_id.value,
location);
lttng_trace_archive_location_put(location);
if (ret != LTTNG_OK) {
ERR("Failed to notify notification thread of completed rotation for session %s",
- session.name);
+ session->name);
}
}
ret = 0;
check_ongoing_rotation:
- if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
- chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
+ if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+ chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
&chunk_being_archived_id);
LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
chunk_being_archived_id,
- session.name);
- ret = timer_session_rotation_pending_check_start(&session,
+ session->name);
+ ret = timer_session_rotation_pending_check_start(session,
DEFAULT_ROTATE_PENDING_TIMER);
if (ret) {
ERR("Failed to re-enable rotation pending timer");
}
/* Call with the session and session_list locks held. */
-int launch_session_rotation(ltt_session& session)
+void launch_session_rotation(const ltt_session::locked_ref& session)
{
int ret;
- struct lttng_rotate_session_return rotation_return;
- DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
+ DBG_FMT("Launching scheduled time-based rotation: session_name='{}'", session->name);
ASSERT_SESSION_LIST_LOCKED();
- ASSERT_LOCKED(session.lock);
-
- ret = cmd_rotate_session(&session,
- &rotation_return,
- false,
- LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
- if (ret == LTTNG_OK) {
- DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
- session.name);
+
+ ret = cmd_rotate_session(
+ session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+ if (ret != LTTNG_OK) {
+ LTTNG_THROW_CTL(fmt::format("Failed to launch session rotation: session_name={}",
+ session->name),
+ static_cast<lttng_error_code>(ret));
} else {
/* Don't consider errors as fatal. */
- DBG("Scheduled time-based rotation aborted for session %s: %s",
- session.name,
- lttng_strerror(ret));
+ DBG_FMT("Scheduled time-based rotation aborted session_name=`{}`, error='{}'",
+ session->name,
+ lttng_strerror(ret));
}
-
- return 0;
}
int run_job(const rotation_thread_job& job,
- ltt_session& session,
+ const ltt_session::locked_ref& session,
notification_thread_handle& notification_thread_handle)
{
- int ret;
+ int ret = 0;
switch (job.type) {
case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
- ret = launch_session_rotation(session);
+ try {
+ launch_session_rotation(session);
+ DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
+ session->name);
+ } catch (const lttng::ctl::error& ctl_ex) {
+ /* Don't consider errors as fatal. */
+ DBG("Scheduled time-based rotation aborted for session %s: %s",
+ session->name,
+ lttng_strerror(ctl_ex.code()));
+ }
break;
case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
ret = check_session_rotation_pending(session, notification_thread_handle);
lttng_pipe_destroy(queue->event_pipe);
{
- lttng::pthread::lock_guard queue_lock(queue->lock);
+ const lttng::pthread::lock_guard queue_lock(queue->lock);
LTTNG_ASSERT(cds_list_empty(&queue->list));
}
ls::rotation_thread::rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
notification_thread_handle& notification_thread_handle) :
- _rotation_timer_queue{ rotation_timer_queue },
- _notification_thread_handle{ notification_thread_handle }
+ _rotation_timer_queue(rotation_timer_queue),
+ _notification_thread_handle(notification_thread_handle)
{
_quit_pipe.reset([]() {
auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
const char dummy = '!';
struct rotation_thread_job *job = nullptr;
const char *job_type_str = get_job_type_str(job_type);
- lttng::pthread::lock_guard queue_lock(queue->lock);
+ const lttng::pthread::lock_guard queue_lock(queue->lock);
if (timer_job_exists(queue, job_type, session)) {
/*
{
/* Take the queue lock only to pop an element from the list. */
- lttng::pthread::lock_guard rotation_timer_queue_lock(
+ const lttng::pthread::lock_guard rotation_timer_queue_lock(
_rotation_timer_queue.lock);
if (cds_list_empty(&_rotation_timer_queue.list)) {
break;
cds_list_del(&job->head);
}
- session_lock_list();
- const auto unlock_list =
- lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+ const auto list_lock = lttng::sessiond::lock_session_list();
- /* locked_ptr will unlock the session and release the ref held by the job. */
+ /* locked_ref will unlock the session and release the ref held by the job. */
session_lock(job->session);
- auto session = ltt_session::locked_ptr(job->session);
+ auto session = ltt_session::make_locked_ref(*job->session);
- if (run_job(*job, *session, _notification_thread_handle)) {
+ if (run_job(*job, session, _notification_thread_handle)) {
return;
}
}
condition_session_name,
consumed);
- session_lock_list();
- const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+ /*
+ * Mind the order of the declaration of list_lock vs session:
+ * the session list lock must always be released _after_ the release of
+ * a session's reference (the destruction of a ref/locked_ref) to ensure
+ * since the reference's release may unpublish the session from the list of
+ * sessions.
+ */
+ const auto list_lock = lttng::sessiond::lock_session_list();
+ try {
+ const auto session = ltt_session::find_locked_session(condition_session_name);
+
+ if (!lttng_trigger_is_equal(session->rotate_trigger,
+ lttng_notification_get_const_trigger(¬ification))) {
+ DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
+ return;
+ }
- ltt_session::locked_ptr session{ [&condition_session_name]() {
- auto raw_session_ptr = session_find_by_name(condition_session_name);
+ unsubscribe_session_consumed_size_rotation(*session);
- if (raw_session_ptr) {
- session_lock(raw_session_ptr);
+ ret = cmd_rotate_session(
+ session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+ if (ret != LTTNG_OK) {
+ switch (ret) {
+ case LTTNG_OK:
+ break;
+ case -LTTNG_ERR_ROTATION_PENDING:
+ DBG("Rotate already pending, subscribe to the next threshold value");
+ break;
+ case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+ DBG("Rotation already happened since last stop, subscribe to the next threshold value");
+ break;
+ case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+ DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
+ break;
+ default:
+ LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
+ static_cast<lttng_error_code>(-ret));
+ }
}
- return raw_session_ptr;
- }() };
- if (!session) {
+ subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
+ } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
lttng_condition_type_str(condition_type),
condition_session_name);
*/
return;
}
-
- if (!lttng_trigger_is_equal(session->rotate_trigger,
- lttng_notification_get_const_trigger(¬ification))) {
- DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
- return;
- }
-
- unsubscribe_session_consumed_size_rotation(*session);
-
- ret = cmd_rotate_session(
- session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
- switch (ret) {
- case LTTNG_OK:
- break;
- case -LTTNG_ERR_ROTATION_PENDING:
- DBG("Rotate already pending, subscribe to the next threshold value");
- break;
- case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
- DBG("Rotation already happened since last stop, subscribe to the next threshold value");
- break;
- case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
- DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
- break;
- default:
- LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
- static_cast<lttng_error_code>(-ret));
- }
-
- subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
}
void ls::rotation_thread::_handle_notification_channel_activity()
DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
if (revents & LPOLLERR) {
- LTTNG_THROW_ERROR(
- fmt::format("Polling returned an error on fd: fd={}", fd));
+ LTTNG_THROW_ERROR(lttng::format(
+ "Polling returned an error on fd: fd={}", fd));
}
if (fd == _notification_channel->socket ||
if (lttng_read(fd, &buf, 1) != 1) {
LTTNG_THROW_POSIX(
- fmt::format(
+ lttng::format(
"Failed to read from wakeup pipe: fd={}",
fd),
errno);
.gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
};
- ASSERT_LOCKED(session.lock);
+ ASSERT_LOCKED(session._lock);
auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
lttng_condition_session_consumed_size_create());
auto condition_status =
lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
if (condition_status != LTTNG_CONDITION_STATUS_OK) {
- LTTNG_THROW_ERROR(fmt::format(
+ LTTNG_THROW_ERROR(lttng::format(
"Could not set session consumed size condition threshold: size={}", size));
}
condition_status = lttng_condition_session_consumed_size_set_session_name(
rotate_condition.get(), session.name);
if (condition_status != LTTNG_CONDITION_STATUS_OK) {
- LTTNG_THROW_ERROR(fmt::format(
+ LTTNG_THROW_ERROR(lttng::format(
"Could not set session consumed size condition session name: name=`{}`",
session.name));
}
&_notification_thread_handle, trigger.get(), true);
if (register_ret != LTTNG_OK) {
LTTNG_THROW_CTL(
- fmt::format(
+ lttng::format(
"Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
session.name,
size),
_notification_channel.get(),
lttng_trigger_get_const_condition(session.rotate_trigger));
if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
- LTTNG_THROW_ERROR(fmt::format(
+ LTTNG_THROW_ERROR(lttng::format(
"Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
session.name,
static_cast<int>(unsubscribe_status)));
&_notification_thread_handle, session.rotate_trigger);
if (unregister_status != LTTNG_OK) {
LTTNG_THROW_CTL(
- fmt::format(
+ lttng::format(
"Failed to unregister trigger for automatic size-based rotation: session_name{}",
session.name),
unregister_status);