Fix: race with the viewer and readiness of streams
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index a506737ab4170880b23c28a959c546e65ed9f81c..c95355e9a472e35970ab7b734b18611d14bc356c 100644 (file)
@@ -29,6 +29,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/sessiond-comm.h>
@@ -42,7 +43,6 @@
 #include <common/consumer-timer.h>
 
 #include "kernel-consumer.h"
-#include "../../bin/lttng-consumerd/health-consumerd.h"
 
 extern struct lttng_consumer_global_data consumer_data;
 extern int consumer_poll_timeout;
@@ -176,6 +176,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
                                        path, stream->name, stream->key);
                }
+               ret = consumer_send_relayd_streams_sent(relayd_id);
+               if (ret < 0) {
+                       ERR("sending streams sent to relayd");
+                       goto end_unlock;
+               }
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
@@ -418,7 +423,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttcomm_consumer_msg msg;
 
        health_code_update();
@@ -577,7 +582,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                health_code_update();
 
-               if (ret_code != LTTNG_OK) {
+               if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
                        /* Channel was not found. */
                        goto end_nosignal;
                }
@@ -751,6 +756,57 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                new_stream->name, fd, new_stream->relayd_stream_id);
                break;
        }
+       case LTTNG_CONSUMER_STREAMS_SENT:
+       {
+               struct lttng_consumer_channel *channel;
+
+               /*
+                * Get stream's channel reference. Needed when adding the stream to the
+                * global hash table.
+                */
+               channel = consumer_find_channel(msg.u.sent_streams.channel_key);
+               if (!channel) {
+                       /*
+                        * We could not find the channel. Can happen if cpu hotplug
+                        * happens while tearing down.
+                        */
+                       ERR("Unable to find channel key %" PRIu64,
+                                       msg.u.sent_streams.channel_key);
+                       ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+
+               /*
+                * Send status code to session daemon.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               health_code_update();
+
+               /*
+                * We should not send this message if we don't monitor the
+                * streams in this channel.
+                */
+               if (!channel->monitor) {
+                       break;
+               }
+
+               health_code_update();
+               /* Send stream to relayd if the stream has an ID. */
+               if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_streams_sent(
+                                       msg.u.sent_streams.net_seq_idx);
+                       if (ret < 0) {
+                               goto end_nosignal;
+                       }
+               }
+               break;
+       }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
                rcu_read_unlock();
@@ -911,7 +967,7 @@ error_fatal:
  *
  * Return 0 on success or else a negative value.
  */
-static int get_index_values(struct lttng_packet_index *index, int infd)
+static int get_index_values(struct ctf_packet_index *index, int infd)
 {
        int ret;
 
@@ -1007,7 +1063,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
-       struct lttng_packet_index index;
+       struct ctf_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
This page took 0.025907 seconds and 4 git commands to generate.