* Fix modulo operation bug on
#define HEALTH_IS_IN_CODE(x) (x % HEALTH_POLL_VALUE) which is causing
the check to think it is never within code. (x % 1 always equals 0).
Simplify this by using a simple & on the poll value, and remove the
IS_IN_CODE, using ! on IS_IN_POLL instead (which removes nothing to
clarity).
* Atomic operations should apply to at most "unsigned long" (32-bit on
32-bit arch) rather than uint64_t.
* Separate the "error" condition from the counters. We clearly cannot
use the "0" value as an error on 32-bit counters anymore, because they
can easily wrap.
* Introduce "exit" condition, will be useful for state tracking in the
future. Error and exit conditions implemented as flags.
* Add "APP_MANAGE" in addition to "APP_REG" health check, to monitor the
app registration thread (which was missing, only the app manager
thread was checked, under the name "APP_REG", which was misleading).
* Remove bogus usage of uatomic_xchg() in health_check_state():
It is not needed to update the "last" value, since the last value is
read and written to by a single thread. Moreover, this specific use of
xchg was not exchanging anything: it was just setting the last value
to the "current" one, and doing nothing with the return value.
Whatever was expected to be achieved by using uatomic_xchg() clearly
wasn't.
* Because the health check thread could still be answering a request
concurrently sessiond teardown, we need to ensure that all threads
only set the "error" condition if they reach teardown paths due to an
actual error, not on "normal" teardown condition (thread quit pipe
being closed). Flagging threads as being in error condition upon all
exit paths would lead to false "errors" sent to the client, which we
want to avoid, since the client could then think it needs to kill a
sessiond when the sessiond might be in the process of gracefully
restarting.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
/* Health component for the health check function. */
enum lttng_health_component {
LTTNG_HEALTH_CMD,
/* Health component for the health check function. */
enum lttng_health_component {
LTTNG_HEALTH_CMD,
+ LTTNG_HEALTH_APP_MANAGE,
LTTNG_HEALTH_APP_REG,
LTTNG_HEALTH_KERNEL,
LTTNG_HEALTH_CONSUMER,
LTTNG_HEALTH_APP_REG,
LTTNG_HEALTH_KERNEL,
LTTNG_HEALTH_CONSUMER,
*/
int health_check_state(struct health_state *state)
{
*/
int health_check_state(struct health_state *state)
{
- int ret;
- uint64_t current;
- uint64_t last;
+ unsigned long current, last;
+ int ret = 1;
current = uatomic_read(&state->current);
current = uatomic_read(&state->current);
- last = uatomic_read(&state->last);
- * Here are the conditions for a bad health. Current state set to 0 or the
- * current state is the same as the last one and we are NOT waiting for a
- * poll() call.
+ * Here are the conditions for a bad health. Either flag HEALTH_ERROR is
+ * set, or the progress counter is the same as the last one and we are NOT
+ * waiting for a poll() call.
- if (current == 0 || (current == last && HEALTH_IS_IN_CODE(current))) {
+ if ((uatomic_read(&state->flags) & HEALTH_ERROR) ||
+ (current == last && !HEALTH_IS_IN_POLL(current))) {
+ /* error */
- /* All good */
- ret = 1;
-
-error:
DBG("Health state current %" PRIu64 ", last %" PRIu64 ", ret %d",
current, last, ret);
DBG("Health state current %" PRIu64 ", last %" PRIu64 ", ret %d",
current, last, ret);
- /* Exchange current state counter into last one */
- uatomic_xchg(&state->last, state->current);
+ /*
+ * Update last counter. This value is and MUST be access only in this
+ * function.
+ */
+ state->last = current;
+
* These are the value added to the current state depending of the position in
* the thread where is either waiting on a poll() or running in the code.
*/
* These are the value added to the current state depending of the position in
* the thread where is either waiting on a poll() or running in the code.
*/
-#define HEALTH_POLL_VALUE 1
-#define HEALTH_CODE_VALUE 2
+#define HEALTH_POLL_VALUE (1UL << 0)
+#define HEALTH_CODE_VALUE (1UL << 1)
-#define HEALTH_IS_IN_POLL(x) (x % HEALTH_CODE_VALUE)
-#define HEALTH_IS_IN_CODE(x) (x % HEALTH_POLL_VALUE)
+#define HEALTH_IS_IN_POLL(x) ((x) & HEALTH_POLL_VALUE)
+
+enum health_flags {
+ HEALTH_EXIT = (1U << 0),
+ HEALTH_ERROR = (1U << 1),
+};
- uint64_t last;
- uint64_t current;
+ /*
+ * last counter is only read and updated by the health_check
+ * thread (single updater).
+ */
+ unsigned long last;
+ /*
+ * current and flags are updated by multiple threads concurrently.
+ */
+ unsigned long current; /* progress counter, updated atomically */
+ enum health_flags flags; /* other flags, updated atomically */
};
/* Health state counters for the client command thread */
extern struct health_state health_thread_cmd;
};
/* Health state counters for the client command thread */
extern struct health_state health_thread_cmd;
+/* Health state counters for the application management thread */
+extern struct health_state health_thread_app_manage;
+
/* Health state counters for the application registration thread */
extern struct health_state health_thread_app_reg;
/* Health state counters for the application registration thread */
extern struct health_state health_thread_app_reg;
extern struct health_state health_thread_kernel;
/*
extern struct health_state health_thread_kernel;
/*
- * Update current counter by 1 to indicate that the thread is in a blocking
- * state cause by a poll().
+ * Update current counter by 1 to indicate that the thread entered or
+ * left a blocking state caused by a poll().
*/
static inline void health_poll_update(struct health_state *state)
{
assert(state);
*/
static inline void health_poll_update(struct health_state *state)
{
assert(state);
uatomic_add(&state->current, HEALTH_POLL_VALUE);
}
/*
uatomic_add(&state->current, HEALTH_POLL_VALUE);
}
/*
- * Update current counter by 2 which indicates that we are currently running in
- * a thread and NOT blocked at a poll().
+ * Update current counter by 2 indicates progress in execution of a
+ * thread.
*/
static inline void health_code_update(struct health_state *state)
{
assert(state);
*/
static inline void health_code_update(struct health_state *state)
{
assert(state);
uatomic_add(&state->current, HEALTH_CODE_VALUE);
}
/*
uatomic_add(&state->current, HEALTH_CODE_VALUE);
}
/*
- * Reset health state. A value of zero indicate a bad health state.
+ * Set health "exit" flag.
-static inline void health_reset(struct health_state *state)
+static inline void health_exit(struct health_state *state)
+ uatomic_or(&state->flags, HEALTH_EXIT);
+}
- uatomic_set(&state->current, 0);
- uatomic_set(&state->last, 0);
+/*
+ * Set health "error" flag.
+ */
+static inline void health_error(struct health_state *state)
+{
+ assert(state);
+ uatomic_or(&state->flags, HEALTH_ERROR);
static inline void health_init(struct health_state *state)
{
assert(state);
static inline void health_init(struct health_state *state)
{
assert(state);
uatomic_set(&state->last, 0);
uatomic_set(&state->last, 0);
- uatomic_set(&state->current, HEALTH_CODE_VALUE);
+ uatomic_set(&state->current, 0);
+ uatomic_set(&state->flags, 0);
}
int health_check_state(struct health_state *state);
}
int health_check_state(struct health_state *state);
/* Used for the health monitoring of the session daemon. See health.h */
struct health_state health_thread_cmd;
/* Used for the health monitoring of the session daemon. See health.h */
struct health_state health_thread_cmd;
+struct health_state health_thread_app_manage;
struct health_state health_thread_app_reg;
struct health_state health_thread_kernel;
struct health_state health_thread_app_reg;
struct health_state health_thread_kernel;
*/
static void *thread_manage_kernel(void *data)
{
*/
static void *thread_manage_kernel(void *data)
{
- int ret, i, pollfd, update_poll_flag = 1;
+ int ret, i, pollfd, update_poll_flag = 1, err = -1;
uint32_t revents, nb_fd;
char tmp;
struct lttng_poll_event events;
uint32_t revents, nb_fd;
char tmp;
struct lttng_poll_event events;
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
/* Check for data on kernel pipe */
}
/* Check for data on kernel pipe */
error:
lttng_poll_clean(&events);
error_poll_create:
error:
lttng_poll_clean(&events);
error_poll_create:
- health_reset(&health_thread_kernel);
+ if (err) {
+ health_error(&health_thread_kernel);
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_exit(&health_thread_kernel);
DBG("Kernel thread dying");
return NULL;
}
DBG("Kernel thread dying");
return NULL;
}
*/
static void *thread_manage_consumer(void *data)
{
*/
static void *thread_manage_consumer(void *data)
{
- int sock = -1, i, ret, pollfd;
+ int sock = -1, i, ret, pollfd, err = -1;
uint32_t revents, nb_fd;
enum lttcomm_return_code code;
struct lttng_poll_event events;
uint32_t revents, nb_fd;
enum lttcomm_return_code code;
struct lttng_poll_event events;
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
/* Event on the registration socket */
}
/* Event on the registration socket */
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
/* Event on the kconsumerd socket */
}
/* Event on the kconsumerd socket */
ERR("consumer return code : %s", lttcomm_get_readable_code(-code));
ERR("consumer return code : %s", lttcomm_get_readable_code(-code));
error:
/* Immediately set the consumerd state to stopped */
if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
error:
/* Immediately set the consumerd state to stopped */
if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
lttng_poll_clean(&events);
error_poll:
error_listen:
lttng_poll_clean(&events);
error_poll:
error_listen:
- health_reset(&consumer_data->health);
+ if (err) {
+ health_error(&consumer_data->health);
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_exit(&consumer_data->health);
DBG("consumer thread cleanup completed");
return NULL;
DBG("consumer thread cleanup completed");
return NULL;
*/
static void *thread_manage_apps(void *data)
{
*/
static void *thread_manage_apps(void *data)
{
+ int i, ret, pollfd, err = -1;
uint32_t revents, nb_fd;
struct ust_command ust_cmd;
struct lttng_poll_event events;
uint32_t revents, nb_fd;
struct ust_command ust_cmd;
struct lttng_poll_event events;
rcu_register_thread();
rcu_thread_online();
rcu_register_thread();
rcu_thread_online();
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
ret = create_thread_poll_set(&events, 2);
if (ret < 0) {
ret = create_thread_poll_set(&events, 2);
if (ret < 0) {
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
while (1) {
/* Zeroed the events structure */
while (1) {
/* Zeroed the events structure */
/* Inifinite blocking call, waiting for transmission */
restart:
/* Inifinite blocking call, waiting for transmission */
restart:
- health_poll_update(&health_thread_app_reg);
+ health_poll_update(&health_thread_app_manage);
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
- health_poll_update(&health_thread_app_reg);
+ health_poll_update(&health_thread_app_manage);
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
/* Inspect the apps cmd pipe */
}
/* Inspect the apps cmd pipe */
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
/* Register applicaton to the session daemon */
ret = ust_app_register(&ust_cmd.reg_msg,
/* Register applicaton to the session daemon */
ret = ust_app_register(&ust_cmd.reg_msg,
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
/*
* Validate UST version compatibility.
/*
* Validate UST version compatibility.
update_ust_app(ust_cmd.sock);
}
update_ust_app(ust_cmd.sock);
}
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
ret = ust_app_register_done(ust_cmd.sock);
if (ret < 0) {
ret = ust_app_register_done(ust_cmd.sock);
if (ret < 0) {
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
- health_code_update(&health_thread_app_reg);
+ health_code_update(&health_thread_app_manage);
error:
lttng_poll_clean(&events);
error_poll_create:
error:
lttng_poll_clean(&events);
error_poll_create:
- health_reset(&health_thread_app_reg);
+ if (err) {
+ health_error(&health_thread_app_manage);
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_exit(&health_thread_app_manage);
DBG("Application communication apps thread cleanup complete");
rcu_thread_offline();
rcu_unregister_thread();
DBG("Application communication apps thread cleanup complete");
rcu_thread_offline();
rcu_unregister_thread();
*/
static void *thread_registration_apps(void *data)
{
*/
static void *thread_registration_apps(void *data)
{
- int sock = -1, i, ret, pollfd;
+ int sock = -1, i, ret, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_poll_event events;
/*
uint32_t revents, nb_fd;
struct lttng_poll_event events;
/*
/* Inifinite blocking call, waiting for transmission */
restart:
/* Inifinite blocking call, waiting for transmission */
restart:
+ health_poll_update(&health_thread_app_reg);
ret = lttng_poll_wait(&events, -1);
ret = lttng_poll_wait(&events, -1);
+ health_poll_update(&health_thread_app_reg);
if (ret < 0) {
/*
* Restart interrupted system call.
if (ret < 0) {
/*
* Restart interrupted system call.
}
for (i = 0; i < nb_fd; i++) {
}
for (i = 0; i < nb_fd; i++) {
+ health_code_update(&health_thread_app_reg);
+
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Fetch once the poll data */
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
/* Event on the registration socket */
}
/* Event on the registration socket */
+ health_code_update(&health_thread_app_reg);
ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg,
sizeof(struct ust_register_msg));
if (ret < 0 || ret < sizeof(struct ust_register_msg)) {
ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg,
sizeof(struct ust_register_msg));
if (ret < 0 || ret < sizeof(struct ust_register_msg)) {
+ health_code_update(&health_thread_app_reg);
ust_cmd->sock = sock;
sock = -1;
ust_cmd->sock = sock;
sock = -1;
+ if (err) {
+ health_error(&health_thread_app_reg);
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_exit(&health_thread_app_reg);
+
/* Notify that the registration thread is gone */
notify_ust_apps(0);
/* Notify that the registration thread is gone */
notify_ust_apps(0);
- * Compute health status of each consumer.
+ * Compute health status of each consumer. If one of them is zero (bad
+ * state), we return 0.
*/
static int check_consumer_health(void)
{
int ret;
*/
static int check_consumer_health(void)
{
int ret;
- ret =
- health_check_state(&kconsumer_data.health) &
- health_check_state(&ustconsumer32_data.health) &
+ ret = health_check_state(&kconsumer_data.health) &&
+ health_check_state(&ustconsumer32_data.health) &&
health_check_state(&ustconsumer64_data.health);
DBG3("Health consumer check %d", ret);
health_check_state(&ustconsumer64_data.health);
DBG3("Health consumer check %d", ret);
*/
static void *thread_manage_health(void *data)
{
*/
static void *thread_manage_health(void *data)
{
- int sock = -1, new_sock, ret, i, pollfd;
+ int sock = -1, new_sock, ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_poll_event events;
struct lttcomm_health_msg msg;
uint32_t revents, nb_fd;
struct lttng_poll_event events;
struct lttcomm_health_msg msg;
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
/* Event on the registration socket */
}
/* Event on the registration socket */
case LTTNG_HEALTH_CMD:
reply.ret_code = health_check_state(&health_thread_cmd);
break;
case LTTNG_HEALTH_CMD:
reply.ret_code = health_check_state(&health_thread_cmd);
break;
+ case LTTNG_HEALTH_APP_MANAGE:
+ reply.ret_code = health_check_state(&health_thread_app_manage);
+ break;
case LTTNG_HEALTH_APP_REG:
reply.ret_code = health_check_state(&health_thread_app_reg);
break;
case LTTNG_HEALTH_APP_REG:
reply.ret_code = health_check_state(&health_thread_app_reg);
break;
reply.ret_code = check_consumer_health();
break;
case LTTNG_HEALTH_ALL:
reply.ret_code = check_consumer_health();
break;
case LTTNG_HEALTH_ALL:
- ret = check_consumer_health();
-
- health_check_state(&health_thread_app_reg) &
- health_check_state(&health_thread_cmd) &
- health_check_state(&health_thread_kernel) &
- ret;
+ health_check_state(&health_thread_app_manage) &&
+ health_check_state(&health_thread_app_reg) &&
+ health_check_state(&health_thread_cmd) &&
+ health_check_state(&health_thread_kernel) &&
+ check_consumer_health();
break;
default:
reply.ret_code = LTTCOMM_UND;
break;
default:
reply.ret_code = LTTCOMM_UND;
+ if (err) {
+ ERR("Health error occurred in %s", __func__);
+ }
DBG("Health check thread dying");
unlink(health_unix_sock_path);
if (sock >= 0) {
DBG("Health check thread dying");
unlink(health_unix_sock_path);
if (sock >= 0) {
*/
static void *thread_manage_clients(void *data)
{
*/
static void *thread_manage_clients(void *data)
{
- int sock = -1, ret, i, pollfd;
+ int sock = -1, ret, i, pollfd, err = -1;
int sock_error;
uint32_t revents, nb_fd;
struct command_ctx *cmd_ctx = NULL;
int sock_error;
uint32_t revents, nb_fd;
struct command_ctx *cmd_ctx = NULL;
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
/* Event on the registration socket */
}
/* Event on the registration socket */
health_code_update(&health_thread_cmd);
}
health_code_update(&health_thread_cmd);
}
- health_reset(&health_thread_cmd);
+ if (err) {
+ health_error(&health_thread_cmd);
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_exit(&health_thread_cmd);
DBG("Client thread dying");
unlink(client_unix_sock_path);
DBG("Client thread dying");
unlink(client_unix_sock_path);
/* Init all health thread counters. */
health_init(&health_thread_cmd);
health_init(&health_thread_kernel);
/* Init all health thread counters. */
health_init(&health_thread_cmd);
health_init(&health_thread_kernel);
+ health_init(&health_thread_app_manage);
health_init(&health_thread_app_reg);
/*
health_init(&health_thread_app_reg);
/*