consumerd: register threads to health monitoring
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 16 Sep 2013 22:44:22 +0000 (17:44 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 9 Oct 2013 13:16:58 +0000 (09:16 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/bin/lttng-consumerd/Makefile.am
src/bin/lttng-consumerd/health-consumerd.h [new file with mode: 0644]
src/bin/lttng-consumerd/lttng-consumerd.c
src/common/consumer-timer.c
src/common/consumer.c

index a7971ade0336b7eafeaaee93498bb10377fefa88..9f029a15aeee8bc511edc062a894cae067c80a1c 100644 (file)
@@ -2,13 +2,16 @@ AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/src
 
 lttnglibexec_PROGRAMS = lttng-consumerd
 
-lttng_consumerd_SOURCES = lttng-consumerd.c lttng-consumerd.h
+lttng_consumerd_SOURCES = lttng-consumerd.c \
+       lttng-consumerd.h \
+       health-consumerd.h
 
 lttng_consumerd_LDADD = \
           $(top_builddir)/src/common/libconsumer.la \
           $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \
           $(top_builddir)/src/common/libcommon.la \
           $(top_builddir)/src/common/index/libindex.la \
+          $(top_builddir)/src/common/health/libhealth.la \
           -lrt
 
 if HAVE_LIBLTTNG_UST_CTL
diff --git a/src/bin/lttng-consumerd/health-consumerd.h b/src/bin/lttng-consumerd/health-consumerd.h
new file mode 100644 (file)
index 0000000..f5e2a34
--- /dev/null
@@ -0,0 +1,37 @@
+#ifndef HEALTH_CONSUMERD_H
+#define HEALTH_CONSUMERD_H
+
+/*
+ * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2013 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <lttng/health-internal.h>
+
+enum health_type {
+       HEALTH_CONSUMERD_TYPE_CHANNEL           = 0,
+       HEALTH_CONSUMERD_TYPE_METADATA          = 1,
+       HEALTH_CONSUMERD_TYPE_DATA              = 2,
+       HEALTH_CONSUMERD_TYPE_SESSIOND          = 3,
+       HEALTH_CONSUMERD_TYPE_METADATA_TIMER    = 4,
+
+       NR_HEALTH_CONSUMERD_TYPES,
+};
+
+/* Consumerd health monitoring */
+struct health_app *health_consumerd;
+
+#endif /* HEALTH_CONSUMERD_H */
index 59397594b914c2c99533ff408d77d9c824c08034..e33a470f59268e6c3f33f09c72e506930ae859fd 100644 (file)
@@ -49,6 +49,7 @@
 #include <common/sessiond-comm/sessiond-comm.h>
 
 #include "lttng-consumerd.h"
+#include "health-consumerd.h"
 
 /* TODO : support UST (all direct kernel-ctl accesses). */
 
@@ -72,6 +73,9 @@ static enum lttng_consumer_type opt_type = LTTNG_CONSUMER_KERNEL;
 /* the liblttngconsumerd context */
 static struct lttng_consumer_local_data *ctx;
 
+/* Consumerd health monitoring */
+struct health_app *health_consumerd;
+
 /*
  * Signal handler for the daemon
  */
@@ -325,6 +329,11 @@ int main(int argc, char **argv)
                set_ulimit();
        }
 
+       health_consumerd = health_app_create(NR_HEALTH_CONSUMERD_TYPES);
+       if (!health_consumerd) {
+               goto error;
+       }
+
        /* create the consumer instance with and assign the callbacks */
        ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer,
                NULL, lttng_consumer_on_recv_stream, NULL);
@@ -469,6 +478,9 @@ error:
 end:
        lttng_consumer_destroy(ctx);
        lttng_consumer_cleanup();
+       if (health_consumerd) {
+               health_app_destroy(health_consumerd);
+       }
 
        return ret;
 }
index 68b5638ddc404e009ba7c5d835b7b2fec782ecc9..b02ccbb115a9cdd6337f5214bcd49b8d268aed79 100644 (file)
@@ -28,6 +28,7 @@
 
 #include "consumer-timer.h"
 #include "ust-consumer/ust-consumer.h"
+#include "../bin/lttng-consumerd/health-consumerd.h"
 
 static struct timer_signal_data timer_signal = {
        .tid = 0,
@@ -469,6 +470,8 @@ void *consumer_timer_thread(void *data)
        siginfo_t info;
        struct lttng_consumer_local_data *ctx = data;
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
+
        /* Only self thread will receive signal mask. */
        setmask(&mask);
        CMM_STORE_SHARED(timer_signal.tid, pthread_self());
@@ -494,5 +497,9 @@ void *consumer_timer_thread(void *data)
                }
        }
 
+       /* Currently never reached */
+       health_unregister(health_consumerd);
+
+       /* Never return */
        return NULL;
 }
index 892c841ba0f8958ae46cff1f4d577f0426df1415..6abd8b1e86de34c71d330ac68726846dd971da3b 100644 (file)
@@ -44,6 +44,7 @@
 
 #include "consumer.h"
 #include "consumer-stream.h"
+#include "../bin/lttng-consumerd/health-consumerd.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -2182,7 +2183,7 @@ static void validate_endpoint_status_metadata_stream(
  */
 void *consumer_thread_metadata_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
@@ -2193,6 +2194,8 @@ void *consumer_thread_metadata_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
        metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2220,6 +2223,7 @@ void *consumer_thread_metadata_poll(void *data)
        while (1) {
                /* Only the metadata pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
@@ -2352,6 +2356,8 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 error:
 end:
        DBG("Metadata poll thread exiting");
@@ -2360,6 +2366,11 @@ end:
 end_poll:
        destroy_stream_ht(metadata_ht);
 end_ht:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2370,7 +2381,7 @@ end_ht:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i;
+       int num_rdy, num_hup, high_prio, ret, i, err = -1;
        struct pollfd *pollfd = NULL;
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
@@ -2381,6 +2392,8 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
        data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2440,6 +2453,7 @@ void *consumer_thread_data_poll(void *data)
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
                if (nb_fd == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
@@ -2588,6 +2602,8 @@ void *consumer_thread_data_poll(void *data)
                        }
                }
        }
+       /* All is OK */
+       err = 0;
 end:
        DBG("polling thread exiting");
        free(pollfd);
@@ -2605,6 +2621,12 @@ end:
 
        destroy_data_stream_ht(data_ht);
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2686,7 +2708,7 @@ static void destroy_channel_ht(struct lttng_ht *ht)
  */
 void *consumer_thread_channel_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_channel *chan = NULL;
        struct lttng_ht_iter iter;
@@ -2697,6 +2719,8 @@ void *consumer_thread_channel_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!channel_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2723,6 +2747,7 @@ void *consumer_thread_channel_poll(void *data)
        while (1) {
                /* Only the channel pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
@@ -2880,12 +2905,19 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 end:
        lttng_poll_clean(&events);
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
        DBG("Channel poll thread exiting");
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2923,7 +2955,7 @@ error:
  */
 void *consumer_thread_sessiond_poll(void *data)
 {
-       int sock = -1, client_socket, ret;
+       int sock = -1, client_socket, ret, err = -1;
        /*
         * structure to poll for incoming data on communication socket avoids
         * making blocking sockets.
@@ -2933,6 +2965,8 @@ void *consumer_thread_sessiond_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
        unlink(ctx->consumer_command_sock_path);
        client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
@@ -3013,10 +3047,14 @@ void *consumer_thread_sessiond_poll(void *data)
                }
                if (consumer_quit) {
                        DBG("consumer_thread_receive_fds received quit from signal");
+                       err = 0;        /* All is OK */
                        goto end;
                }
                DBG("received command on sock");
        }
+       /* All is OK */
+       err = 0;
+
 end:
        DBG("Consumer thread sessiond poll exiting");
 
@@ -3056,6 +3094,12 @@ end:
                }
        }
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
This page took 0.030314 seconds and 4 git commands to generate.