Doc: live protocol clarifications
[lttng-tools.git] / src / common / consumer.c
index 152064078d34b354263a79992e93d965a5b738a4..36ec02479edc3b74ccf70df1cc8f6a8a023770fe 100644 (file)
@@ -45,6 +45,7 @@
 
 #include "consumer.h"
 #include "consumer-stream.h"
+#include "consumer-testpoint.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -768,6 +769,44 @@ end:
        return ret;
 }
 
+/*
+ * Find a relayd and send the streams sent message
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(net_seq_idx != -1ULL);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_streams_sent(&relayd->control_sock);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+       } else {
+               ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+                               net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+       DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
 /*
  * Find a relayd and close the stream
  */
@@ -1273,6 +1312,57 @@ error:
        return NULL;
 }
 
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_stream(stream, ht);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
+/*
+ * Iterate over all streams of the metadata hashtable and free them
+ * properly.
+ */
+static void destroy_metadata_stream_ht(struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_metadata_stream(stream, ht);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
 /*
  * Close all fds associated with the instance and free the context.
  */
@@ -1282,6 +1372,9 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 
        DBG("Consumer destroying it. Closing everything.");
 
+       destroy_data_stream_ht(data_ht);
+       destroy_metadata_stream_ht(metadata_ht);
+
        ret = close(ctx->consumer_error_socket);
        if (ret) {
                PERROR("close");
@@ -1354,7 +1447,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1562,7 +1655,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1856,60 +1949,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
 }
 
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * WARNING: *MUST* be used with data stream only.
- */
-static void destroy_data_stream_ht(struct lttng_ht *ht)
-{
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
-       if (ht == NULL) {
-               return;
-       }
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_stream(stream, ht);
-       }
-       rcu_read_unlock();
-
-       lttng_ht_destroy(ht);
-}
-
-/*
- * Iterate over all streams of the hashtable and free them properly.
- *
- * XXX: Should not be only for metadata stream or else use an other name.
- */
-static void destroy_stream_ht(struct lttng_ht *ht)
-{
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
-       if (ht == NULL) {
-               return;
-       }
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-               /*
-                * Ignore return value since we are currently cleaning up so any error
-                * can't be handled.
-                */
-               (void) consumer_del_metadata_stream(stream, ht);
-       }
-       rcu_read_unlock();
-
-       lttng_ht_destroy(ht);
-}
-
 void lttng_consumer_close_metadata(void)
 {
        switch (consumer_data.type) {
@@ -2216,14 +2255,12 @@ void *consumer_thread_metadata_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
 
-       health_code_update();
-
-       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-       if (!metadata_ht) {
-               /* ENOMEM at this point. Better to bail out. */
-               goto end_ht;
+       if (testpoint(consumerd_thread_metadata)) {
+               goto error_testpoint;
        }
 
+       health_code_update();
+
        DBG("Thread metadata poll started");
 
        /* Size is set to 1 for the consumer_metadata pipe */
@@ -2396,8 +2433,7 @@ end:
 
        lttng_poll_clean(&events);
 end_poll:
-       destroy_stream_ht(metadata_ht);
-end_ht:
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2426,14 +2462,12 @@ void *consumer_thread_data_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
 
-       health_code_update();
-
-       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-       if (data_ht == NULL) {
-               /* ENOMEM at this point. Better to bail out. */
-               goto end;
+       if (testpoint(consumerd_thread_data)) {
+               goto error_testpoint;
        }
 
+       health_code_update();
+
        local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
        if (local_stream == NULL) {
                PERROR("local_stream malloc");
@@ -2663,8 +2697,7 @@ end:
         */
        (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
 
-       destroy_data_stream_ht(data_ht);
-
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2765,6 +2798,10 @@ void *consumer_thread_channel_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
 
+       if (testpoint(consumerd_thread_channel)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
@@ -2966,6 +3003,7 @@ end:
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
+error_testpoint:
        DBG("Channel poll thread exiting");
        if (err) {
                health_error();
@@ -3021,6 +3059,10 @@ void *consumer_thread_sessiond_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
 
+       if (testpoint(consumerd_thread_sessiond)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
@@ -3157,6 +3199,7 @@ end:
                }
        }
 
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -3218,12 +3261,42 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
 /*
  * Allocate and set consumer data hash tables.
  */
-void lttng_consumer_init(void)
+int lttng_consumer_init(void)
 {
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.channel_ht) {
+               goto error;
+       }
+
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.relayd_ht) {
+               goto error;
+       }
+
        consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.stream_list_ht) {
+               goto error;
+       }
+
        consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!consumer_data.stream_per_chan_id_ht) {
+               goto error;
+       }
+
+       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!data_ht) {
+               goto error;
+       }
+
+       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!metadata_ht) {
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return -1;
 }
 
 /*
This page took 0.026615 seconds and 4 git commands to generate.