From 65931c8b1d91fd946a2e117a5a1f61eba0b5087a Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 9 Oct 2013 10:21:14 -0400 Subject: [PATCH] Add health thread to relayd Signed-off-by: Mathieu Desnoyers --- src/bin/lttng-relayd/Makefile.am | 2 +- src/bin/lttng-relayd/health-relayd.c | 391 +++++++++++++++++++++++++++ src/bin/lttng-relayd/health-relayd.h | 4 + src/bin/lttng-relayd/lttng-relayd.h | 2 + src/bin/lttng-relayd/main.c | 47 +++- src/common/defaults.h | 8 + 6 files changed, 452 insertions(+), 2 deletions(-) create mode 100644 src/bin/lttng-relayd/health-relayd.c diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index fc8e6d055..1e675454c 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -12,7 +12,7 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ cmd-2-1.c cmd-2-1.h \ cmd-2-2.c cmd-2-2.h \ cmd-2-4.c cmd-2-4.h \ - health-relayd.h + health-relayd.c health-relayd.h # link on liblttngctl for check if relayd is already alive. lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \ diff --git a/src/bin/lttng-relayd/health-relayd.c b/src/bin/lttng-relayd/health-relayd.c new file mode 100644 index 000000000..f2b167c4a --- /dev/null +++ b/src/bin/lttng-relayd/health-relayd.c @@ -0,0 +1,391 @@ +/* + * Copyright (C) 2013 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "lttng-relayd.h" +#include "health-relayd.h" + +/* Global health check unix path */ +static char health_unix_sock_path[PATH_MAX]; + +int health_quit_pipe[2]; + +/* + * Check if the thread quit pipe was triggered. + * + * Return 1 if it was triggered else 0; + */ +static +int check_health_quit_pipe(int fd, uint32_t events) +{ + if (fd == health_quit_pipe[0] && (events & LPOLLIN)) { + return 1; + } + + return 0; +} + +/* + * Send data on a unix socket using the liblttsessiondcomm API. + * + * Return lttcomm error code. + */ +static int send_unix_sock(int sock, void *buf, size_t len) +{ + /* Check valid length */ + if (len == 0) { + return -1; + } + + return lttcomm_send_unix_sock(sock, buf, len); +} + +static int create_lttng_rundir_with_perm(const char *rundir) +{ + int ret; + + DBG3("Creating LTTng run directory: %s", rundir); + + ret = mkdir(rundir, S_IRWXU); + if (ret < 0) { + if (errno != EEXIST) { + ERR("Unable to create %s", rundir); + goto error; + } else { + ret = 0; + } + } else if (ret == 0) { + int is_root = !getuid(); + + if (is_root) { + ret = chown(rundir, 0, + utils_get_group_id(tracing_group_name)); + if (ret < 0) { + ERR("Unable to set group on %s", rundir); + PERROR("chown"); + ret = -1; + goto error; + } + + ret = chmod(rundir, + S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); + if (ret < 0) { + ERR("Unable to set permissions on %s", health_unix_sock_path); + PERROR("chmod"); + ret = -1; + goto error; + } + } + } + +error: + return ret; +} + +static +int setup_health_path(void) +{ + int is_root, ret = 0; + char *home_path = NULL, *rundir, *relayd_path; + + is_root = !getuid(); + + if (is_root) { + rundir = strdup(DEFAULT_LTTNG_RUNDIR); + } else { + /* + * Create rundir from home path. This will create something like + * $HOME/.lttng + */ + home_path = utils_get_home_dir(); + + if (home_path == NULL) { + /* TODO: Add --socket PATH option */ + ERR("Can't get HOME directory for sockets creation."); + ret = -EPERM; + goto end; + } + + ret = asprintf(&rundir, DEFAULT_LTTNG_HOME_RUNDIR, home_path); + if (ret < 0) { + ret = -ENOMEM; + goto end; + } + } + + ret = asprintf(&relayd_path, DEFAULT_RELAYD_PATH, rundir); + if (ret < 0) { + ret = -ENOMEM; + goto end; + } + + ret = create_lttng_rundir_with_perm(rundir); + if (ret < 0) { + goto end; + } + + ret = create_lttng_rundir_with_perm(relayd_path); + if (ret < 0) { + goto end; + } + + if (is_root) { + if (strlen(health_unix_sock_path) != 0) { + goto end; + } + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_GLOBAL_RELAY_HEALTH_UNIX_SOCK, + getpid()); + } else { + /* Set health check Unix path */ + if (strlen(health_unix_sock_path) != 0) { + goto end; + } + + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_HOME_RELAY_HEALTH_UNIX_SOCK, + home_path, getpid()); + } + +end: + return ret; +} + +/* + * Thread managing health check socket. + */ +void *thread_manage_health(void *data) +{ + int sock = -1, new_sock = -1, ret, i, pollfd, err = -1; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct health_comm_msg msg; + struct health_comm_reply reply; + int is_root; + + DBG("[thread] Manage health check started"); + + setup_health_path(); + + rcu_register_thread(); + + /* We might hit an error path before this is created. */ + lttng_poll_init(&events); + + /* Create unix socket */ + sock = lttcomm_create_unix_sock(health_unix_sock_path); + if (sock < 0) { + ERR("Unable to create health check Unix socket"); + ret = -1; + goto error; + } + + is_root = !getuid(); + if (is_root) { + /* lttng health client socket path permissions */ + ret = chown(health_unix_sock_path, 0, + utils_get_group_id(tracing_group_name)); + if (ret < 0) { + ERR("Unable to set group on %s", health_unix_sock_path); + PERROR("chown"); + ret = -1; + goto error; + } + + ret = chmod(health_unix_sock_path, + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (ret < 0) { + ERR("Unable to set permissions on %s", health_unix_sock_path); + PERROR("chmod"); + ret = -1; + goto error; + } + } + + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(sock); + + ret = lttcomm_listen_unix_sock(sock); + if (ret < 0) { + goto error; + } + + /* Size is set to 1 for the consumer_channel pipe */ + ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); + if (ret < 0) { + ERR("Poll set creation failed"); + goto error; + } + + ret = lttng_poll_add(&events, health_quit_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + + /* Add the application registration socket */ + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI); + if (ret < 0) { + goto error; + } + + while (1) { + DBG("Health check ready"); + + /* Inifinite blocking call, waiting for transmission */ +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_health_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } + + /* Event on the registration socket */ + if (pollfd == sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Health socket poll error"); + goto error; + } + } + } + + new_sock = lttcomm_accept_unix_sock(sock); + if (new_sock < 0) { + goto error; + } + + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(new_sock); + + DBG("Receiving data from client for health..."); + ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg)); + if (ret <= 0) { + DBG("Nothing recv() from client... continuing"); + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + continue; + } + + rcu_thread_online(); + + assert(msg.cmd == HEALTH_CMD_CHECK); + + reply.ret_code = 0; + for (i = 0; i < NR_HEALTH_RELAYD_TYPES; i++) { + /* + * health_check_state return 0 if thread is in + * error. + */ + if (!health_check_state(health_relayd, i)) { + reply.ret_code |= 1ULL << i; + } + } + + DBG2("Health check return value %" PRIx64, reply.ret_code); + + ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to send health data back to client"); + } + + /* End of transmission */ + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + } + +exit: +error: + if (err) { + ERR("Health error occurred in %s", __func__); + } + DBG("Health check thread dying"); + unlink(health_unix_sock_path); + if (sock >= 0) { + ret = close(sock); + if (ret) { + PERROR("close"); + } + } + + lttng_poll_clean(&events); + + rcu_unregister_thread(); + return NULL; +} diff --git a/src/bin/lttng-relayd/health-relayd.h b/src/bin/lttng-relayd/health-relayd.h index 002ce4b4d..6bfb73b5a 100644 --- a/src/bin/lttng-relayd/health-relayd.h +++ b/src/bin/lttng-relayd/health-relayd.h @@ -34,4 +34,8 @@ enum health_type_relayd { extern struct health_app *health_relayd; +extern int health_quit_pipe[2]; + +void *thread_manage_health(void *data); + #endif /* HEALTH_RELAYD_H */ diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index 358615f9a..f264c1867 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -179,6 +179,8 @@ extern struct lttng_ht *relay_streams_ht; extern struct lttng_ht *viewer_streams_ht; extern struct lttng_ht *indexes_ht; +extern const char *tracing_group_name; + struct relay_stream *relay_stream_find_by_id(uint64_t stream_id); #endif /* LTTNG_RELAYD_H */ diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 6727a547d..81aa642b1 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -68,6 +68,8 @@ static struct lttng_uri *live_uri; const char *progname; +const char *tracing_group_name = DEFAULT_TRACING_GROUP; + /* * Quit pipe for all threads. This permits a single cancellation point * for all threads when receiving an event on the pipe. @@ -86,6 +88,7 @@ static int dispatch_thread_exit; static pthread_t listener_thread; static pthread_t dispatcher_thread; static pthread_t worker_thread; +static pthread_t health_thread; static uint64_t last_relay_stream_id; static uint64_t last_relay_session_id; @@ -131,6 +134,7 @@ void usage(void) fprintf(stderr, " -D, --data-port URL Data port listening.\n"); fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); + fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n"); } static @@ -144,6 +148,7 @@ int parse_args(int argc, char **argv) { "control-port", 1, 0, 'C', }, { "data-port", 1, 0, 'D', }, { "daemonize", 0, 0, 'd', }, + { "group", 1, 0, 'g', }, { "help", 0, 0, 'h', }, { "output", 1, 0, 'o', }, { "verbose", 0, 0, 'v', }, @@ -152,7 +157,7 @@ int parse_args(int argc, char **argv) while (1) { int option_index = 0; - c = getopt_long(argc, argv, "dhv" "C:D:o:", + c = getopt_long(argc, argv, "dhv" "C:D:o:g:", long_options, &option_index); if (c == -1) { break; @@ -188,6 +193,9 @@ int parse_args(int argc, char **argv) case 'd': opt_daemon = 1; break; + case 'g': + tracing_group_name = optarg; + break; case 'h': usage(); exit(EXIT_FAILURE); @@ -297,6 +305,18 @@ int notify_thread_pipe(int wpipe) return ret; } +static void notify_health_quit_pipe(int *pipe) +{ + int ret; + + do { + ret = write(pipe[1], "4", 1); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != 1) { + PERROR("write relay health quit"); + } +} + /* * Stop all threads by closing the thread quit pipe. */ @@ -312,6 +332,8 @@ void stop_threads(void) ERR("write error on thread quit pipe"); } + notify_health_quit_pipe(health_quit_pipe); + /* Dispatch thread */ CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&relay_cmd_queue.futex); @@ -2569,6 +2591,19 @@ int main(int argc, char **argv) goto exit_health_app_create; } + ret = utils_create_pipe(health_quit_pipe); + if (ret < 0) { + goto error_health_pipe; + } + + /* Create thread to manage the client socket */ + ret = pthread_create(&health_thread, NULL, + thread_manage_health, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create health"); + goto health_error; + } + /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL); @@ -2623,6 +2658,16 @@ exit_worker: } exit_dispatcher: + ret = pthread_join(health_thread, &status); + if (ret != 0) { + PERROR("pthread_join health thread"); + goto error; /* join error, exit without cleanup */ + } + +health_error: + utils_close_pipe(health_quit_pipe); + +error_health_pipe: health_app_destroy(health_relayd); exit_health_app_create: diff --git a/src/common/defaults.h b/src/common/defaults.h index 40b814a16..0c9f134a9 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -78,6 +78,10 @@ #define DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH DEFAULT_USTCONSUMERD32_PATH "/command" #define DEFAULT_USTCONSUMERD32_ERR_SOCK_PATH DEFAULT_USTCONSUMERD32_PATH "/error" +/* Relayd path */ +#define DEFAULT_RELAYD_RUNDIR "%s" +#define DEFAULT_RELAYD_PATH DEFAULT_RELAYD_RUNDIR "/relayd" + /* Default lttng run directory */ #define DEFAULT_LTTNG_HOME_ENV_VAR "LTTNG_HOME" #define DEFAULT_LTTNG_FALLBACK_HOME_ENV_VAR "HOME" @@ -99,6 +103,10 @@ #define DEFAULT_GLOBAL_KCONSUMER_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/kconsumerd/health" #define DEFAULT_HOME_KCONSUMER_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/kconsumerd/health" +/* Default relay health unix socket path */ +#define DEFAULT_GLOBAL_RELAY_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/relayd/health-%d" +#define DEFAULT_HOME_RELAY_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/relayd/health-%d" + #define DEFAULT_GLOBAL_APPS_UNIX_SOCK \ DEFAULT_LTTNG_RUNDIR "/" LTTNG_UST_SOCK_FILENAME #define DEFAULT_HOME_APPS_UNIX_SOCK \ -- 2.34.1