From db66e57489e5014289dc1d831e1eac90ac4fc0da Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 14 Dec 2017 16:27:49 -0500 Subject: [PATCH 1/1] Sessiond rotation thread MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This thread is responsible to receive the notifications from the consumers that a channel has finished its rotation and perform the rename of the chunk ready to be processed by the client when all the channels of a session have completed their rotation. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- include/Makefile.am | 1 + include/lttng/rotation.h | 58 +++ src/bin/lttng-sessiond/Makefile.am | 4 +- src/bin/lttng-sessiond/health-sessiond.h | 1 + src/bin/lttng-sessiond/main.c | 46 ++- src/bin/lttng-sessiond/rotate.c | 320 ++++++++++++++++ src/bin/lttng-sessiond/rotate.h | 51 +++ src/bin/lttng-sessiond/rotation-thread.c | 453 +++++++++++++++++++++++ src/bin/lttng-sessiond/rotation-thread.h | 54 +++ src/bin/lttng-sessiond/session.h | 45 +++ src/lib/lttng-ctl/lttng-ctl-health.c | 1 + 11 files changed, 1032 insertions(+), 2 deletions(-) create mode 100644 include/lttng/rotation.h create mode 100644 src/bin/lttng-sessiond/rotate.c create mode 100644 src/bin/lttng-sessiond/rotate.h create mode 100644 src/bin/lttng-sessiond/rotation-thread.c create mode 100644 src/bin/lttng-sessiond/rotation-thread.h diff --git a/include/Makefile.am b/include/Makefile.am index 7de634b8e..ae9264eab 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -78,6 +78,7 @@ lttnginclude_HEADERS = \ lttng/save.h \ lttng/load.h \ lttng/endpoint.h \ + lttng/rotation.h \ version.h.tmpl lttngactioninclude_HEADERS= \ diff --git a/include/lttng/rotation.h b/include/lttng/rotation.h new file mode 100644 index 000000000..47ca25715 --- /dev/null +++ b/include/lttng/rotation.h @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2017 - Julien Desfossez + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_ROTATION_H +#define LTTNG_ROTATION_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Return codes for lttng_rotate_session_get_output_path. + */ +enum lttng_rotation_status { + /* + * After starting a rotation. + */ + LTTNG_ROTATION_STATUS_STARTED = 0, + /* + * When the rotation is complete. + */ + LTTNG_ROTATION_STATUS_COMPLETED = 1, + /* + * If the handle does not match the last rotate command, we cannot + * retrieve the path for the chunk. + */ + LTTNG_ROTATION_STATUS_EXPIRED = 2, + /* + * On error. + */ + LTTNG_ROTATION_STATUS_ERROR = 3, + /* + * If no rotation occured during this session. + */ + LTTNG_ROTATION_STATUS_NO_ROTATION = 4, +}; + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_ROTATION_H */ diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 413fe75a2..ea6e9e9bb 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -35,7 +35,9 @@ lttng_sessiond_SOURCES = utils.c utils.h \ notification-thread.h notification-thread.c \ notification-thread-commands.h notification-thread-commands.c \ notification-thread-events.h notification-thread-events.c \ - sessiond-config.h sessiond-config.c + sessiond-config.h sessiond-config.c \ + rotate.h rotate.c \ + rotation-thread.h rotation-thread.c if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/health-sessiond.h b/src/bin/lttng-sessiond/health-sessiond.h index 5d94cc639..abeb4f0cf 100644 --- a/src/bin/lttng-sessiond/health-sessiond.h +++ b/src/bin/lttng-sessiond/health-sessiond.h @@ -30,6 +30,7 @@ enum health_type_sessiond { HEALTH_SESSIOND_TYPE_APP_MANAGE_NOTIFY = 6, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH = 7, HEALTH_SESSIOND_TYPE_NOTIFICATION = 8, + HEALTH_SESSIOND_TYPE_ROTATION = 9, NR_HEALTH_SESSIOND_TYPES, }; diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 532e345c0..b1229e721 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -73,6 +73,7 @@ #include "load-session-thread.h" #include "notification-thread.h" #include "notification-thread-commands.h" +#include "rotation-thread.h" #include "syscall.h" #include "agent.h" #include "ht-cleanup.h" @@ -208,6 +209,7 @@ static pthread_t ht_cleanup_thread; static pthread_t agent_reg_thread; static pthread_t load_session_thread; static pthread_t notification_thread; +static pthread_t rotation_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -288,6 +290,9 @@ struct load_session_thread_data *load_info; /* Notification thread handle. */ struct notification_thread_handle *notification_thread_handle; +/* Rotation thread handle. */ +struct rotation_thread_handle *rotation_thread_handle; + /* Global hash tables */ struct lttng_ht *agent_apps_ht_by_sock = NULL; @@ -297,7 +302,7 @@ struct lttng_ht *agent_apps_ht_by_sock = NULL; * NR_LTTNG_SESSIOND_READY must match the number of calls to * sessiond_notify_ready(). */ -#define NR_LTTNG_SESSIOND_READY 4 +#define NR_LTTNG_SESSIOND_READY 5 int lttng_sessiond_ready = NR_LTTNG_SESSIOND_READY; int sessiond_check_thread_quit_pipe(int fd, uint32_t events) @@ -5476,6 +5481,7 @@ int main(int argc, char **argv) *ust64_channel_monitor_pipe = NULL, *kernel_channel_monitor_pipe = NULL; bool notification_thread_running = false; + bool rotation_thread_running = false; struct lttng_pipe *ust32_channel_rotate_pipe = NULL, *ust64_channel_rotate_pipe = NULL, *kernel_channel_rotate_pipe = NULL; @@ -5894,6 +5900,31 @@ int main(int argc, char **argv) } notification_thread_running = true; + /* rotation_thread_data acquires the pipes' read side. */ + rotation_thread_handle = rotation_thread_handle_create( + ust32_channel_rotate_pipe, + ust64_channel_rotate_pipe, + kernel_channel_rotate_pipe, + thread_quit_pipe[0]); + if (!rotation_thread_handle) { + retval = -1; + ERR("Failed to create rotation thread shared data"); + stop_threads(); + goto exit_rotation; + } + rotation_thread_running = true; + + /* Create rotation thread. */ + ret = pthread_create(&rotation_thread, default_pthread_attr(), + thread_rotation, rotation_thread_handle); + if (ret) { + errno = ret; + PERROR("pthread_create rotation"); + retval = -1; + stop_threads(); + goto exit_rotation; + } + /* Create thread to manage the client socket */ ret = pthread_create(&client_thread, default_pthread_attr(), thread_manage_clients, (void *) NULL); @@ -6060,6 +6091,7 @@ exit_dispatch: } exit_client: +exit_rotation: exit_notification: ret = pthread_join(health_thread, &status); if (ret) { @@ -6109,6 +6141,18 @@ exit_init_data: notification_thread_handle_destroy(notification_thread_handle); } + if (rotation_thread_handle) { + if (rotation_thread_running) { + ret = pthread_join(rotation_thread, &status); + if (ret) { + errno = ret; + PERROR("pthread_join rotation thread"); + retval = -1; + } + } + rotation_thread_handle_destroy(rotation_thread_handle); + } + rcu_thread_offline(); rcu_unregister_thread(); diff --git a/src/bin/lttng-sessiond/rotate.c b/src/bin/lttng-sessiond/rotate.c new file mode 100644 index 000000000..05e9bb08a --- /dev/null +++ b/src/bin/lttng-sessiond/rotate.c @@ -0,0 +1,320 @@ +/* + * Copyright (C) 2017 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _LGPL_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "session.h" +#include "rotate.h" +#include "rotation-thread.h" +#include "lttng-sessiond.h" +#include "health-sessiond.h" +#include "cmd.h" +#include "utils.h" + +#include +#include +#include + +unsigned long hash_channel_key(struct rotation_channel_key *key) +{ + return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong( + (void *) (unsigned long) key->domain, lttng_ht_seed); +} + +/* The session's lock must be held by the caller. */ +static +int session_rename_chunk(struct ltt_session *session, char *current_path, + char *new_path) +{ + int ret; + struct consumer_socket *socket; + struct consumer_output *output; + struct lttng_ht_iter iter; + uid_t uid; + gid_t gid; + + DBG("Renaming session chunk path of session \"%s\" from %s to %s", + session->name, current_path, new_path); + + /* + * Either one of the sessions is enough to find the consumer_output + * and uid/gid. + */ + if (session->kernel_session) { + output = session->kernel_session->consumer; + uid = session->kernel_session->uid; + gid = session->kernel_session->gid; + } else if (session->ust_session) { + output = session->ust_session->consumer; + uid = session->ust_session->uid; + gid = session->ust_session->gid; + } else { + assert(0); + } + + if (!output || !output->socks) { + ERR("No consumer output found for session \"%s\"", + session->name); + ret = -1; + goto end; + } + + rcu_read_lock(); + /* + * We have to iterate to find a socket, but we only need to send the + * rename command to one consumer, so we break after the first one. + */ + cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_rotate_rename(socket, session->id, output, + current_path, new_path, uid, gid); + pthread_mutex_unlock(socket->lock); + if (ret) { + ret = -1; + goto end_unlock; + } + break; + } + + ret = 0; + +end_unlock: + rcu_read_unlock(); +end: + return ret; +} + +/* The session's lock must be held by the caller. */ +static +int rename_first_chunk(struct ltt_session *session, + struct consumer_output *consumer, char *new_path) +{ + int ret; + char current_full_path[LTTNG_PATH_MAX], new_full_path[LTTNG_PATH_MAX]; + + /* Current domain path: /kernel */ + if (session->net_handle > 0) { + ret = snprintf(current_full_path, sizeof(current_full_path), "%s/%s", + consumer->dst.net.base_dir, consumer->subdir); + if (ret < 0 || ret >= sizeof(current_full_path)) { + ERR("Failed to initialize current full path while renaming first rotation chunk of session \"%s\"", + session->name); + ret = -1; + goto error; + } + } else { + ret = snprintf(current_full_path, sizeof(current_full_path), "%s/%s", + consumer->dst.session_root_path, consumer->subdir); + if (ret < 0 || ret >= sizeof(current_full_path)) { + ERR("Failed to initialize current full path while renaming first rotation chunk of session \"%s\"", + session->name); + ret = -1; + goto error; + } + } + /* New domain path: /--/kernel */ + ret = snprintf(new_full_path, sizeof(new_full_path), "%s/%s", + new_path, consumer->subdir); + if (ret < 0 || ret >= sizeof(new_full_path)) { + ERR("Failed to initialize new full path while renaming first rotation chunk of session \"%s\"", + session->name); + ret = -1; + goto error; + } + /* + * Move the per-domain fcurrenter inside the first rotation + * fcurrenter. + */ + ret = session_rename_chunk(session, current_full_path, new_full_path); + if (ret < 0) { + ret = -LTTNG_ERR_UNK; + goto error; + } + + ret = 0; + +error: + return ret; +} + +/* + * Rename a chunk folder after a rotation is complete. + * session_lock_list and session lock must be held. + * + * Returns 0 on success, a negative value on error. + */ +int rename_complete_chunk(struct ltt_session *session, time_t ts) +{ + struct tm *timeinfo; + char datetime[16], start_datetime[16]; + char new_path[LTTNG_PATH_MAX]; + int ret; + size_t strf_ret; + + DBG("Renaming completed chunk for session %s", session->name); + timeinfo = localtime(&ts); + if (!timeinfo) { + ERR("Failed to retrieve local time while renaming completed chunk"); + ret = -1; + goto end; + } + strf_ret = strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", + timeinfo); + if (strf_ret == 0) { + ERR("Failed to format timestamp while renaming completed session chunk"); + ret = -1; + goto end; + } + + if (session->rotate_count == 1) { + char start_time[16]; + + timeinfo = localtime(&session->last_chunk_start_ts); + if (!timeinfo) { + ERR("Failed to retrieve local time while renaming completed chunk"); + ret = -1; + goto end; + } + + strf_ret = strftime(start_time, sizeof(start_time), + "%Y%m%d-%H%M%S", timeinfo); + if (strf_ret == 0) { + ERR("Failed to format timestamp while renaming completed session chunk"); + ret = -1; + goto end; + } + + /* + * On the first rotation, the current_rotate_path is the + * session_root_path, so we need to create the chunk folder + * and move the domain-specific folders inside it. + */ + ret = snprintf(new_path, sizeof(new_path), "%s/%s-%s-%" PRIu64, + session->rotation_chunk.current_rotate_path, + start_time, + datetime, session->rotate_count); + if (ret < 0 || ret >= sizeof(new_path)) { + ERR("Failed to format new chunk path while renaming session \"%s\"'s first chunk", + session->name); + ret = -1; + goto end; + } + + if (session->kernel_session) { + ret = rename_first_chunk(session, + session->kernel_session->consumer, + new_path); + if (ret) { + ERR("Failed to rename kernel session trace folder to %s", new_path); + /* + * This is not a fatal error for the rotation + * thread, we just need to inform the client + * that a problem occurred with the rotation. + * Returning 0, same for the other errors + * below. + */ + ret = 0; + goto error; + } + } + if (session->ust_session) { + ret = rename_first_chunk(session, + session->ust_session->consumer, + new_path); + if (ret) { + ERR("Failed to rename userspace session trace folder to %s", new_path); + ret = 0; + goto error; + } + } + } else { + /* + * After the first rotation, all the trace data is already in + * its own chunk folder, we just need to append the suffix. + */ + /* Recreate the session->rotation_chunk.current_rotate_path */ + timeinfo = localtime(&session->last_chunk_start_ts); + if (!timeinfo) { + ERR("Failed to retrieve local time while renaming completed chunk"); + ret = -1; + goto end; + } + strf_ret = strftime(start_datetime, sizeof(start_datetime), "%Y%m%d-%H%M%S", timeinfo); + if (!strf_ret) { + ERR("Failed to format timestamp while renaming completed session chunk"); + ret = -1; + goto end; + } + ret = snprintf(new_path, sizeof(new_path), "%s/%s-%s-%" PRIu64, + session_get_base_path(session), + start_datetime, + datetime, session->rotate_count); + if (ret < 0 || ret >= sizeof(new_path)) { + ERR("Failed to format new chunk path while renaming chunk of session \"%s\"", + session->name); + ret = -1; + goto error; + } + ret = session_rename_chunk(session, + session->rotation_chunk.current_rotate_path, + new_path); + if (ret) { + ERR("Failed to rename session trace folder from %s to %s", + session->rotation_chunk.current_rotate_path, + new_path); + ret = 0; + goto error; + } + } + + /* + * Store the path where the readable chunk is. This path is valid + * and can be queried by the client with rotate_pending until the next + * rotation is started. + */ + ret = lttng_strncpy(session->rotation_chunk.current_rotate_path, + new_path, + sizeof(session->rotation_chunk.current_rotate_path)); + if (ret) { + ERR("Failed the current chunk's path of session \"%s\"", + session->name); + ret = -1; + goto error; + } + + goto end; + +error: + session->rotation_status = LTTNG_ROTATION_STATUS_ERROR; +end: + return ret; +} diff --git a/src/bin/lttng-sessiond/rotate.h b/src/bin/lttng-sessiond/rotate.h new file mode 100644 index 000000000..b2f006f3e --- /dev/null +++ b/src/bin/lttng-sessiond/rotate.h @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2017 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef ROTATE_H +#define ROTATE_H + +#include "rotation-thread.h" + +/* + * Key in channel_pending_rotate_ht to map a channel to a + * struct rotation_channel_info. + */ +struct rotation_channel_key { + uint64_t key; + enum lttng_domain_type domain; +}; + +/* + * Added in channel_pending_rotate_ht everytime we start the rotation of a + * channel. The consumer notifies the rotation thread with the channel_key to + * inform a rotation is complete, we use that information to lookup the related + * session from channel_pending_rotate_ht. + */ +struct rotation_channel_info { + uint64_t session_id; + struct rotation_channel_key channel_key; + struct cds_lfht_node rotate_channels_ht_node; +}; + +extern struct cds_lfht *channel_pending_rotate_ht; + +unsigned long hash_channel_key(struct rotation_channel_key *key); + +/* session lock must be held by this function's caller. */ +int rename_complete_chunk(struct ltt_session *session, time_t ts); + +#endif /* ROTATE_H */ diff --git a/src/bin/lttng-sessiond/rotation-thread.c b/src/bin/lttng-sessiond/rotation-thread.c new file mode 100644 index 000000000..b245f75ad --- /dev/null +++ b/src/bin/lttng-sessiond/rotation-thread.c @@ -0,0 +1,453 @@ +/* + * Copyright (C) 2017 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _LGPL_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "rotation-thread.h" +#include "lttng-sessiond.h" +#include "health-sessiond.h" +#include "rotate.h" +#include "cmd.h" +#include "session.h" + +#include +#include +#include + +/* + * Store a struct rotation_channel_info for each channel that is currently + * being rotated by the consumer. + */ +struct cds_lfht *channel_pending_rotate_ht; + +struct rotation_thread_state { + struct lttng_poll_event events; +}; + +static +void channel_rotation_info_destroy(struct rotation_channel_info *channel_info) +{ + assert(channel_info); + free(channel_info); +} + +static +int match_channel_info(struct cds_lfht_node *node, const void *key) +{ + struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key; + struct rotation_channel_info *channel_info; + + channel_info = caa_container_of(node, struct rotation_channel_info, + rotate_channels_ht_node); + + return !!((channel_key->key == channel_info->channel_key.key) && + (channel_key->domain == channel_info->channel_key.domain)); +} + +static +struct rotation_channel_info *lookup_channel_pending(uint64_t key, + enum lttng_domain_type domain) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct rotation_channel_info *channel_info = NULL; + struct rotation_channel_key channel_key = { .key = key, + .domain = domain }; + + cds_lfht_lookup(channel_pending_rotate_ht, + hash_channel_key(&channel_key), + match_channel_info, + &channel_key, &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end; + } + + channel_info = caa_container_of(node, struct rotation_channel_info, + rotate_channels_ht_node); + cds_lfht_del(channel_pending_rotate_ht, node); +end: + return channel_info; +} + +/* + * Destroy the thread data previously created by the init function. + */ +void rotation_thread_handle_destroy( + struct rotation_thread_handle *handle) +{ + int ret; + + if (!handle) { + goto end; + } + + if (handle->ust32_consumer >= 0) { + ret = close(handle->ust32_consumer); + if (ret) { + PERROR("close 32-bit consumer channel rotation pipe"); + } + } + if (handle->ust64_consumer >= 0) { + ret = close(handle->ust64_consumer); + if (ret) { + PERROR("close 64-bit consumer channel rotation pipe"); + } + } + if (handle->kernel_consumer >= 0) { + ret = close(handle->kernel_consumer); + if (ret) { + PERROR("close kernel consumer channel rotation pipe"); + } + } + +end: + free(handle); +} + +struct rotation_thread_handle *rotation_thread_handle_create( + struct lttng_pipe *ust32_channel_rotate_pipe, + struct lttng_pipe *ust64_channel_rotate_pipe, + struct lttng_pipe *kernel_channel_rotate_pipe, + int thread_quit_pipe) +{ + struct rotation_thread_handle *handle; + + handle = zmalloc(sizeof(*handle)); + if (!handle) { + goto end; + } + + if (ust32_channel_rotate_pipe) { + handle->ust32_consumer = + lttng_pipe_release_readfd( + ust32_channel_rotate_pipe); + if (handle->ust32_consumer < 0) { + goto error; + } + } else { + handle->ust32_consumer = -1; + } + if (ust64_channel_rotate_pipe) { + handle->ust64_consumer = + lttng_pipe_release_readfd( + ust64_channel_rotate_pipe); + if (handle->ust64_consumer < 0) { + goto error; + } + } else { + handle->ust64_consumer = -1; + } + if (kernel_channel_rotate_pipe) { + handle->kernel_consumer = + lttng_pipe_release_readfd( + kernel_channel_rotate_pipe); + if (handle->kernel_consumer < 0) { + goto error; + } + } else { + handle->kernel_consumer = -1; + } + handle->thread_quit_pipe = thread_quit_pipe; + +end: + return handle; +error: + rotation_thread_handle_destroy(handle); + return NULL; +} + +static +int init_poll_set(struct lttng_poll_event *poll_set, + struct rotation_thread_handle *handle) +{ + int ret; + + /* + * Create pollset with size 4: + * - sessiond quit pipe + * - consumerd (32-bit user space) channel rotate pipe, + * - consumerd (64-bit user space) channel rotate pipe, + * - consumerd (kernel) channel rotate pipe, + */ + ret = lttng_poll_create(poll_set, 4, LTTNG_CLOEXEC); + if (ret < 0) { + goto end; + } + + ret = lttng_poll_add(poll_set, handle->thread_quit_pipe, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, handle->ust32_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset"); + goto error; + } + ret = lttng_poll_add(poll_set, handle->ust64_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset"); + goto error; + } + if (handle->kernel_consumer >= 0) { + ret = lttng_poll_add(poll_set, handle->kernel_consumer, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset"); + goto error; + } + } + +end: + return ret; +error: + lttng_poll_clean(poll_set); + return ret; +} + +static +void fini_thread_state(struct rotation_thread_state *state) +{ + lttng_poll_clean(&state->events); + cds_lfht_destroy(channel_pending_rotate_ht, NULL); +} + +static +int init_thread_state(struct rotation_thread_handle *handle, + struct rotation_thread_state *state) +{ + int ret; + + memset(state, 0, sizeof(*state)); + lttng_poll_init(&state->events); + + ret = init_poll_set(&state->events, handle); + if (ret) { + ERR("[rotation-thread] Failed to initialize rotation thread poll set"); + goto end; + } + + channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!channel_pending_rotate_ht) { + ERR("[rotation-thread] Failed to create channel pending rotation hash table"); + ret = -1; + goto end; + } + +end: + return ret; +} + +static +int handle_channel_rotation_pipe(int fd, uint32_t revents, + struct rotation_thread_handle *handle, + struct rotation_thread_state *state) +{ + int ret = 0; + enum lttng_domain_type domain; + struct rotation_channel_info *channel_info; + struct ltt_session *session = NULL; + uint64_t key; + + if (fd == handle->ust32_consumer || + fd == handle->ust64_consumer) { + domain = LTTNG_DOMAIN_UST; + } else if (fd == handle->kernel_consumer) { + domain = LTTNG_DOMAIN_KERNEL; + } else { + ERR("[rotation-thread] Unknown channel rotation pipe fd %d", + fd); + abort(); + } + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ret = lttng_poll_del(&state->events, fd); + if (ret) { + ERR("[rotation-thread] Failed to remove consumer " + "rotation pipe from poll set"); + } + goto end; + } + + do { + ret = read(fd, &key, sizeof(key)); + } while (ret == -1 && errno == EINTR); + if (ret != sizeof(key)) { + ERR("[rotation-thread] Failed to read from pipe (fd = %i)", + fd); + ret = -1; + goto end; + } + + DBG("[rotation-thread] Received notification for chan %" PRIu64 + ", domain %d\n", key, domain); + + channel_info = lookup_channel_pending(key, domain); + if (!channel_info) { + ERR("[rotation-thread] Failed to find channel_info (key = %" + PRIu64 ")", key); + ret = -1; + goto end; + } + rcu_read_lock(); + session_lock_list(); + session = session_find_by_id(channel_info->session_id); + if (!session) { + /* + * The session may have been destroyed before we had a chance to + * perform this action, return gracefully. + */ + DBG("[rotation-thread] Session %" PRIu64 " not found", + channel_info->session_id); + ret = 0; + goto end_unlock_session_list; + } + + session_lock(session); + if (--session->nr_chan_rotate_pending == 0) { + time_t now = time(NULL); + + if (now == (time_t) -1) { + session->rotation_status = LTTNG_ROTATION_STATUS_ERROR; + ret = LTTNG_ERR_UNK; + goto end_unlock_session; + } + + ret = rename_complete_chunk(session, now); + if (ret < 0) { + ERR("Failed to rename completed rotation chunk"); + goto end_unlock_session; + } + session->rotate_pending = false; + session->rotation_status = LTTNG_ROTATION_STATUS_COMPLETED; + session->last_chunk_start_ts = session->current_chunk_start_ts; + DBG("Rotation completed for session %s", session->name); + } + + ret = 0; + +end_unlock_session: + channel_rotation_info_destroy(channel_info); + session_unlock(session); +end_unlock_session_list: + session_unlock_list(); + rcu_read_unlock(); +end: + return ret; +} + +void *thread_rotation(void *data) +{ + int ret; + struct rotation_thread_handle *handle = data; + struct rotation_thread_state state; + + DBG("[rotation-thread] Started rotation thread"); + + if (!handle) { + ERR("[rotation-thread] Invalid thread context provided"); + goto end; + } + + rcu_register_thread(); + rcu_thread_online(); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION); + health_code_update(); + + ret = init_thread_state(handle, &state); + if (ret) { + goto end; + } + + /* Ready to handle client connections. */ + sessiond_notify_ready(); + + while (true) { + int fd_count, i; + + health_poll_entry(); + DBG("[rotation-thread] Entering poll wait"); + ret = lttng_poll_wait(&state.events, -1); + DBG("[rotation-thread] Poll wait returned (%i)", ret); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + continue; + } + ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret); + goto error; + } + + fd_count = ret; + for (i = 0; i < fd_count; i++) { + int fd = LTTNG_POLL_GETFD(&state.events, i); + uint32_t revents = LTTNG_POLL_GETEV(&state.events, i); + + DBG("[rotation-thread] Handling fd (%i) activity (%u)", + fd, revents); + + if (fd == handle->thread_quit_pipe) { + DBG("[rotation-thread] Quit pipe activity"); + goto exit; + } else if (fd == handle->ust32_consumer || + fd == handle->ust64_consumer || + fd == handle->kernel_consumer) { + ret = handle_channel_rotation_pipe(fd, + revents, handle, &state); + if (ret) { + ERR("[rotation-thread] Handle channel rotation pipe"); + goto error; + } + } + } + } +exit: +error: + DBG("[rotation-thread] Exit"); + fini_thread_state(&state); + health_unregister(health_sessiond); + rcu_thread_offline(); + rcu_unregister_thread(); +end: + return NULL; +} diff --git a/src/bin/lttng-sessiond/rotation-thread.h b/src/bin/lttng-sessiond/rotation-thread.h new file mode 100644 index 000000000..64e1ad98d --- /dev/null +++ b/src/bin/lttng-sessiond/rotation-thread.h @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2017 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef ROTATION_THREAD_H +#define ROTATION_THREAD_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include "session.h" + +struct rotation_thread_handle { + /* + * Read side of pipes used to communicate with the rotation thread. + */ + /* Notification from the consumers */ + int ust32_consumer; + int ust64_consumer; + int kernel_consumer; + /* quit pipe */ + int thread_quit_pipe; +}; + +struct rotation_thread_handle *rotation_thread_handle_create( + struct lttng_pipe *ust32_channel_rotate_pipe, + struct lttng_pipe *ust64_channel_rotate_pipe, + struct lttng_pipe *kernel_channel_rotate_pipe, + int thread_quit_pipe); + +void rotation_thread_handle_destroy( + struct rotation_thread_handle *handle); + +void *thread_rotation(void *data); + +#endif /* ROTATION_THREAD_H */ diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 77f0bf7b1..3b3380c3f 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -22,6 +22,7 @@ #include #include +#include #include "snapshot.h" #include "trace-kernel.h" @@ -117,6 +118,50 @@ struct ltt_session { * Node in ltt_sessions_ht_by_id. */ struct lttng_ht_node_u64 node; + /* Number of session rotation for this session. */ + uint64_t rotate_count; + /* + * Rotation is pending between the time it starts until the consumer has + * finished extracting the data. If the session uses a relay, data related + * to a rotation can still be in flight after that, see + * rotate_pending_relay. + */ + bool rotate_pending; + /* Current status of a rotation. */ + enum lttng_rotation_status rotation_status; + /* + * Number of channels waiting for a rotation. + * When this number reaches 0, we can handle the rename of the chunk + * folder and inform the client that the rotate is finished. + */ + unsigned int nr_chan_rotate_pending; + struct { + /* + * When the rotation is in progress, the temporary path name is + * stored here. When the rotation is complete, the final path name + * is here and can be queried with the rotate_pending call. + */ + char current_rotate_path[LTTNG_PATH_MAX]; + /* + * The path where the consumer is currently writing after the first + * session rotation. + */ + char active_tracing_path[LTTNG_PATH_MAX]; + } rotation_chunk; + /* + * The timestamp of the beginning of the previous chunk. For the + * first chunk, this is the "lttng start" timestamp. For the + * subsequent ones, this copies the current_chunk_start_ts value when + * a new rotation starts. This value is used to set the name of a + * complete chunk directory, ex: "last_chunk_start_ts-now()". + */ + time_t last_chunk_start_ts; + /* + * This is the timestamp when a new chunk starts. When a new rotation + * starts, we copy this value to last_chunk_start_ts and replace it + * with the current timestamp. + */ + time_t current_chunk_start_ts; }; /* Prototypes */ diff --git a/src/lib/lttng-ctl/lttng-ctl-health.c b/src/lib/lttng-ctl/lttng-ctl-health.c index fffdfa0d8..1f1f16056 100644 --- a/src/lib/lttng-ctl/lttng-ctl-health.c +++ b/src/lib/lttng-ctl/lttng-ctl-health.c @@ -70,6 +70,7 @@ const char *sessiond_thread_name[NR_HEALTH_SESSIOND_TYPES] = { [ HEALTH_SESSIOND_TYPE_HT_CLEANUP ] = "Session daemon hash table cleanup", [ HEALTH_SESSIOND_TYPE_APP_MANAGE_NOTIFY ] = "Session daemon application notification manager", [ HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH ] = "Session daemon application registration dispatcher", + [ HEALTH_SESSIOND_TYPE_ROTATION ] = "Session daemon rotation manager", }; static -- 2.34.1