consumerd: add health instrumentation into threads
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index d02e8502d99914c0acf5652d9835bd71f20b540e..a506737ab4170880b23c28a959c546e65ed9f81c 100644 (file)
@@ -42,6 +42,7 @@
 #include <common/consumer-timer.h>
 
 #include "kernel-consumer.h"
+#include "../../bin/lttng-consumerd/health-consumerd.h"
 
 extern struct lttng_consumer_global_data consumer_data;
 extern int consumer_poll_timeout;
@@ -139,6 +140,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        }
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+               health_code_update();
+
                /*
                 * Lock stream because we are about to change its state.
                 */
@@ -221,6 +225,8 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ssize_t read_len;
                        unsigned long len, padded_len;
 
+                       health_code_update();
+
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
@@ -362,6 +368,8 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        }
 
        do {
+               health_code_update();
+
                ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
                if (ret_read < 0) {
                        if (ret_read != -EAGAIN) {
@@ -413,6 +421,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        enum lttng_error_code ret_code = LTTNG_OK;
        struct lttcomm_consumer_msg msg;
 
+       health_code_update();
+
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
                if (ret > 0) {
@@ -421,6 +431,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                return ret;
        }
+
+       health_code_update();
+
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
                /*
                 * Notify the session daemon that the command is completed.
@@ -433,6 +446,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
+       health_code_update();
+
        /* relayd needs RCU read-side protection */
        rcu_read_lock();
 
@@ -451,12 +466,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *new_channel;
                int ret_recv;
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
+
+               health_code_update();
+
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
@@ -494,6 +514,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                };
 
+               health_code_update();
+
                if (ctx->on_recv_channel != NULL) {
                        ret_recv = ctx->on_recv_channel(new_channel);
                        if (ret_recv == 0) {
@@ -509,6 +531,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        msg.u.channel.live_timer_interval);
                }
 
+               health_code_update();
+
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
                        ret = consumer_send_status_msg(sock, ret);
@@ -542,23 +566,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
+
+               health_code_update();
+
                if (ret_code != LTTNG_OK) {
                        /* Channel was not found. */
                        goto end_nosignal;
                }
 
                /* Blocking call */
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        rcu_read_unlock();
                        return -EINTR;
                }
 
+               health_code_update();
+
                /* Get stream file descriptor from socket */
                ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
                if (ret != sizeof(fd)) {
@@ -567,6 +601,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               health_code_update();
+
                /*
                 * Send status code to session daemon only if the recv works. If the
                 * above recv() failed, the session daemon is notified through the
@@ -578,6 +614,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                new_stream = consumer_allocate_stream(channel->key,
                                fd,
                                LTTNG_CONSUMER_ACTIVE_STREAM,
@@ -635,6 +673,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
+               health_code_update();
+
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
@@ -643,6 +683,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                if (new_stream->metadata_flag) {
                        channel->metadata_stream = new_stream;
                }
@@ -690,6 +732,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Vitible to other threads */
                new_stream->globally_visible = 1;
 
+               health_code_update();
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
@@ -740,6 +784,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_flag_relayd_for_destroy(relayd);
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -757,6 +803,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                ret = consumer_data_pending(id);
 
+               health_code_update();
+
                /* Send back returned value to session daemon */
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
@@ -792,6 +840,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -810,12 +860,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                /*
                 * This command should ONLY be issued for channel with streams set in
                 * no monitor mode.
@@ -843,6 +897,7 @@ end_nosignal:
         * Return 1 to indicate success since the 0 value can be a socket
         * shutdown during the recv() or send() call.
         */
+       health_code_update();
        return 1;
 
 error_fatal:
This page took 0.025385 seconds and 4 git commands to generate.