Add consumer socket object and relayd commands
[lttng-tools.git] / src / common / consumer.c
index 974c65f550c94a6be3fb94c18227f86ad133076e..63d0d65ee3157b05ef1113336185535ea23a9dbe 100644 (file)
@@ -177,11 +177,18 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
        int ret;
        struct lttng_ht_iter iter;
 
+       if (relayd == NULL) {
+               return;
+       }
+
        DBG("Consumer destroy and close relayd socket pair");
 
        iter.iter.node = &relayd->node.node;
        ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
-       assert(!ret);
+       if (ret != 0) {
+               /* We assume the relayd was already destroyed */
+               return;
+       }
 
        /* Close all sockets */
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -261,16 +268,27 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
        }
 
        /* Check and cleanup relayd */
+       rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
-               /* We are about to modify the relayd refcount */
-               rcu_read_lock();
-               if (!--relayd->refcount) {
-                       /* Refcount of the relayd struct is 0, destroy it */
+               uatomic_dec(&relayd->refcount);
+               assert(uatomic_read(&relayd->refcount) >= 0);
+
+               ret = relayd_send_close_stream(&relayd->control_sock,
+                               stream->relayd_stream_id,
+                               stream->next_net_seq_num - 1);
+               if (ret < 0) {
+                       ERR("Unable to close stream on the relayd. Continuing");
+                       /* Continue here. There is nothing we can do for the relayd.*/
+               }
+
+               /* Both conditions are met, we destroy the relayd. */
+               if (uatomic_read(&relayd->refcount) == 0 &&
+                               uatomic_read(&relayd->destroy_flag)) {
                        consumer_destroy_relayd(relayd);
                }
-               rcu_read_unlock();
        }
+       rcu_read_unlock();
 
        if (!--stream->chan->refcount) {
                free_chan = stream->chan;
@@ -371,8 +389,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal stream identifier, for UST */
        consumer_steal_stream_key(stream->key);
-       rcu_read_lock();
 
+       rcu_read_lock();
        lttng_ht_lookup(consumer_data.stream_ht,
                        (void *)((unsigned long) stream->key), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -383,16 +401,13 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        }
 
        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
-       rcu_read_unlock();
 
        /* Check and cleanup relayd */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
-               /* We are about to modify the relayd refcount */
-               rcu_read_lock();
-               relayd->refcount++;
-               rcu_read_unlock();
+               uatomic_inc(&relayd->refcount);
        }
+       rcu_read_unlock();
 
        /* Update consumer data */
        consumer_data.stream_count++;
@@ -471,6 +486,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
 
        obj->net_seq_idx = net_seq_idx;
        obj->refcount = 0;
+       obj->destroy_flag = 0;
        lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
        pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
 
@@ -482,6 +498,8 @@ error:
  * Find a relayd socket pair in the global consumer data.
  *
  * Return the object if found else NULL.
+ * RCU read-side lock must be held across this call and while using the
+ * returned object.
  */
 struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
 {
@@ -494,8 +512,6 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
                goto error;
        }
 
-       rcu_read_lock();
-
        lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
                        &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -503,8 +519,6 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
                relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
        }
 
-       rcu_read_unlock();
-
 error:
        return relayd;
 }
@@ -528,6 +542,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
        /* Reset data header */
        memset(&data_hdr, 0, sizeof(data_hdr));
 
+       rcu_read_lock();
        /* Get relayd reference of the stream. */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd == NULL) {
@@ -549,6 +564,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                /* Set header with stream information */
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
                data_hdr.data_size = htobe32(data_size);
+               data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
                /* Other fields are zeroed previously */
 
                ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
@@ -562,6 +578,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
        }
 
 error:
+       rcu_read_unlock();
        return outfd;
 }
 
This page took 0.024752 seconds and 4 git commands to generate.