Move LTTng-UST buffer ownership from application to consumer
[lttng-tools.git] / src / common / consumer.c
index 08592f6c0b8713f1ce1765a7de4d9237fd8bd81a..09b3bee330457d46483a85d1f2f5402e25241b9e 100644 (file)
@@ -82,7 +82,7 @@ static void notify_thread_pipe(int wpipe)
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
  */
-static struct lttng_consumer_stream *consumer_find_stream(int key,
+static struct lttng_consumer_stream *find_stream(int key,
                struct lttng_ht *ht)
 {
        struct lttng_ht_iter iter;
@@ -109,12 +109,12 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
        return stream;
 }
 
-void consumer_steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(int key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
        rcu_read_lock();
-       stream = consumer_find_stream(key, ht);
+       stream = find_stream(key, ht);
        if (stream) {
                stream->key = -1;
                /*
@@ -133,7 +133,7 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht)
  * RCU read side lock MUST be acquired before calling this function and
  * protects the channel ptr.
  */
-static struct lttng_consumer_channel *consumer_find_channel(int key)
+struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
 {
        struct lttng_ht_iter iter;
        struct lttng_ht_node_ulong *node;
@@ -144,8 +144,7 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
                return NULL;
        }
 
-       lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
-                       &iter);
+       lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
                channel = caa_container_of(node, struct lttng_consumer_channel, node);
@@ -154,39 +153,30 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
        return channel;
 }
 
-static void consumer_steal_channel_key(int key)
+static void free_stream_rcu(struct rcu_head *head)
 {
-       struct lttng_consumer_channel *channel;
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_consumer_stream *stream =
+               caa_container_of(node, struct lttng_consumer_stream, node);
 
-       rcu_read_lock();
-       channel = consumer_find_channel(key);
-       if (channel) {
-               channel->key = -1;
-               /*
-                * We don't want the lookup to match, but we still need
-                * to iterate on this channel when iterating over the hash table. Just
-                * change the node key.
-                */
-               channel->node.key = -1;
-       }
-       rcu_read_unlock();
+       free(stream);
 }
 
-static
-void consumer_free_stream(struct rcu_head *head)
+static void free_channel_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_ulong *node =
                caa_container_of(head, struct lttng_ht_node_ulong, head);
-       struct lttng_consumer_stream *stream =
-               caa_container_of(node, struct lttng_consumer_stream, node);
+       struct lttng_consumer_channel *channel =
+               caa_container_of(node, struct lttng_consumer_channel, node);
 
-       free(stream);
+       free(channel);
 }
 
 /*
  * RCU protected relayd socket pair free.
  */
-static void consumer_rcu_free_relayd(struct rcu_head *head)
+static void free_relayd_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_ulong *node =
                caa_container_of(head, struct lttng_ht_node_ulong, head);
@@ -231,7 +221,44 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
        }
 
        /* RCU free() call */
-       call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
+       call_rcu(&relayd->node.head, free_relayd_rcu);
+}
+
+/*
+ * Remove a channel from the global list protected by a mutex. This function is
+ * also responsible for freeing its data structures.
+ */
+void consumer_del_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+
+       DBG("Consumer delete channel key %d", channel->key);
+
+       pthread_mutex_lock(&consumer_data.lock);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_del_channel(channel);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               goto end;
+       }
+
+       rcu_read_lock();
+       iter.iter.node = &channel->node.node;
+       ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+       assert(!ret);
+       rcu_read_unlock();
+
+       call_rcu(&channel->node.head, free_channel_rcu);
+end:
+       pthread_mutex_unlock(&consumer_data.lock);
 }
 
 /*
@@ -368,7 +395,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 
        if (ht == NULL) {
                /* Means the stream was allocated but not successfully added */
-               goto free_stream;
+               goto free_stream_rcu;
        }
 
        pthread_mutex_lock(&consumer_data.lock);
@@ -413,18 +440,6 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                        PERROR("close");
                }
        }
-       if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
-               ret = close(stream->wait_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-       if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
-               ret = close(stream->shm_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
 
        /* Check and cleanup relayd */
        rcu_read_lock();
@@ -458,7 +473,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 
        uatomic_dec(&stream->chan->refcount);
        if (!uatomic_read(&stream->chan->refcount)
-                       && !uatomic_read(&stream->chan->nb_init_streams)) {
+                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
                free_chan = stream->chan;
        }
 
@@ -471,91 +486,67 @@ end:
                consumer_del_channel(free_chan);
        }
 
-free_stream:
-       call_rcu(&stream->node.head, consumer_free_stream);
+free_stream_rcu:
+       call_rcu(&stream->node.head, free_stream_rcu);
 }
 
-struct lttng_consumer_stream *consumer_allocate_stream(
-               int channel_key, int stream_key,
-               int shm_fd, int wait_fd,
+struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
+               int stream_key,
                enum lttng_consumer_stream_state state,
-               uint64_t mmap_len,
-               enum lttng_event_output output,
-               const char *path_name,
+               const char *channel_name,
                uid_t uid,
                gid_t gid,
-               int net_index,
-               int metadata_flag,
+               int relayd_id,
                uint64_t session_id,
-               int *alloc_ret)
+               int cpu,
+               int *alloc_ret,
+               enum consumer_channel_type type)
 {
+       int ret;
        struct lttng_consumer_stream *stream;
 
        stream = zmalloc(sizeof(*stream));
        if (stream == NULL) {
                PERROR("malloc struct lttng_consumer_stream");
-               *alloc_ret = -ENOMEM;
+               ret = -ENOMEM;
                goto end;
        }
 
        rcu_read_lock();
 
-       /*
-        * Get stream's channel reference. Needed when adding the stream to the
-        * global hash table.
-        */
-       stream->chan = consumer_find_channel(channel_key);
-       if (!stream->chan) {
-               *alloc_ret = -ENOENT;
-               ERR("Unable to find channel for stream %d", stream_key);
-               goto error;
-       }
-
        stream->key = stream_key;
-       stream->shm_fd = shm_fd;
-       stream->wait_fd = wait_fd;
        stream->out_fd = -1;
        stream->out_fd_offset = 0;
        stream->state = state;
-       stream->mmap_len = mmap_len;
-       stream->mmap_base = NULL;
-       stream->output = output;
        stream->uid = uid;
        stream->gid = gid;
-       stream->net_seq_idx = net_index;
-       stream->metadata_flag = metadata_flag;
+       stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
-       strncpy(stream->path_name, path_name, sizeof(stream->path_name));
-       stream->path_name[sizeof(stream->path_name) - 1] = '\0';
        pthread_mutex_init(&stream->lock, NULL);
 
-       /*
-        * Index differently the metadata node because the thread is using an
-        * internal hash table to match streams in the metadata_ht to the epoll set
-        * file descriptor.
-        */
-       if (metadata_flag) {
-               lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
+       /* If channel is the metadata, flag this stream as metadata. */
+       if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               stream->metadata_flag = 1;
+               /* Metadata is flat out. */
+               strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
        } else {
-               lttng_ht_node_init_ulong(&stream->node, stream->key);
+               /* Format stream name to <channel_name>_<cpu_number> */
+               ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
+                               channel_name, cpu);
+               if (ret < 0) {
+                       PERROR("snprintf stream name");
+                       goto error;
+               }
        }
 
+       /* Key is always the wait_fd for streams. */
+       lttng_ht_node_init_ulong(&stream->node, stream->key);
+
        /* Init session id node with the stream session id */
        lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
 
-       /*
-        * The cpu number is needed before using any ustctl_* actions. Ignored for
-        * the kernel so the value does not matter.
-        */
-       pthread_mutex_lock(&consumer_data.lock);
-       stream->cpu = stream->chan->cpucount++;
-       pthread_mutex_unlock(&consumer_data.lock);
-
-       DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
-                       " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
-                       stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
-                       (unsigned long long) stream->mmap_len, stream->out_fd,
-                       stream->net_seq_idx, stream->session_id);
+       DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64,
+                       stream->name, stream->key, stream->net_seq_idx, stream->session_id);
 
        rcu_read_unlock();
        return stream;
@@ -564,13 +555,16 @@ error:
        rcu_read_unlock();
        free(stream);
 end:
+       if (alloc_ret) {
+               *alloc_ret = ret;
+       }
        return NULL;
 }
 
 /*
  * Add a stream to the global list protected by a mutex.
  */
-static int consumer_add_stream(struct lttng_consumer_stream *stream,
+static int add_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
        int ret = 0;
@@ -586,7 +580,7 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        rcu_read_lock();
 
        /* Steal stream identifier to avoid having streams with the same key */
-       consumer_steal_stream_key(stream->key, ht);
+       steal_stream_key(stream->key, ht);
 
        lttng_ht_add_unique_ulong(ht, &stream->node);
 
@@ -607,14 +601,14 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
        uatomic_inc(&stream->chan->refcount);
 
        /*
-        * When nb_init_streams reaches 0, we don't need to trigger any action in
-        * terms of destroying the associated channel, because the action that
+        * When nb_init_stream_left reaches 0, we don't need to trigger any action
+        * in terms of destroying the associated channel, because the action that
         * causes the count to become 0 also causes a stream to be added. The
         * channel deletion will thus be triggered by the following removal of this
         * stream.
         */
-       if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
-               uatomic_dec(&stream->chan->nb_init_streams);
+       if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+               uatomic_dec(&stream->chan->nb_init_stream_left);
        }
 
        /* Update consumer data once the node is inserted. */
@@ -638,16 +632,12 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd)
        struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
 
-       if (relayd == NULL) {
-               ret = -1;
-               goto end;
-       }
+       assert(relayd);
 
        lttng_ht_lookup(consumer_data.relayd_ht,
                        (void *)((unsigned long) relayd->net_seq_idx), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
-               /* Relayd already exist. Ignore the insertion */
                goto end;
        }
        lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
@@ -774,119 +764,48 @@ error:
        return outfd;
 }
 
-static
-void consumer_free_channel(struct rcu_head *head)
-{
-       struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
-       struct lttng_consumer_channel *channel =
-               caa_container_of(node, struct lttng_consumer_channel, node);
-
-       free(channel);
-}
-
 /*
- * Remove a channel from the global list protected by a mutex. This
- * function is also responsible for freeing its data structures.
+ * Allocate and return a new lttng_consumer_channel object using the given key
+ * to initialize the hash table node.
+ *
+ * On error, return NULL.
  */
-void consumer_del_channel(struct lttng_consumer_channel *channel)
-{
-       int ret;
-       struct lttng_ht_iter iter;
-
-       DBG("Consumer delete channel key %d", channel->key);
-
-       pthread_mutex_lock(&consumer_data.lock);
-
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_del_channel(channel);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               goto end;
-       }
-
-       rcu_read_lock();
-       iter.iter.node = &channel->node.node;
-       ret = lttng_ht_del(consumer_data.channel_ht, &iter);
-       assert(!ret);
-       rcu_read_unlock();
-
-       if (channel->mmap_base != NULL) {
-               ret = munmap(channel->mmap_base, channel->mmap_len);
-               if (ret != 0) {
-                       PERROR("munmap");
-               }
-       }
-       if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
-               ret = close(channel->wait_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-       if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
-               ret = close(channel->shm_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       call_rcu(&channel->node.head, consumer_free_channel);
-end:
-       pthread_mutex_unlock(&consumer_data.lock);
-}
-
-struct lttng_consumer_channel *consumer_allocate_channel(
-               int channel_key,
-               int shm_fd, int wait_fd,
-               uint64_t mmap_len,
-               uint64_t max_sb_size,
-               unsigned int nb_init_streams)
+struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+               uint64_t session_id,
+               const char *pathname,
+               const char *name,
+               uid_t uid,
+               gid_t gid,
+               int relayd_id,
+               enum lttng_event_output output)
 {
        struct lttng_consumer_channel *channel;
-       int ret;
 
        channel = zmalloc(sizeof(*channel));
        if (channel == NULL) {
                PERROR("malloc struct lttng_consumer_channel");
                goto end;
        }
-       channel->key = channel_key;
-       channel->shm_fd = shm_fd;
-       channel->wait_fd = wait_fd;
-       channel->mmap_len = mmap_len;
-       channel->max_sb_size = max_sb_size;
+
+       channel->key = key;
        channel->refcount = 0;
-       channel->nb_init_streams = nb_init_streams;
+       channel->session_id = session_id;
+       channel->uid = uid;
+       channel->gid = gid;
+       channel->relayd_id = relayd_id;
+       channel->output = output;
+
+       strncpy(channel->pathname, pathname, sizeof(channel->pathname));
+       channel->pathname[sizeof(channel->pathname) - 1] = '\0';
+
+       strncpy(channel->name, name, sizeof(channel->name));
+       channel->name[sizeof(channel->name) - 1] = '\0';
+
        lttng_ht_node_init_ulong(&channel->node, channel->key);
+       CDS_INIT_LIST_HEAD(&channel->streams.head);
+
+       DBG("Allocated channel (key %d)", channel->key)
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               channel->mmap_base = NULL;
-               channel->mmap_len = 0;
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               ret = lttng_ustconsumer_allocate_channel(channel);
-               if (ret) {
-                       free(channel);
-                       return NULL;
-               }
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               goto end;
-       }
-       DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
-                       channel->key, channel->shm_fd, channel->wait_fd,
-                       (unsigned long long) channel->mmap_len,
-                       (unsigned long long) channel->max_sb_size);
 end:
        return channel;
 }
@@ -896,12 +815,11 @@ end:
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel)
 {
+       int ret = 0;
        struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
-       /* Steal channel identifier, for UST */
-       consumer_steal_channel_key(channel->key);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht,
@@ -909,6 +827,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel)
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
                /* Channel already exist. Ignore the insertion */
+               ERR("Consumer add channel key %d already exists!", channel->key);
+               ret = -1;
                goto end;
        }
 
@@ -918,7 +838,7 @@ end:
        rcu_read_unlock();
        pthread_mutex_unlock(&consumer_data.lock);
 
-       return 0;
+       return ret;
 }
 
 /*
@@ -928,14 +848,19 @@ end:
  *
  * Returns the number of fds in the structures.
  */
-static int consumer_update_poll_array(
-               struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
-               struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
+static int update_poll_array(struct lttng_consumer_local_data *ctx,
+               struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
+               struct lttng_ht *ht)
 {
        int i = 0;
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
+       assert(ctx);
+       assert(ht);
+       assert(pollfd);
+       assert(local_stream);
+
        DBG("Updating poll fd array");
        rcu_read_lock();
        cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
@@ -1002,8 +927,8 @@ exit:
 /*
  * Set the error socket.
  */
-void lttng_consumer_set_error_sock(
-               struct lttng_consumer_local_data *ctx, int sock)
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
+               int sock)
 {
        ctx->consumer_error_socket = sock;
 }
@@ -1021,8 +946,7 @@ void lttng_consumer_set_command_sock_path(
  * Send return code to the session daemon.
  * If the socket is not defined, we return 0, it is not a fatal error
  */
-int lttng_consumer_send_error(
-               struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
 {
        if (ctx->consumer_error_socket > 0) {
                return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
@@ -1039,14 +963,12 @@ int lttng_consumer_send_error(
 void lttng_consumer_cleanup(void)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_consumer_channel *channel;
 
        rcu_read_lock();
 
-       cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
-                       node) {
-               struct lttng_consumer_channel *channel =
-                       caa_container_of(node, struct lttng_consumer_channel, node);
+       cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
+                       node.node) {
                consumer_del_channel(channel);
        }
 
@@ -1092,11 +1014,11 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
         * Don't care about error values, as these are just hints and ways to
         * limit the amount of page cache used.
         */
-       if (orig_offset < stream->chan->max_sb_size) {
+       if (orig_offset < stream->max_sb_size) {
                return;
        }
-       lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
-                       stream->chan->max_sb_size,
+       lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
+                       stream->max_sb_size,
                        SYNC_FILE_RANGE_WAIT_BEFORE
                        | SYNC_FILE_RANGE_WRITE
                        | SYNC_FILE_RANGE_WAIT_AFTER);
@@ -1114,8 +1036,8 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
         * defined. So it can be expected to lead to lower throughput in
         * streaming.
         */
-       posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
-                       stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
+       posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+                       stream->max_sb_size, POSIX_FADV_DONTNEED);
 }
 
 /*
@@ -1281,8 +1203,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
  */
 static int write_relayd_metadata_id(int fd,
                struct lttng_consumer_stream *stream,
-               struct consumer_relayd_sock_pair *relayd,
-               unsigned long padding)
+               struct consumer_relayd_sock_pair *relayd, unsigned long padding)
 {
        int ret;
        struct lttcomm_relayd_metadata_payload hdr;
@@ -1334,6 +1255,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                unsigned long padding)
 {
        unsigned long mmap_offset;
+       void *mmap_base;
        ssize_t ret = 0, written = 0;
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
@@ -1355,12 +1277,18 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* get the offset inside the fd to mmap */
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
+               mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
-                               stream->buf, &mmap_offset);
+               mmap_base = lttng_ustctl_get_mmap_base(stream);
+               if (!mmap_base) {
+                       ERR("read mmap get mmap base for stream %s", stream->name);
+                       written = -1;
+                       goto end;
+               }
+               ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -1420,7 +1348,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
 
        while (len > 0) {
                do {
-                       ret = write(outfd, stream->mmap_base + mmap_offset, len);
+                       ret = write(outfd, mmap_base + mmap_offset, len);
                } while (ret < 0 && errno == EINTR);
                DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
                if (ret < 0) {
@@ -1693,15 +1621,14 @@ end:
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
 {
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_take_snapshot(ctx, stream);
+               return lttng_kconsumer_take_snapshot(stream);
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               return lttng_ustconsumer_take_snapshot(ctx, stream);
+               return lttng_ustconsumer_take_snapshot(stream);
        default:
                ERR("Unknown consumer_data type");
                assert(0);
@@ -1715,17 +1642,15 @@ int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_get_produced_snapshot(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream,
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
                unsigned long *pos)
 {
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
+               return lttng_kconsumer_get_produced_snapshot(stream, pos);
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
+               return lttng_ustconsumer_get_produced_snapshot(stream, pos);
        default:
                ERR("Unknown consumer_data type");
                assert(0);
@@ -1825,7 +1750,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 
        if (ht == NULL) {
                /* Means the stream was allocated but not successfully added */
-               goto free_stream;
+               goto free_stream_rcu;
        }
 
        pthread_mutex_lock(&consumer_data.lock);
@@ -1868,20 +1793,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                }
        }
 
-       if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
-               ret = close(stream->wait_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
-               ret = close(stream->shm_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
        /* Check and cleanup relayd */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1914,7 +1825,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        /* Atomically decrement channel refcount since other threads can use it. */
        uatomic_dec(&stream->chan->refcount);
        if (!uatomic_read(&stream->chan->refcount)
-                       && !uatomic_read(&stream->chan->nb_init_streams)) {
+                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
                /* Go for channel deletion! */
                free_chan = stream->chan;
        }
@@ -1927,15 +1838,15 @@ end:
                consumer_del_channel(free_chan);
        }
 
-free_stream:
-       call_rcu(&stream->node.head, consumer_free_stream);
+free_stream_rcu:
+       call_rcu(&stream->node.head, free_stream_rcu);
 }
 
 /*
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
+static int add_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
        int ret = 0;
@@ -1946,7 +1857,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        assert(stream);
        assert(ht);
 
-       DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
+       DBG3("Adding metadata stream %d to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->lock);
@@ -1962,7 +1873,7 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
         * Lookup the stream just to make sure it does not exist in our internal
         * state. This should NEVER happen.
         */
-       lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+       lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        assert(!node);
 
@@ -1976,14 +1887,14 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        uatomic_inc(&stream->chan->refcount);
 
        /*
-        * When nb_init_streams reaches 0, we don't need to trigger any action in
-        * terms of destroying the associated channel, because the action that
+        * When nb_init_stream_left reaches 0, we don't need to trigger any action
+        * in terms of destroying the associated channel, because the action that
         * causes the count to become 0 also causes a stream to be added. The
         * channel deletion will thus be triggered by the following removal of this
         * stream.
         */
-       if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
-               uatomic_dec(&stream->chan->nb_init_streams);
+       if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+               uatomic_dec(&stream->chan->nb_init_stream_left);
        }
 
        lttng_ht_add_unique_ulong(ht, &stream->node);
@@ -2163,7 +2074,7 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
-                                       ret = consumer_add_metadata_stream(stream, metadata_ht);
+                                       ret = add_metadata_stream(stream, metadata_ht);
                                        if (ret) {
                                                ERR("Unable to add metadata stream");
                                                /* Stream was not setup properly. Continuing. */
@@ -2305,7 +2216,7 @@ void *consumer_thread_data_poll(void *data)
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
-                       ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+                       ret = update_poll_array(ctx, &pollfd, local_stream,
                                        data_ht);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
@@ -2372,7 +2283,7 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
 
-                       ret = consumer_add_stream(new_stream, data_ht);
+                       ret = add_stream(new_stream, data_ht);
                        if (ret) {
                                ERR("Consumer add stream %d failed. Continuing",
                                                new_stream->key);
@@ -2605,10 +2516,10 @@ void *consumer_thread_sessiond_poll(void *data)
                        DBG("consumer_thread_receive_fds received quit from signal");
                        goto end;
                }
-               DBG("received fds on sock");
+               DBG("received command on sock");
        }
 end:
-       DBG("consumer_thread_receive_fds exiting");
+       DBG("Consumer thread sessiond poll exiting");
 
        /*
         * when all fds have hung up, the polling thread
@@ -2741,7 +2652,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
                ret = -1;
                fd = -1;        /* Just in case it gets set with an invalid value. */
-               goto error;
+               goto error_close;
        }
 
        /* We have the fds without error. Send status back. */
@@ -2784,6 +2695,13 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                }
                if (ret < 0) {
+                       /*
+                        * Close all sockets of a relayd object. It will be freed if it was
+                        * created at the error code path or else it will be garbage
+                        * collect.
+                        */
+                       (void) relayd_close(&relayd->control_sock);
+                       (void) relayd_close(&relayd->data_sock);
                        goto error;
                }
 
@@ -2833,10 +2751,8 @@ error:
                }
        }
 
+error_close:
        if (relayd_created) {
-               /* We just want to cleanup. Ignore ret value. */
-               (void) relayd_close(&relayd->control_sock);
-               (void) relayd_close(&relayd->data_sock);
                free(relayd);
        }
 
@@ -3044,3 +2960,26 @@ int consumer_send_status_msg(int sock, int ret_code)
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
+
+/*
+ * Send a channel status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_channel(int sock,
+               struct lttng_consumer_channel *channel)
+{
+       struct lttcomm_consumer_status_channel msg;
+
+       assert(sock >= 0);
+
+       if (!channel) {
+               msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+       } else {
+               msg.ret_code = LTTNG_OK;
+               msg.key = channel->key;
+               msg.stream_count = channel->streams.count;
+       }
+
+       return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}
This page took 0.034814 seconds and 4 git commands to generate.