};
struct rotation_thread_handle {
- int quit_pipe;
struct rotation_thread_timer_queue *rotation_timer_queue;
/* Access to the notification thread cmd_queue */
struct notification_thread_handle *notification_thread_handle;
}
struct rotation_thread_handle *rotation_thread_handle_create(
- int quit_pipe,
struct rotation_thread_timer_queue *rotation_timer_queue,
struct notification_thread_handle *notification_thread_handle,
sem_t *notification_thread_ready)
goto end;
}
- handle->quit_pipe = quit_pipe;
handle->rotation_timer_queue = rotation_timer_queue;
handle->notification_thread_handle = notification_thread_handle;
handle->notification_thread_ready = notification_thread_ready;
* - quit pipe,
* - rotation thread timer queue pipe,
*/
- ret = lttng_poll_create(poll_set, 2, LTTNG_CLOEXEC);
- if (ret < 0) {
- goto end;
- }
-
- ret = lttng_poll_add(poll_set, handle->quit_pipe,
- LPOLLIN | LPOLLERR);
- if (ret < 0) {
- ERR("[rotation-thread] Failed to add quit_pipe fd to pollset");
+ ret = sessiond_set_thread_pollset(poll_set, 2);
+ if (ret) {
goto error;
}
ret = lttng_poll_add(poll_set,
goto error;
}
-end:
return ret;
error:
lttng_poll_clean(poll_set);
static
int check_session_rotation_pending_local(struct ltt_session *session)
{
- int ret;
+ int ret = 0;
struct consumer_socket *socket;
struct cds_lfht_iter iter;
bool rotation_completed = true;
session->rotation_pending_local = false;
}
if (ret) {
- session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ ret = session_reset_rotation_state(session,
+ LTTNG_ROTATION_STATE_ERROR);
+ if (ret) {
+ ERR("Failed to reset rotation state of session \"%s\"",
+ session->name);
+ }
}
return 0;
}
ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
session->current_archive_id - 1,
session->name);
- session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ ret = session_reset_rotation_state(session,
+ LTTNG_ROTATION_STATE_ERROR);
+ if (ret) {
+ ERR("Failed to reset rotation state of session \"%s\"",
+ session->name);
+ }
rotation_completed = false;
}
rcu_read_unlock();
if (rotation_completed) {
- DBG("[rotation-thread] Totation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
+ DBG("[rotation-thread] Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
session->current_archive_id - 1,
session->name);
session->rotation_pending_relay = false;
/* Rename the completed trace archive's location. */
now = time(NULL);
if (now == (time_t) -1) {
- session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ ret = session_reset_rotation_state(session,
+ LTTNG_ROTATION_STATE_ERROR);
+ if (ret) {
+ ERR("Failed to reset rotation state of session \"%s\"",
+ session->name);
+ }
ret = LTTNG_ERR_UNK;
goto end;
}
session->name);
}
+ if (!session->active) {
+ /*
+ * A stop command was issued during the rotation, it is
+ * up to the rotation completion check to perform the
+ * renaming of the last chunk that was produced.
+ */
+ ret = notification_thread_command_session_rotation_ongoing(
+ notification_thread_handle,
+ session->name,
+ session->uid,
+ session->gid,
+ session->current_archive_id);
+ if (ret != LTTNG_OK) {
+ ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+ session->name);
+ }
+
+ ret = rename_active_chunk(session);
+ if (ret < 0) {
+ ERR("[rotation-thread] Failed to rename active rotation chunk");
+ goto end;
+ }
+
+ /* Ownership of location is transferred. */
+ location = session_get_trace_archive_location(session);
+ ret = notification_thread_command_session_rotation_completed(
+ notification_thread_handle,
+ session->name,
+ session->uid,
+ session->gid,
+ session->current_archive_id,
+ location);
+ if (ret != LTTNG_OK) {
+ ERR("[rotation-thread] Failed to notify notification thread of completed rotation for session %s",
+ session->name);
+ }
+ }
+
ret = 0;
end:
if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
return ret;
}
-/* Call with the session lock held. */
+/* Call with the session and session_list locks held. */
static
int launch_session_rotation(struct ltt_session *session)
{
}
session_lock(session);
- session_unlock_list();
-
ret = run_job(job, session, handle->notification_thread_handle);
session_unlock(session);
+ session_unlock_list();
free(job);
if (ret) {
goto end;
ret = unsubscribe_session_consumed_size_rotation(session,
notification_thread_handle);
if (ret) {
- goto end;
+ goto end_unlock;
}
ret = cmd_rotate_session(session, NULL);
goto end;
}
+ rcu_register_thread();
+ rcu_thread_online();
+
health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
health_code_update();
ret = init_thread_state(handle, &thread);
if (ret) {
- goto end;
+ goto error;
}
/* Ready to handle client connections. */
goto error;
}
- if (fd == handle->quit_pipe) {
+ if (sessiond_check_thread_quit_pipe(fd, revents)) {
DBG("[rotation-thread] Quit pipe activity");
/* TODO flush the queue. */
goto exit;
DBG("[rotation-thread] Exit");
fini_thread_state(&thread);
health_unregister(health_sessiond);
+ rcu_thread_offline();
+ rcu_unregister_thread();
end:
return NULL;
}