Fix: race with the viewer and readiness of streams
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index d02e8502d99914c0acf5652d9835bd71f20b540e..c95355e9a472e35970ab7b734b18611d14bc356c 100644 (file)
@@ -29,6 +29,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/sessiond-comm.h>
@@ -139,6 +140,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        }
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+               health_code_update();
+
                /*
                 * Lock stream because we are about to change its state.
                 */
@@ -172,6 +176,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
                                        path, stream->name, stream->key);
                }
+               ret = consumer_send_relayd_streams_sent(relayd_id);
+               if (ret < 0) {
+                       ERR("sending streams sent to relayd");
+                       goto end_unlock;
+               }
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
@@ -221,6 +230,8 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ssize_t read_len;
                        unsigned long len, padded_len;
 
+                       health_code_update();
+
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
@@ -362,6 +373,8 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        }
 
        do {
+               health_code_update();
+
                ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
                if (ret_read < 0) {
                        if (ret_read != -EAGAIN) {
@@ -410,9 +423,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttcomm_consumer_msg msg;
 
+       health_code_update();
+
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
                if (ret > 0) {
@@ -421,6 +436,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                return ret;
        }
+
+       health_code_update();
+
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
                /*
                 * Notify the session daemon that the command is completed.
@@ -433,6 +451,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
+       health_code_update();
+
        /* relayd needs RCU read-side protection */
        rcu_read_lock();
 
@@ -451,12 +471,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *new_channel;
                int ret_recv;
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
+
+               health_code_update();
+
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
@@ -494,6 +519,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                };
 
+               health_code_update();
+
                if (ctx->on_recv_channel != NULL) {
                        ret_recv = ctx->on_recv_channel(new_channel);
                        if (ret_recv == 0) {
@@ -509,6 +536,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        msg.u.channel.live_timer_interval);
                }
 
+               health_code_update();
+
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
                        ret = consumer_send_status_msg(sock, ret);
@@ -542,23 +571,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
-               if (ret_code != LTTNG_OK) {
+
+               health_code_update();
+
+               if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
                        /* Channel was not found. */
                        goto end_nosignal;
                }
 
                /* Blocking call */
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        rcu_read_unlock();
                        return -EINTR;
                }
 
+               health_code_update();
+
                /* Get stream file descriptor from socket */
                ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
                if (ret != sizeof(fd)) {
@@ -567,6 +606,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               health_code_update();
+
                /*
                 * Send status code to session daemon only if the recv works. If the
                 * above recv() failed, the session daemon is notified through the
@@ -578,6 +619,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                new_stream = consumer_allocate_stream(channel->key,
                                fd,
                                LTTNG_CONSUMER_ACTIVE_STREAM,
@@ -635,6 +678,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
+               health_code_update();
+
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
@@ -643,6 +688,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                if (new_stream->metadata_flag) {
                        channel->metadata_stream = new_stream;
                }
@@ -690,6 +737,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Vitible to other threads */
                new_stream->globally_visible = 1;
 
+               health_code_update();
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
@@ -707,6 +756,57 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                new_stream->name, fd, new_stream->relayd_stream_id);
                break;
        }
+       case LTTNG_CONSUMER_STREAMS_SENT:
+       {
+               struct lttng_consumer_channel *channel;
+
+               /*
+                * Get stream's channel reference. Needed when adding the stream to the
+                * global hash table.
+                */
+               channel = consumer_find_channel(msg.u.sent_streams.channel_key);
+               if (!channel) {
+                       /*
+                        * We could not find the channel. Can happen if cpu hotplug
+                        * happens while tearing down.
+                        */
+                       ERR("Unable to find channel key %" PRIu64,
+                                       msg.u.sent_streams.channel_key);
+                       ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+
+               /*
+                * Send status code to session daemon.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               health_code_update();
+
+               /*
+                * We should not send this message if we don't monitor the
+                * streams in this channel.
+                */
+               if (!channel->monitor) {
+                       break;
+               }
+
+               health_code_update();
+               /* Send stream to relayd if the stream has an ID. */
+               if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_streams_sent(
+                                       msg.u.sent_streams.net_seq_idx);
+                       if (ret < 0) {
+                               goto end_nosignal;
+                       }
+               }
+               break;
+       }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
                rcu_read_unlock();
@@ -740,6 +840,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_flag_relayd_for_destroy(relayd);
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -757,6 +859,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                ret = consumer_data_pending(id);
 
+               health_code_update();
+
                /* Send back returned value to session daemon */
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
@@ -792,6 +896,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -810,12 +916,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                /*
                 * This command should ONLY be issued for channel with streams set in
                 * no monitor mode.
@@ -843,6 +953,7 @@ end_nosignal:
         * Return 1 to indicate success since the 0 value can be a socket
         * shutdown during the recv() or send() call.
         */
+       health_code_update();
        return 1;
 
 error_fatal:
@@ -856,7 +967,7 @@ error_fatal:
  *
  * Return 0 on success or else a negative value.
  */
-static int get_index_values(struct lttng_packet_index *index, int infd)
+static int get_index_values(struct ctf_packet_index *index, int infd)
 {
        int ret;
 
@@ -952,7 +1063,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
-       struct lttng_packet_index index;
+       struct ctf_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
This page took 0.028011 seconds and 4 git commands to generate.