Fix: add globally visible flag in stream
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 02198bd1b9e854f26a8614c5f76fbf76a36835f1..bdf0eff7c4a2b56a72bdda1f5bd06ea35e213a98 100644 (file)
@@ -104,72 +104,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-/*
- * Find a relayd and send the stream
- *
- * Returns 0 on success, < 0 on error
- */
-static int send_relayd_stream(struct lttng_consumer_stream *stream,
-               char *path)
-{
-       int ret = 0;
-       const char *stream_path;
-       struct consumer_relayd_sock_pair *relayd;
-
-       assert(stream);
-       assert(stream->net_seq_idx != -1ULL);
-
-       if (path != NULL) {
-               stream_path = path;
-       } else {
-               stream_path = stream->chan->pathname;
-       }
-
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               /* Add stream on the relayd */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_add_stream(&relayd->control_sock, stream->name,
-                               stream_path, &stream->relayd_stream_id,
-                               stream->chan->tracefile_size, stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto end;
-               }
-               uatomic_inc(&relayd->refcount);
-       } else {
-               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
-                               stream->key, stream->net_seq_idx);
-               ret = -1;
-               goto end;
-       }
-
-       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
-                       stream->name, stream->key, stream->net_seq_idx);
-
-end:
-       rcu_read_unlock();
-       return ret;
-}
-
-/*
- * Find a relayd and close the stream
- */
-static void close_relayd_stream(struct lttng_consumer_stream *stream)
-{
-       struct consumer_relayd_sock_pair *relayd;
-
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd) {
-               consumer_stream_relayd_close(stream, relayd);
-       }
-       rcu_read_unlock();
-}
-
 /*
  * Take a snapshot of all the stream of a channel
  *
@@ -201,8 +135,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                goto end;
        }
 
-       cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
-                       no_monitor_node) {
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /*
                 * Lock stream because we are about to change its state.
                 */
@@ -215,14 +148,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
-                       ret = send_relayd_stream(stream, path);
+                       ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
                                goto end_unlock;
                        }
                } else {
                        ret = utils_create_stream_file(path, stream->name,
-                                       stream->chan->tracefile_size, stream->tracefile_count_current,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current,
                                        stream->uid, stream->gid);
                        if (ret < 0) {
                                ERR("utils_create_stream_file");
@@ -238,7 +172,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
-                       ERR("Failed to flush kernel metadata stream");
+                       ERR("Failed to flush kernel stream");
                        goto end_unlock;
                }
 
@@ -391,7 +325,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        }
 
        if (use_relayd) {
-               ret = send_relayd_stream(metadata_stream, path);
+               ret = consumer_send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
                        goto error;
                }
@@ -436,6 +370,9 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
 
        ret = 0;
 
+       cds_list_del(&metadata_stream->send_node);
+       consumer_stream_destroy(metadata_stream, NULL);
+       metadata_channel->metadata_stream = NULL;
 error:
        rcu_read_unlock();
        return ret;
@@ -622,7 +559,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                channel->session_id,
                                msg.u.stream.cpu,
                                &alloc_ret,
-                               channel->type);
+                               channel->type,
+                               channel->monitor);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
@@ -685,15 +623,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        DBG("Kernel consumer add stream %s in no monitor mode with "
                                        "relayd id %" PRIu64, new_stream->name,
                                        new_stream->net_seq_idx);
-                       cds_list_add(&new_stream->no_monitor_node,
-                                       &channel->stream_no_monitor_list.head);
+                       cds_list_add(&new_stream->send_node, &channel->streams.head);
                        break;
                }
 
-               ret = send_relayd_stream(new_stream, NULL);
-               if (ret < 0) {
-                       consumer_stream_free(new_stream);
-                       goto end_nosignal;
+               /* Send stream to relayd if the stream has an ID. */
+               if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_stream(new_stream, NULL);
+                       if (ret < 0) {
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                }
 
                /* Get the right pipe where the stream will be sent. */
@@ -1023,11 +963,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        return 0;
 
 error_close_fd:
-       {
+       if (stream->out_fd >= 0) {
                int err;
 
                err = close(stream->out_fd);
                assert(!err);
+               stream->out_fd = -1;
        }
 error:
        return ret;
This page took 0.025322 seconds and 4 git commands to generate.