Fix: race with the viewer and readiness of streams
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index eaccce3d0df294815be8289ec6983deea3c7f042..c95355e9a472e35970ab7b734b18611d14bc356c 100644 (file)
@@ -29,6 +29,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/sessiond-comm.h>
@@ -39,6 +40,7 @@
 #include <common/utils.h>
 #include <common/consumer-stream.h>
 #include <common/index/index.h>
+#include <common/consumer-timer.h>
 
 #include "kernel-consumer.h"
 
@@ -138,6 +140,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        }
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+               health_code_update();
+
                /*
                 * Lock stream because we are about to change its state.
                 */
@@ -171,6 +176,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
                                        path, stream->name, stream->key);
                }
+               ret = consumer_send_relayd_streams_sent(relayd_id);
+               if (ret < 0) {
+                       ERR("sending streams sent to relayd");
+                       goto end_unlock;
+               }
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
@@ -220,6 +230,8 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ssize_t read_len;
                        unsigned long len, padded_len;
 
+                       health_code_update();
+
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
@@ -361,6 +373,8 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        }
 
        do {
+               health_code_update();
+
                ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
                if (ret_read < 0) {
                        if (ret_read != -EAGAIN) {
@@ -409,9 +423,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttcomm_consumer_msg msg;
 
+       health_code_update();
+
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
                if (ret > 0) {
@@ -420,6 +436,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                return ret;
        }
+
+       health_code_update();
+
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
                /*
                 * Notify the session daemon that the command is completed.
@@ -432,6 +451,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
+       health_code_update();
+
        /* relayd needs RCU read-side protection */
        rcu_read_lock();
 
@@ -441,7 +462,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Session daemon status message are handled in the following call. */
                ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
+                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+                                msg.u.relayd_sock.relayd_session_id);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_CHANNEL:
@@ -449,12 +471,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *new_channel;
                int ret_recv;
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
+
+               health_code_update();
+
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
@@ -492,6 +519,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                };
 
+               health_code_update();
+
                if (ctx->on_recv_channel != NULL) {
                        ret_recv = ctx->on_recv_channel(new_channel);
                        if (ret_recv == 0) {
@@ -502,6 +531,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = consumer_add_channel(new_channel, ctx);
                }
+               if (CONSUMER_CHANNEL_TYPE_DATA) {
+                       consumer_timer_live_start(new_channel,
+                                       msg.u.channel.live_timer_interval);
+               }
+
+               health_code_update();
 
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
@@ -536,23 +571,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
-               if (ret_code != LTTNG_OK) {
+
+               health_code_update();
+
+               if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
                        /* Channel was not found. */
                        goto end_nosignal;
                }
 
                /* Blocking call */
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        rcu_read_unlock();
                        return -EINTR;
                }
 
+               health_code_update();
+
                /* Get stream file descriptor from socket */
                ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
                if (ret != sizeof(fd)) {
@@ -561,6 +606,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               health_code_update();
+
                /*
                 * Send status code to session daemon only if the recv works. If the
                 * above recv() failed, the session daemon is notified through the
@@ -572,6 +619,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                new_stream = consumer_allocate_stream(channel->key,
                                fd,
                                LTTNG_CONSUMER_ACTIVE_STREAM,
@@ -629,6 +678,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
+               health_code_update();
+
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
@@ -637,6 +688,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                if (new_stream->metadata_flag) {
                        channel->metadata_stream = new_stream;
                }
@@ -684,6 +737,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Vitible to other threads */
                new_stream->globally_visible = 1;
 
+               health_code_update();
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
@@ -701,6 +756,57 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                new_stream->name, fd, new_stream->relayd_stream_id);
                break;
        }
+       case LTTNG_CONSUMER_STREAMS_SENT:
+       {
+               struct lttng_consumer_channel *channel;
+
+               /*
+                * Get stream's channel reference. Needed when adding the stream to the
+                * global hash table.
+                */
+               channel = consumer_find_channel(msg.u.sent_streams.channel_key);
+               if (!channel) {
+                       /*
+                        * We could not find the channel. Can happen if cpu hotplug
+                        * happens while tearing down.
+                        */
+                       ERR("Unable to find channel key %" PRIu64,
+                                       msg.u.sent_streams.channel_key);
+                       ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+
+               /*
+                * Send status code to session daemon.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               health_code_update();
+
+               /*
+                * We should not send this message if we don't monitor the
+                * streams in this channel.
+                */
+               if (!channel->monitor) {
+                       break;
+               }
+
+               health_code_update();
+               /* Send stream to relayd if the stream has an ID. */
+               if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_streams_sent(
+                                       msg.u.sent_streams.net_seq_idx);
+                       if (ret < 0) {
+                               goto end_nosignal;
+                       }
+               }
+               break;
+       }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
                rcu_read_unlock();
@@ -734,6 +840,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_flag_relayd_for_destroy(relayd);
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -751,6 +859,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                ret = consumer_data_pending(id);
 
+               health_code_update();
+
                /* Send back returned value to session daemon */
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
@@ -786,6 +896,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -804,12 +916,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                /*
                 * This command should ONLY be issued for channel with streams set in
                 * no monitor mode.
@@ -837,6 +953,7 @@ end_nosignal:
         * Return 1 to indicate success since the 0 value can be a socket
         * shutdown during the recv() or send() call.
         */
+       health_code_update();
        return 1;
 
 error_fatal:
@@ -850,7 +967,7 @@ error_fatal:
  *
  * Return 0 on success or else a negative value.
  */
-static int get_index_values(struct lttng_packet_index *index, int infd)
+static int get_index_values(struct ctf_packet_index *index, int infd)
 {
        int ret;
 
@@ -899,6 +1016,42 @@ static int get_index_values(struct lttng_packet_index *index, int infd)
 error:
        return ret;
 }
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+       int ret;
+
+       assert(metadata);
+
+       ret = kernctl_buffer_flush(metadata->wait_fd);
+       if (ret < 0) {
+               ERR("Failed to flush kernel stream");
+               goto end;
+       }
+
+       ret = kernctl_snapshot(metadata->wait_fd);
+       if (ret < 0) {
+               if (errno != EAGAIN) {
+                       ERR("Sync metadata, taking kernel snapshot failed.");
+                       goto end;
+               }
+               DBG("Sync metadata, no new kernel metadata");
+               /* No new metadata, exit. */
+               ret = ENODATA;
+               goto end;
+       }
+
+end:
+       return ret;
+}
 
 /*
  * Consume data on a file descriptor and write it on a trace file.
@@ -907,18 +1060,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 0;
+       int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
-       struct lttng_packet_index index;
+       struct ctf_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
-       /* Indicate that for this stream we have to write the index. */
-       if (stream->index_fd >= 0) {
-               write_index = 1;
-       }
-
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
@@ -942,11 +1090,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       if (!stream->metadata_flag && write_index) {
+       if (!stream->metadata_flag) {
                ret = get_index_values(&index, infd);
                if (ret < 0) {
                        goto end;
                }
+       } else {
+               write_index = 0;
        }
 
        switch (stream->chan->output) {
@@ -1028,14 +1178,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 
        /* Write index if needed. */
-       if (write_index) {
-               err = index_write(stream->index_fd, &index, sizeof(index));
+       if (!write_index) {
+               goto end;
+       }
+
+       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+               /*
+                * In live, block until all the metadata is sent.
+                */
+               err = consumer_stream_sync_metadata(ctx, stream->session_id);
                if (err < 0) {
-                       ret = -1;
                        goto end;
                }
        }
 
+       err = consumer_stream_write_index(stream, &index);
+       if (err < 0) {
+               goto end;
+       }
+
 end:
        return ret;
 }
This page took 0.030398 seconds and 4 git commands to generate.