Implement consumer health check thread
[lttng-tools.git] / src / common / consumer.c
index 2c2b79cf0f501fc864211e1e63b7ec6abc242dbb..6f3a02d2619eeef095684fc927399fca7a9404dd 100644 (file)
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/consumer-timer.h>
 
 #include "consumer.h"
 #include "consumer-stream.h"
+#include "../bin/lttng-consumerd/health-consumerd.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -92,6 +94,18 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
        (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
 }
 
+static void notify_health_quit_pipe(int *pipe)
+{
+       int ret;
+
+       do {
+               ret = write(pipe[1], "4", 1);
+       } while (ret < 0 && errno == EINTR);
+       if (ret < 0 || ret != 1) {
+               PERROR("write consumer health quit");
+       }
+}
+
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
                uint64_t key,
@@ -305,6 +319,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                consumer_stream_destroy(stream, NULL);
        }
 
+       if (channel->live_timer_enabled == 1) {
+               consumer_timer_live_stop(channel);
+       }
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
@@ -515,6 +533,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                stream->metadata_flag = 1;
                /* Metadata is flat out. */
                strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+               /* Live rendez-vous point. */
+               pthread_cond_init(&stream->metadata_rdv, NULL);
+               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
        } else {
                /* Format stream name to <channel_name>_<cpu_number> */
                ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
@@ -729,6 +750,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                if (ret < 0) {
                        goto end;
                }
+
                uatomic_inc(&relayd->refcount);
                stream->sent_to_relayd = 1;
        } else {
@@ -839,7 +861,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t tracefile_size,
                uint64_t tracefile_count,
                uint64_t session_id_per_pid,
-               unsigned int monitor)
+               unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        struct lttng_consumer_channel *channel;
 
@@ -860,6 +883,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
+       channel->live_timer_interval = live_timer_interval;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
@@ -2171,7 +2195,7 @@ static void validate_endpoint_status_metadata_stream(
  */
 void *consumer_thread_metadata_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
@@ -2182,6 +2206,10 @@ void *consumer_thread_metadata_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
+       health_code_update();
+
        metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2207,14 +2235,19 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the metadata pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
 restart:
                DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Metadata event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2228,6 +2261,8 @@ restart:
 
                /* From here, the event is a metadata wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
@@ -2297,6 +2332,8 @@ restart:
 
                                        /* We just flushed the stream now read it. */
                                        do {
+                                               health_code_update();
+
                                                len = ctx->on_buffer_ready(stream, ctx);
                                                /*
                                                 * We don't check the return value here since if we get
@@ -2319,6 +2356,8 @@ restart:
                                assert(stream->wait_fd == pollfd);
 
                                do {
+                                       health_code_update();
+
                                        len = ctx->on_buffer_ready(stream, ctx);
                                        /*
                                         * We don't check the return value here since if we get
@@ -2341,6 +2380,8 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 error:
 end:
        DBG("Metadata poll thread exiting");
@@ -2349,6 +2390,11 @@ end:
 end_poll:
        destroy_stream_ht(metadata_ht);
 end_ht:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2359,7 +2405,7 @@ end_ht:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i;
+       int num_rdy, num_hup, high_prio, ret, i, err = -1;
        struct pollfd *pollfd = NULL;
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
@@ -2370,6 +2416,10 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
+       health_code_update();
+
        data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2383,6 +2433,8 @@ void *consumer_thread_data_poll(void *data)
        }
 
        while (1) {
+               health_code_update();
+
                high_prio = 0;
                num_hup = 0;
 
@@ -2429,12 +2481,15 @@ void *consumer_thread_data_poll(void *data)
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
                if (nb_fd == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
+               health_poll_entry();
                num_rdy = poll(pollfd, nb_fd + 1, -1);
+               health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        /*
@@ -2484,6 +2539,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2512,6 +2569,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2532,6 +2591,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Handle hangup and errors */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2577,6 +2638,8 @@ void *consumer_thread_data_poll(void *data)
                        }
                }
        }
+       /* All is OK */
+       err = 0;
 end:
        DBG("polling thread exiting");
        free(pollfd);
@@ -2594,6 +2657,12 @@ end:
 
        destroy_data_stream_ht(data_ht);
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2675,7 +2744,7 @@ static void destroy_channel_ht(struct lttng_ht *ht)
  */
 void *consumer_thread_channel_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_channel *chan = NULL;
        struct lttng_ht_iter iter;
@@ -2686,6 +2755,10 @@ void *consumer_thread_channel_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
+       health_code_update();
+
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!channel_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2710,14 +2783,19 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the channel pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
 restart:
                DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Channel event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2731,6 +2809,8 @@ restart:
 
                /* From here, the event is a channel wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
@@ -2797,6 +2877,8 @@ restart:
                                                        /* Delete streams that might have been left in the stream list. */
                                                        cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
                                                                        send_node) {
+                                                               health_code_update();
+
                                                                cds_list_del(&stream->send_node);
                                                                lttng_ustconsumer_del_stream(stream);
                                                                uatomic_sub(&stream->chan->refcount, 1);
@@ -2869,12 +2951,19 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 end:
        lttng_poll_clean(&events);
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
        DBG("Channel poll thread exiting");
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2912,7 +3001,7 @@ error:
  */
 void *consumer_thread_sessiond_poll(void *data)
 {
-       int sock = -1, client_socket, ret;
+       int sock = -1, client_socket, ret, err = -1;
        /*
         * structure to poll for incoming data on communication socket avoids
         * making blocking sockets.
@@ -2922,6 +3011,10 @@ void *consumer_thread_sessiond_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
+       health_code_update();
+
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
        unlink(ctx->consumer_command_sock_path);
        client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
@@ -2982,7 +3075,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
        while (1) {
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_code_update();
+
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        goto end;
                }
                DBG("Incoming command on sock");
@@ -2997,14 +3095,19 @@ void *consumer_thread_sessiond_poll(void *data)
                         * ERR() here.
                         */
                        DBG("Communication interrupted on command socket");
+                       err = 0;
                        goto end;
                }
                if (consumer_quit) {
                        DBG("consumer_thread_receive_fds received quit from signal");
+                       err = 0;        /* All is OK */
                        goto end;
                }
                DBG("received command on sock");
        }
+       /* All is OK */
+       err = 0;
+
 end:
        DBG("Consumer thread sessiond poll exiting");
 
@@ -3030,6 +3133,8 @@ end:
 
        notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
+       notify_health_quit_pipe(health_quit_pipe);
+
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
                ret = close(sock);
@@ -3044,6 +3149,12 @@ end:
                }
        }
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -3054,6 +3165,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ssize_t ret;
 
        pthread_mutex_lock(&stream->lock);
+       if (stream->metadata_flag) {
+               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3070,6 +3184,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
+       if (stream->metadata_flag) {
+               pthread_cond_broadcast(&stream->metadata_rdv);
+               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        return ret;
 }
@@ -3109,7 +3227,8 @@ void lttng_consumer_init(void)
 int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
+               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
+               uint64_t relayd_session_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
@@ -3209,29 +3328,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
 
-               /*
-                * Create a session on the relayd and store the returned id. Lock the
-                * control socket mutex if the relayd was NOT created before.
-                */
-               if (!relayd_created) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               }
-               ret = relayd_create_session(&relayd->control_sock,
-                               &relayd->relayd_session_id);
-               if (!relayd_created) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               }
-               if (ret < 0) {
-                       /*
-                        * Close all sockets of a relayd object. It will be freed if it was
-                        * created at the error code path or else it will be garbage
-                        * collect.
-                        */
-                       (void) relayd_close(&relayd->control_sock);
-                       (void) relayd_close(&relayd->data_sock);
-                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
-                       goto error;
-               }
+               relayd->relayd_session_id = relayd_session_id;
 
                break;
        case LTTNG_STREAM_DATA:
This page took 0.030071 seconds and 4 git commands to generate.