From 5c635c724d60fe8e8bfeb00907dfa1e113cc3548 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 24 Sep 2013 14:35:42 -0400 Subject: [PATCH] Implement consumer health check thread Signed-off-by: Mathieu Desnoyers --- src/bin/lttng-consumerd/Makefile.am | 3 +- src/bin/lttng-consumerd/health-consumerd.c | 364 +++++++++++++++++++++ src/bin/lttng-consumerd/health-consumerd.h | 4 + src/bin/lttng-consumerd/lttng-consumerd.c | 39 ++- src/bin/lttng-consumerd/lttng-consumerd.h | 2 + src/common/consumer.c | 14 + src/common/defaults.h | 8 + 7 files changed, 430 insertions(+), 4 deletions(-) create mode 100644 src/bin/lttng-consumerd/health-consumerd.c diff --git a/src/bin/lttng-consumerd/Makefile.am b/src/bin/lttng-consumerd/Makefile.am index 9f029a15a..1aa0f27f7 100644 --- a/src/bin/lttng-consumerd/Makefile.am +++ b/src/bin/lttng-consumerd/Makefile.am @@ -4,7 +4,8 @@ lttnglibexec_PROGRAMS = lttng-consumerd lttng_consumerd_SOURCES = lttng-consumerd.c \ lttng-consumerd.h \ - health-consumerd.h + health-consumerd.h \ + health-consumerd.c lttng_consumerd_LDADD = \ $(top_builddir)/src/common/libconsumer.la \ diff --git a/src/bin/lttng-consumerd/health-consumerd.c b/src/bin/lttng-consumerd/health-consumerd.c new file mode 100644 index 000000000..062e46b93 --- /dev/null +++ b/src/bin/lttng-consumerd/health-consumerd.c @@ -0,0 +1,364 @@ +/* + * Copyright (C) 2013 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "lttng-consumerd.h" +#include "health-consumerd.h" + +/* Global health check unix path */ +static char health_unix_sock_path[PATH_MAX]; + +int health_quit_pipe[2]; + +/* + * Check if the thread quit pipe was triggered. + * + * Return 1 if it was triggered else 0; + */ +static +int check_health_quit_pipe(int fd, uint32_t events) +{ + if (fd == health_quit_pipe[0] && (events & LPOLLIN)) { + return 1; + } + + return 0; +} + +/* + * Send data on a unix socket using the liblttsessiondcomm API. + * + * Return lttcomm error code. + */ +static int send_unix_sock(int sock, void *buf, size_t len) +{ + /* Check valid length */ + if (len == 0) { + return -1; + } + + return lttcomm_send_unix_sock(sock, buf, len); +} + +static +int setup_health_path(void) +{ + int is_root, ret = 0; + enum lttng_consumer_type type; + const char *home_path; + + type = lttng_consumer_get_type(); + is_root = !getuid(); + + if (is_root) { + if (strlen(health_unix_sock_path) != 0) { + goto end; + } + switch (type) { + case LTTNG_CONSUMER_KERNEL: + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_GLOBAL_KCONSUMER_HEALTH_UNIX_SOCK); + break; + case LTTNG_CONSUMER64_UST: + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_GLOBAL_USTCONSUMER64_HEALTH_UNIX_SOCK); + break; + case LTTNG_CONSUMER32_UST: + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_GLOBAL_USTCONSUMER32_HEALTH_UNIX_SOCK); + break; + default: + ret = -EINVAL; + goto end; + } + } else { + static char *rundir; + + home_path = utils_get_home_dir(); + if (home_path == NULL) { + /* TODO: Add --socket PATH option */ + ERR("Can't get HOME directory for sockets creation."); + ret = -EPERM; + goto end; + } + + /* + * Create rundir from home path. This will create something like + * $HOME/.lttng + */ + ret = asprintf(&rundir, DEFAULT_LTTNG_HOME_RUNDIR, home_path); + if (ret < 0) { + ret = -ENOMEM; + goto end; + } + + /* Set health check Unix path */ + if (strlen(health_unix_sock_path) != 0) { + goto end; + } + switch (type) { + case LTTNG_CONSUMER_KERNEL: + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_HOME_KCONSUMER_HEALTH_UNIX_SOCK, rundir); + break; + case LTTNG_CONSUMER64_UST: + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_HOME_USTCONSUMER64_HEALTH_UNIX_SOCK, rundir); + break; + case LTTNG_CONSUMER32_UST: + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_HOME_USTCONSUMER32_HEALTH_UNIX_SOCK, rundir); + break; + default: + ret = -EINVAL; + goto end; + } + } + +end: + return ret; +} + +/* + * Thread managing health check socket. + */ +void *thread_manage_health(void *data) +{ + int sock = -1, new_sock = -1, ret, i, pollfd, err = -1; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct health_comm_msg msg; + struct health_comm_reply reply; + + DBG("[thread] Manage health check started"); + + setup_health_path(); + + rcu_register_thread(); + + /* We might hit an error path before this is created. */ + lttng_poll_init(&events); + + /* Create unix socket */ + sock = lttcomm_create_unix_sock(health_unix_sock_path); + if (sock < 0) { + ERR("Unable to create health check Unix socket"); + ret = -1; + goto error; + } + + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(sock); + + ret = lttcomm_listen_unix_sock(sock); + if (ret < 0) { + goto error; + } + + /* Size is set to 1 for the consumer_channel pipe */ + ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); + if (ret < 0) { + ERR("Poll set creation failed"); + goto error; + } + + ret = lttng_poll_add(&events, health_quit_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + + /* Add the application registration socket */ + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI); + if (ret < 0) { + goto error; + } + + while (1) { + DBG("Health check ready"); + + /* Inifinite blocking call, waiting for transmission */ +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_health_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } + + /* Event on the registration socket */ + if (pollfd == sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Health socket poll error"); + goto error; + } + } + } + + new_sock = lttcomm_accept_unix_sock(sock); + if (new_sock < 0) { + goto error; + } + + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(new_sock); + + DBG("Receiving data from client for health..."); + ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg)); + if (ret <= 0) { + DBG("Nothing recv() from client... continuing"); + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + continue; + } + + rcu_thread_online(); + + assert(msg.cmd == HEALTH_CMD_CHECK); + + switch (msg.component) { + case LTTNG_HEALTH_CONSUMERD_CHANNEL: + reply.ret_code = health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL); + break; + case LTTNG_HEALTH_CONSUMERD_METADATA: + reply.ret_code = health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA); + break; + case LTTNG_HEALTH_CONSUMERD_DATA: + reply.ret_code = health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA); + break; + case LTTNG_HEALTH_CONSUMERD_SESSIOND: + reply.ret_code = health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND); + break; + case LTTNG_HEALTH_CONSUMERD_METADATA_TIMER: + reply.ret_code = health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER); + break; + + case LTTNG_HEALTH_CONSUMERD_ALL: + reply.ret_code = + health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL) && + health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA) && + health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA) && + health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND) && + health_check_state(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER); + break; + default: + reply.ret_code = LTTNG_ERR_UND; + break; + } + + /* + * Flip ret value since 0 is a success and 1 indicates a bad health for + * the client where in the sessiond it is the opposite. Again, this is + * just to make things easier for us poor developer which enjoy a lot + * lazyness. + */ + if (reply.ret_code == 0 || reply.ret_code == 1) { + reply.ret_code = !reply.ret_code; + } + + DBG2("Health check return value %d", reply.ret_code); + + ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to send health data back to client"); + } + + /* End of transmission */ + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + } + +exit: +error: + if (err) { + ERR("Health error occurred in %s", __func__); + } + DBG("Health check thread dying"); + unlink(health_unix_sock_path); + if (sock >= 0) { + ret = close(sock); + if (ret) { + PERROR("close"); + } + } + + lttng_poll_clean(&events); + + rcu_unregister_thread(); + return NULL; +} diff --git a/src/bin/lttng-consumerd/health-consumerd.h b/src/bin/lttng-consumerd/health-consumerd.h index f5e2a34ce..b1cf4a2c9 100644 --- a/src/bin/lttng-consumerd/health-consumerd.h +++ b/src/bin/lttng-consumerd/health-consumerd.h @@ -34,4 +34,8 @@ enum health_type { /* Consumerd health monitoring */ struct health_app *health_consumerd; +void *thread_manage_health(void *data); + +int health_quit_pipe[2]; + #endif /* HEALTH_CONSUMERD_H */ diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index e33a470f5..cf9cb205c 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -47,6 +47,7 @@ #include #include #include +#include #include "lttng-consumerd.h" #include "health-consumerd.h" @@ -55,8 +56,8 @@ /* threads (channel handling, poll, metadata, sessiond) */ -static pthread_t channel_thread, data_thread, metadata_thread, sessiond_thread; -static pthread_t metadata_timer_thread; +static pthread_t channel_thread, data_thread, metadata_thread, + sessiond_thread, metadata_timer_thread, health_thread; /* to count the number of times the user pressed ctrl+c */ static int sigintcount = 0; @@ -76,6 +77,14 @@ static struct lttng_consumer_local_data *ctx; /* Consumerd health monitoring */ struct health_app *health_consumerd; +enum lttng_consumer_type lttng_consumer_get_type(void) +{ + if (!ctx) { + return LTTNG_CONSUMER_UNKNOWN; + } + return ctx->type; +} + /* * Signal handler for the daemon */ @@ -386,12 +395,25 @@ int main(int argc, char **argv) /* Initialize communication library */ lttcomm_init(); + ret = utils_create_pipe(health_quit_pipe); + if (ret < 0) { + goto error_health_pipe; + } + + /* Create thread to manage the client socket */ + ret = pthread_create(&health_thread, NULL, + thread_manage_health, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create health"); + goto health_error; + } + /* Create thread to manage channels */ ret = pthread_create(&channel_thread, NULL, consumer_thread_channel_poll, (void *) ctx); if (ret != 0) { perror("pthread_create"); - goto error; + goto channel_error; } /* Create thread to manage the polling/writing of trace metadata */ @@ -463,6 +485,17 @@ metadata_error: goto error; } +channel_error: + ret = pthread_join(health_thread, &status); + if (ret != 0) { + PERROR("pthread_join health thread"); + goto error; /* join error, exit without cleanup */ + } + +health_error: + utils_close_pipe(health_quit_pipe); + +error_health_pipe: if (!ret) { ret = EXIT_SUCCESS; lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_EXIT_SUCCESS); diff --git a/src/bin/lttng-consumerd/lttng-consumerd.h b/src/bin/lttng-consumerd/lttng-consumerd.h index 9b34547c1..6deb789c6 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.h +++ b/src/bin/lttng-consumerd/lttng-consumerd.h @@ -19,4 +19,6 @@ #ifndef _LTTNG_CONSUMERD_H #define _LTTNG_CONSUMERD_H +enum lttng_consumer_type lttng_consumer_get_type(void); + #endif /* _LTTNG_CONSUMERD_H */ diff --git a/src/common/consumer.c b/src/common/consumer.c index 2f20ffb5d..6f3a02d26 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -94,6 +94,18 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); } +static void notify_health_quit_pipe(int *pipe) +{ + int ret; + + do { + ret = write(pipe[1], "4", 1); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != 1) { + PERROR("write consumer health quit"); + } +} + static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *chan, uint64_t key, @@ -3121,6 +3133,8 @@ end: notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT); + notify_health_quit_pipe(health_quit_pipe); + /* Cleaning up possibly open sockets. */ if (sock >= 0) { ret = close(sock); diff --git a/src/common/defaults.h b/src/common/defaults.h index 616f3cd97..ff7425815 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -91,6 +91,14 @@ #define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/health.sock" #define DEFAULT_HOME_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/health.sock" +/* Default consumer health unix socket path */ +#define DEFAULT_GLOBAL_USTCONSUMER32_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/health.ustconsumer32.sock" +#define DEFAULT_HOME_USTCONSUMER32_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/health.ustconsumer32.sock" +#define DEFAULT_GLOBAL_USTCONSUMER64_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/health.ustconsumer64.sock" +#define DEFAULT_HOME_USTCONSUMER64_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/health.ustconsumer64.sock" +#define DEFAULT_GLOBAL_KCONSUMER_HEALTH_UNIX_SOCK DEFAULT_LTTNG_RUNDIR "/health.kconsumer.sock" +#define DEFAULT_HOME_KCONSUMER_HEALTH_UNIX_SOCK DEFAULT_LTTNG_HOME_RUNDIR "/health.kconsumer.sock" + #define DEFAULT_GLOBAL_APPS_UNIX_SOCK \ DEFAULT_LTTNG_RUNDIR "/" LTTNG_UST_SOCK_FILENAME #define DEFAULT_HOME_APPS_UNIX_SOCK \ -- 2.34.1