Fix: race with the viewer and readiness of streams
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 1b01d414a0248b0207bbf72aba620a8fa62b93e4..c95355e9a472e35970ab7b734b18611d14bc356c 100644 (file)
@@ -176,6 +176,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
                                        path, stream->name, stream->key);
                }
+               ret = consumer_send_relayd_streams_sent(relayd_id);
+               if (ret < 0) {
+                       ERR("sending streams sent to relayd");
+                       goto end_unlock;
+               }
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
@@ -751,6 +756,57 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                new_stream->name, fd, new_stream->relayd_stream_id);
                break;
        }
+       case LTTNG_CONSUMER_STREAMS_SENT:
+       {
+               struct lttng_consumer_channel *channel;
+
+               /*
+                * Get stream's channel reference. Needed when adding the stream to the
+                * global hash table.
+                */
+               channel = consumer_find_channel(msg.u.sent_streams.channel_key);
+               if (!channel) {
+                       /*
+                        * We could not find the channel. Can happen if cpu hotplug
+                        * happens while tearing down.
+                        */
+                       ERR("Unable to find channel key %" PRIu64,
+                                       msg.u.sent_streams.channel_key);
+                       ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+
+               /*
+                * Send status code to session daemon.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               health_code_update();
+
+               /*
+                * We should not send this message if we don't monitor the
+                * streams in this channel.
+                */
+               if (!channel->monitor) {
+                       break;
+               }
+
+               health_code_update();
+               /* Send stream to relayd if the stream has an ID. */
+               if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_streams_sent(
+                                       msg.u.sent_streams.net_seq_idx);
+                       if (ret < 0) {
+                               goto end_nosignal;
+                       }
+               }
+               break;
+       }
        case LTTNG_CONSUMER_UPDATE_STREAM:
        {
                rcu_read_unlock();
@@ -911,7 +967,7 @@ error_fatal:
  *
  * Return 0 on success or else a negative value.
  */
-static int get_index_values(struct lttng_packet_index *index, int infd)
+static int get_index_values(struct ctf_packet_index *index, int infd)
 {
        int ret;
 
@@ -1007,7 +1063,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
-       struct lttng_packet_index index;
+       struct ctf_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
This page took 0.024826 seconds and 4 git commands to generate.