Cleanup: remove duplicated code in snapshot record command
[lttng-tools.git] / src / common / consumer / consumer.c
index 6de72e2758b5086f35c36e96f69276eaa8dfbdf0..3c20c57bda9d68ac1967de0cf433901ab8cce7e6 100644 (file)
@@ -393,6 +393,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        iter.iter.node = &channel->node.node;
        ret = lttng_ht_del(consumer_data.channel_ht, &iter);
        assert(!ret);
+
+       iter.iter.node = &channel->channels_by_session_id_ht_node.node;
+       ret = lttng_ht_del(consumer_data.channels_by_session_id_ht, &iter);
+       assert(!ret);
        rcu_read_unlock();
 
        call_rcu(&channel->node.head, free_channel_rcu);
@@ -1035,6 +1039,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        }
 
        lttng_ht_node_init_u64(&channel->node, channel->key);
+       lttng_ht_node_init_u64(&channel->channels_by_session_id_ht_node,
+                       channel->session_id);
 
        channel->wait_fd = -1;
 
@@ -1067,6 +1073,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
        rcu_read_lock();
        lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
+       lttng_ht_add_u64(consumer_data.channels_by_session_id_ht,
+                       &channel->channels_by_session_id_ht_node);
        rcu_read_unlock();
 
        pthread_mutex_unlock(&channel->timer_lock);
@@ -1225,6 +1233,7 @@ void lttng_consumer_cleanup(void)
        rcu_read_unlock();
 
        lttng_ht_destroy(consumer_data.channel_ht);
+       lttng_ht_destroy(consumer_data.channels_by_session_id_ht);
 
        cleanup_relayd_ht();
 
@@ -1236,6 +1245,8 @@ void lttng_consumer_cleanup(void)
         * it.
         */
        lttng_ht_destroy(consumer_data.stream_list_ht);
+
+       lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry);
 }
 
 /*
@@ -2396,11 +2407,6 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       if (!revents) {
-                               /* No activity for this FD (poll implementation). */
-                               continue;
-                       }
-
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & LPOLLIN) {
                                        ssize_t pipe_len;
@@ -2990,11 +2996,6 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       if (!revents) {
-                               /* No activity for this FD (poll implementation). */
-                               continue;
-                       }
-
                        if (pollfd == ctx->consumer_channel_pipe[0]) {
                                if (revents & LPOLLIN) {
                                        enum consumer_channel_action action;
@@ -3433,6 +3434,12 @@ int lttng_consumer_init(void)
                goto error;
        }
 
+       consumer_data.channels_by_session_id_ht =
+                       lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.channels_by_session_id_ht) {
+               goto error;
+       }
+
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!consumer_data.relayd_ht) {
                goto error;
@@ -3458,6 +3465,11 @@ int lttng_consumer_init(void)
                goto error;
        }
 
+       consumer_data.chunk_registry = lttng_trace_chunk_registry_create();
+       if (!consumer_data.chunk_registry) {
+               goto error;
+       }
+
        return 0;
 
 error:
@@ -3568,7 +3580,6 @@ error:
 
                /* Assign new file descriptor */
                relayd->control_sock.sock.fd = fd;
-               fd = -1;        /* For error path */
                /* Assign version values. */
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
@@ -3596,7 +3607,6 @@ error:
 
                /* Assign new file descriptor */
                relayd->data_sock.sock.fd = fd;
-               fd = -1;        /* for eventual error paths */
                /* Assign version values. */
                relayd->data_sock.major = relayd_sock->major;
                relayd->data_sock.minor = relayd_sock->minor;
@@ -3610,6 +3620,11 @@ error:
        DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
+       /*
+        * We gave the ownership of the fd to the relayd structure. Set the
+        * fd to -1 so we don't call close() on it in the error path below.
+        */
+       fd = -1;
 
        /* We successfully added the socket. Send status back. */
        ret = consumer_send_status_msg(sock, ret_code);
@@ -3896,15 +3911,16 @@ end:
  * is already at the rotate position (produced == consumed), we flag it as
  * ready for rotation. The rotation of ready streams occurs after we have
  * replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_rotate_channel(uint64_t key, const char *path,
-               uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+               uint64_t key, const char *path, uint64_t relayd_id,
+               uint32_t metadata, uint64_t new_chunk_id,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
@@ -3913,13 +3929,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path,
 
        rcu_read_lock();
 
-       channel = consumer_find_channel(key);
-       if (!channel) {
-               ERR("No channel found for key %" PRIu64, key);
-               ret = -1;
-               goto end;
-       }
-
        pthread_mutex_lock(&channel->lock);
        channel->current_chunk_id = new_chunk_id;
 
@@ -3984,7 +3993,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path,
                if (consumed_pos == stream->rotate_position) {
                        stream->rotate_ready = true;
                }
-               channel->nr_stream_rotate_pending++;
 
                ret = consumer_flush_buffer(stream, 1);
                if (ret < 0) {
@@ -4233,14 +4241,15 @@ error:
  * This is especially important for low throughput streams that have already
  * been consumed, we cannot wait for their next packet to perform the
  * rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_consumer_rotate_ready_streams(uint64_t key,
-               struct lttng_consumer_local_data *ctx)
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+               uint64_t key, struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
@@ -4249,13 +4258,6 @@ int lttng_consumer_rotate_ready_streams(uint64_t key,
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
-       channel = consumer_find_channel(key);
-       if (!channel) {
-               ERR("No channel found for key %" PRIu64, key);
-               ret = -1;
-               goto end;
-       }
-
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
@@ -4483,3 +4485,24 @@ int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
                return mkdir_local(path, uid, gid);
        }
 }
+
+enum lttcomm_return_code lttng_consumer_init_command(
+               struct lttng_consumer_local_data *ctx,
+               const lttng_uuid sessiond_uuid)
+{
+       enum lttcomm_return_code ret;
+       char uuid_str[UUID_STR_LEN];
+
+       if (ctx->sessiond_uuid.is_set) {
+               ret = LTTCOMM_CONSUMERD_ALREADY_SET;
+               goto end;
+       }
+
+       ctx->sessiond_uuid.is_set = true;
+       memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+       lttng_uuid_to_str(sessiond_uuid, uuid_str);
+       DBG("Received session daemon UUID: %s", uuid_str);
+end:
+       return ret;
+}
This page took 0.026488 seconds and 4 git commands to generate.