/*
- * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
- * Copyright (C) 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
#include <common/align.h>
#include <common/time.h>
#include <common/hashtable/utils.h>
-#include <sys/eventfd.h>
#include <sys/stat.h>
#include <time.h>
#include <signal.h>
return queue;
}
-void log_job_destruction(const struct rotation_thread_job *job)
-{
- enum lttng_error_level log_level;
- const char *job_type_str = get_job_type_str(job->type);
-
- switch (job->type) {
- case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
- /*
- * Not a problem, the scheduled rotation is racing with the teardown
- * of the daemon. In this case, the rotation will not happen, which
- * is not a problem (or at least, not important enough to delay
- * the shutdown of the session daemon).
- */
- log_level = PRINT_DBG;
- break;
- case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
- /* This is not expected to happen; warn the user. */
- log_level = PRINT_WARN;
- break;
- default:
- abort();
- }
-
- LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session \"%s\" on destruction",
- job_type_str, job->session->name);
-}
-
void rotation_thread_timer_queue_destroy(
struct rotation_thread_timer_queue *queue)
{
struct ltt_session *session)
{
int ret;
- const char * const dummy = "!";
+ const char dummy = '!';
struct rotation_thread_job *job = NULL;
const char *job_type_str = get_job_type_str(job_type);
job->type = job_type;
cds_list_add_tail(&job->head, &queue->list);
- ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), dummy,
- 1);
+ ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy,
+ sizeof(dummy));
if (ret < 0) {
/*
* We do not want to block in the timer handler, the job has
}
static
-int check_session_rotation_pending_local_on_consumer(
- const struct ltt_session *session,
- struct consumer_socket *socket, bool *rotation_completed)
-{
- int ret;
-
- pthread_mutex_lock(socket->lock);
- DBG("[rotation-thread] Checking for locally pending rotation on the %s consumer for session %s",
- lttng_consumer_type_str(socket->type),
- session->name);
- ret = consumer_check_rotation_pending_local(socket,
- session->id,
- session->current_archive_id - 1);
- pthread_mutex_unlock(socket->lock);
-
- if (ret == 0) {
- /* Rotation was completed on this consumer. */
- DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" was completed on the %s consumer",
- session->current_archive_id - 1,
- session->name,
- lttng_consumer_type_str(socket->type));
- *rotation_completed = true;
- } else if (ret == 1) {
- /* Rotation pending on this consumer. */
- DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
- session->current_archive_id - 1,
- session->name,
- lttng_consumer_type_str(socket->type));
- *rotation_completed = false;
- ret = 0;
- } else {
- /* Not a fatal error. */
- ERR("[rotation-thread] Encountered an error when checking if local rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the %s consumer",
- session->current_archive_id - 1,
- session->name,
- lttng_consumer_type_str(socket->type));
- *rotation_completed = false;
- }
- return ret;
-}
-
-static
-int check_session_rotation_pending_local(struct ltt_session *session)
+void check_session_rotation_pending_on_consumers(struct ltt_session *session,
+ bool *_rotation_completed)
{
int ret = 0;
struct consumer_socket *socket;
struct cds_lfht_iter iter;
- bool rotation_completed = true;
+ enum consumer_trace_chunk_exists_status exists_status;
+ uint64_t relayd_id;
+ bool chunk_exists_on_peer = false;
+ enum lttng_trace_chunk_status chunk_status;
+
+ assert(session->chunk_being_archived);
/*
* Check for a local pending rotation on all consumers (32-bit
* user space, 64-bit user space, and kernel).
*/
- DBG("[rotation-thread] Checking for pending local rotation on session \"%s\", trace archive %" PRIu64,
- session->name, session->current_archive_id - 1);
-
rcu_read_lock();
if (!session->ust_session) {
goto skip_ust;
}
cds_lfht_for_each_entry(session->ust_session->consumer->socks->ht,
&iter, socket, node.node) {
- ret = check_session_rotation_pending_local_on_consumer(session,
- socket, &rotation_completed);
- if (ret || !rotation_completed) {
+ relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+ -1ULL :
+ session->ust_session->consumer->net_seq_index;
+
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_trace_chunk_exists(socket,
+ relayd_id,
+ session->id, session->chunk_being_archived,
+ &exists_status);
+ if (ret) {
+ pthread_mutex_unlock(socket->lock);
+ ERR("Error occurred while checking rotation status on consumer daemon");
goto end;
}
+
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ pthread_mutex_unlock(socket->lock);
+ chunk_exists_on_peer = true;
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
}
skip_ust:
}
cds_lfht_for_each_entry(session->kernel_session->consumer->socks->ht,
&iter, socket, node.node) {
- ret = check_session_rotation_pending_local_on_consumer(session,
- socket, &rotation_completed);
- if (ret || !rotation_completed) {
+ pthread_mutex_lock(socket->lock);
+ relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+ -1ULL :
+ session->kernel_session->consumer->net_seq_index;
+
+ ret = consumer_trace_chunk_exists(socket,
+ relayd_id,
+ session->id, session->chunk_being_archived,
+ &exists_status);
+ if (ret) {
+ pthread_mutex_unlock(socket->lock);
+ ERR("Error occurred while checking rotation status on consumer daemon");
goto end;
}
+
+ if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
+ pthread_mutex_unlock(socket->lock);
+ chunk_exists_on_peer = true;
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
}
skip_kernel:
end:
rcu_read_unlock();
- if (rotation_completed) {
- DBG("[rotation-thread] Local rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
- session->current_archive_id - 1,
+ if (!chunk_exists_on_peer) {
+ uint64_t chunk_being_archived_id;
+
+ chunk_status = lttng_trace_chunk_get_id(
+ session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers",
+ chunk_being_archived_id,
session->name);
- session->rotation_pending_local = false;
}
+ *_rotation_completed = !chunk_exists_on_peer;
if (ret) {
ret = session_reset_rotation_state(session,
LTTNG_ROTATION_STATE_ERROR);
session->name);
}
}
- return 0;
-}
-
-static
-int check_session_rotation_pending_relay(struct ltt_session *session)
-{
- int ret;
- struct consumer_socket *socket;
- struct cds_lfht_iter iter;
- bool rotation_completed = true;
- const struct consumer_output *output;
-
- /*
- * Check for a pending rotation on any consumer as we only use
- * it as a "tunnel" to the relayd.
- */
-
- rcu_read_lock();
- if (session->ust_session) {
- cds_lfht_first(session->ust_session->consumer->socks->ht,
- &iter);
- output = session->ust_session->consumer;
- } else {
- cds_lfht_first(session->kernel_session->consumer->socks->ht,
- &iter);
- output = session->kernel_session->consumer;
- }
- assert(cds_lfht_iter_get_node(&iter));
-
- socket = caa_container_of(cds_lfht_iter_get_node(&iter),
- typeof(*socket), node.node);
-
- pthread_mutex_lock(socket->lock);
- DBG("[rotation-thread] Checking for pending relay rotation on session \"%s\", trace archive %" PRIu64 " through the %s consumer",
- session->name, session->current_archive_id - 1,
- lttng_consumer_type_str(socket->type));
- ret = consumer_check_rotation_pending_relay(socket,
- output,
- session->id,
- session->current_archive_id - 1);
- pthread_mutex_unlock(socket->lock);
-
- if (ret == 0) {
- /* Rotation was completed on the relay. */
- DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" was completed",
- session->current_archive_id - 1,
- session->name);
- } else if (ret == 1) {
- /* Rotation pending on relay. */
- DBG("[rotation-thread] Relay rotation of trace archive %" PRIu64 " of session \"%s\" is pending",
- session->current_archive_id - 1,
- session->name);
- rotation_completed = false;
- } else {
- /* Not a fatal error. */
- ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
- session->current_archive_id - 1,
- session->name);
- ret = session_reset_rotation_state(session,
- LTTNG_ROTATION_STATE_ERROR);
- if (ret) {
- ERR("Failed to reset rotation state of session \"%s\"",
- session->name);
- }
- rotation_completed = false;
- }
-
- rcu_read_unlock();
-
- if (rotation_completed) {
- DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
- session->current_archive_id - 1,
- session->name);
- session->rotation_pending_relay = false;
- }
- return 0;
}
/*
* Check if the last rotation was completed, called with session lock held.
+ * Should only return non-zero in the event of a fatal error. Doing so will
+ * shutdown the thread.
*/
static
int check_session_rotation_pending(struct ltt_session *session,
{
int ret;
struct lttng_trace_archive_location *location;
- time_t now;
+ enum lttng_trace_chunk_status chunk_status;
+ bool rotation_completed = false;
+ const char *archived_chunk_name;
+ uint64_t chunk_being_archived_id;
+
+ if (!session->chunk_being_archived) {
+ ret = 0;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
DBG("[rotation-thread] Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
- session->name, session->current_archive_id - 1);
+ session->name, chunk_being_archived_id);
/*
* The rotation-pending check timer of a session is launched in
*/
ret = timer_session_rotation_pending_check_stop(session);
if (ret) {
- goto end;
- }
-
- if (session->rotation_pending_local) {
- /* Updates session->rotation_pending_local as needed. */
- ret = check_session_rotation_pending_local(session);
- if (ret) {
- goto end;
- }
-
- /*
- * No need to check for a pending rotation on the relay
- * since the rotation is not even completed locally yet.
- */
- if (session->rotation_pending_local) {
- goto end;
- }
- }
-
- if (session->rotation_pending_relay) {
- /* Updates session->rotation_pending_relay as needed. */
- ret = check_session_rotation_pending_relay(session);
- if (ret) {
- goto end;
- }
-
- if (session->rotation_pending_relay) {
- goto end;
- }
+ goto check_ongoing_rotation;
}
- DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " completed for "
- "session %s", session->current_archive_id - 1,
- session->name);
-
- /* Rename the completed trace archive's location. */
- now = time(NULL);
- if (now == (time_t) -1) {
- ret = session_reset_rotation_state(session,
- LTTNG_ROTATION_STATE_ERROR);
- if (ret) {
- ERR("Failed to reset rotation state of session \"%s\"",
- session->name);
- }
- ret = LTTNG_ERR_UNK;
- goto end;
+ check_session_rotation_pending_on_consumers(session,
+ &rotation_completed);
+ if (!rotation_completed ||
+ session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+ goto check_ongoing_rotation;
}
- ret = rename_completed_chunk(session, now);
- if (ret < 0) {
- ERR("Failed to rename completed rotation chunk");
- goto end;
- }
- session->last_chunk_start_ts = session->current_chunk_start_ts;
-
/*
* Now we can clear the "ONGOING" state in the session. New
* rotations can start now.
*/
- session->rotation_state = LTTNG_ROTATION_STATE_COMPLETED;
-
- /* Ownership of location is transferred. */
- location = session_get_trace_archive_location(session);
- ret = notification_thread_command_session_rotation_completed(
- notification_thread_handle,
- session->name,
- session->uid,
- session->gid,
- session->current_archive_id,
- location);
- if (ret != LTTNG_OK) {
- ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
- session->name);
+ chunk_status = lttng_trace_chunk_get_name(session->chunk_being_archived,
+ &archived_chunk_name, NULL);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+ free(session->last_archived_chunk_name);
+ session->last_archived_chunk_name = strdup(archived_chunk_name);
+ if (!session->last_archived_chunk_name) {
+ PERROR("Failed to duplicate archived chunk name");
}
+ session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
- if (!session->active) {
- /*
- * A stop command was issued during the rotation, it is
- * up to the rotation completion check to perform the
- * renaming of the last chunk that was produced.
- */
- ret = notification_thread_command_session_rotation_ongoing(
- notification_thread_handle,
- session->name,
- session->uid,
- session->gid,
- session->current_archive_id);
- if (ret != LTTNG_OK) {
- ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
- session->name);
- }
-
- ret = rename_active_chunk(session);
- if (ret < 0) {
- ERR("[rotation-thread] Failed to rename active rotation chunk");
- goto end;
- }
-
- /* Ownership of location is transferred. */
+ if (!session->quiet_rotation) {
location = session_get_trace_archive_location(session);
+ /* Ownership of location is transferred. */
ret = notification_thread_command_session_rotation_completed(
notification_thread_handle,
session->name,
session->uid,
session->gid,
- session->current_archive_id,
+ session->last_archived_chunk_id.value,
location);
if (ret != LTTNG_OK) {
ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
}
ret = 0;
-end:
+check_ongoing_rotation:
if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+ uint64_t chunk_being_archived_id;
+
+ chunk_status = lttng_trace_chunk_get_id(
+ session->chunk_being_archived,
+ &chunk_being_archived_id);
+ assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
+
DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " is still pending for session %s",
- session->current_archive_id - 1, session->name);
+ chunk_being_archived_id, session->name);
ret = timer_session_rotation_pending_check_start(session,
DEFAULT_ROTATE_PENDING_TIMER);
if (ret) {
- ERR("Re-enabling rotate pending timer");
+ ERR("Failed to re-enable rotation pending timer");
ret = -1;
goto end;
}
}
+end:
return ret;
}
DBG("[rotation-thread] Launching scheduled time-based rotation on session \"%s\"",
session->name);
- ret = cmd_rotate_session(session, &rotation_return);
+ ret = cmd_rotate_session(session, &rotation_return, false,
+ LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
if (ret == LTTNG_OK) {
DBG("[rotation-thread] Scheduled time-based rotation successfully launched on session \"%s\"",
session->name);
switch (job->type) {
case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
- ret = launch_session_rotation(session);
+ ret = launch_session_rotation(session);
break;
case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
ret = check_session_rotation_pending(session,
* possible for a job targeting that session to have
* already been queued before it was destroyed.
*/
- session_unlock_list();
free(job);
session_put(session);
+ session_unlock_list();
continue;
}
session_lock(session);
- ret = run_job(job, session, handle->notification_thread_handle);
+ ret = run_job(job, session, handle->notification_thread_handle);
session_unlock(session);
/* Release reference held by the job. */
session_put(session);
goto end;
}
session_lock(session);
- session_unlock_list();
ret = unsubscribe_session_consumed_size_rotation(session,
notification_thread_handle);
goto end_unlock;
}
- ret = cmd_rotate_session(session, NULL);
+ ret = cmd_rotate_session(session, NULL, false,
+ LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
if (ret == -LTTNG_ERR_ROTATION_PENDING) {
DBG("Rotate already pending, subscribe to the next threshold value");
} else if (ret != LTTNG_OK) {
end_unlock:
session_unlock(session);
session_put(session);
+ session_unlock_list();
end:
return ret;
}
return ret;
}
+static
void *thread_rotation(void *data)
{
int ret;
struct rotation_thread_handle *handle = data;
struct rotation_thread thread;
- const int queue_pipe_fd = lttng_pipe_get_readfd(
- handle->rotation_timer_queue->event_pipe);
+ int queue_pipe_fd;
DBG("[rotation-thread] Started rotation thread");
+ rcu_register_thread();
+ rcu_thread_online();
+ health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
+ health_code_update();
if (!handle) {
ERR("[rotation-thread] Invalid thread context provided");
goto end;
}
- rcu_register_thread();
- rcu_thread_online();
+ queue_pipe_fd = lttng_pipe_get_readfd(
+ handle->rotation_timer_queue->event_pipe);
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
- health_code_update();
ret = init_thread_state(handle, &thread);
if (ret) {
goto error;
}
- /* Ready to handle client connections. */
- sessiond_notify_ready();
-
while (true) {
int fd_count, i;
ret = lttng_read(fd, &buf, 1);
if (ret != 1) {
ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd);
- ret = -1;
goto error;
}
} else {
error:
DBG("[rotation-thread] Exit");
fini_thread_state(&thread);
+end:
health_unregister(health_sessiond);
rcu_thread_offline();
rcu_unregister_thread();
-end:
return NULL;
}