X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=36ec02479edc3b74ccf70df1cc8f6a8a023770fe;hb=13b425222b8ba84f33797aa3dd3b77ccecaa3478;hp=f47d8de1b2669665a4a138a8b0fcbcdb65dc3b3d;hpb=a4baae1b0463bc4ce65c2a458c4a941e7fabc594;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index f47d8de1b..36ec02479 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -45,6 +45,7 @@ #include "consumer.h" #include "consumer-stream.h" +#include "consumer-testpoint.h" struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -1311,6 +1312,57 @@ error: return NULL; } +/* + * Iterate over all streams of the hashtable and free them properly. + */ +static void destroy_data_stream_ht(struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + +/* + * Iterate over all streams of the metadata hashtable and free them + * properly. + */ +static void destroy_metadata_stream_ht(struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + /* * Close all fds associated with the instance and free the context. */ @@ -1320,6 +1372,9 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) DBG("Consumer destroying it. Closing everything."); + destroy_data_stream_ht(data_ht); + destroy_metadata_stream_ht(metadata_ht); + ret = close(ctx->consumer_error_socket); if (ret) { PERROR("close"); @@ -1894,60 +1949,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } -/* - * Iterate over all streams of the hashtable and free them properly. - * - * WARNING: *MUST* be used with data stream only. - */ -static void destroy_data_stream_ht(struct lttng_ht *ht) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - - if (ht == NULL) { - return; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); - } - rcu_read_unlock(); - - lttng_ht_destroy(ht); -} - -/* - * Iterate over all streams of the hashtable and free them properly. - * - * XXX: Should not be only for metadata stream or else use an other name. - */ -static void destroy_stream_ht(struct lttng_ht *ht) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - - if (ht == NULL) { - return; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); - } - rcu_read_unlock(); - - lttng_ht_destroy(ht); -} - void lttng_consumer_close_metadata(void) { switch (consumer_data.type) { @@ -2254,14 +2255,12 @@ void *consumer_thread_metadata_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA); - health_code_update(); - - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!metadata_ht) { - /* ENOMEM at this point. Better to bail out. */ - goto end_ht; + if (testpoint(consumerd_thread_metadata)) { + goto error_testpoint; } + health_code_update(); + DBG("Thread metadata poll started"); /* Size is set to 1 for the consumer_metadata pipe */ @@ -2434,8 +2433,7 @@ end: lttng_poll_clean(&events); end_poll: - destroy_stream_ht(metadata_ht); -end_ht: +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -2464,14 +2462,12 @@ void *consumer_thread_data_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA); - health_code_update(); - - data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (data_ht == NULL) { - /* ENOMEM at this point. Better to bail out. */ - goto end; + if (testpoint(consumerd_thread_data)) { + goto error_testpoint; } + health_code_update(); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2701,8 +2697,7 @@ end: */ (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe); - destroy_data_stream_ht(data_ht); - +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -2803,6 +2798,10 @@ void *consumer_thread_channel_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL); + if (testpoint(consumerd_thread_channel)) { + goto error_testpoint; + } + health_code_update(); channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); @@ -3004,6 +3003,7 @@ end: end_poll: destroy_channel_ht(channel_ht); end_ht: +error_testpoint: DBG("Channel poll thread exiting"); if (err) { health_error(); @@ -3059,6 +3059,10 @@ void *consumer_thread_sessiond_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND); + if (testpoint(consumerd_thread_sessiond)) { + goto error_testpoint; + } + health_code_update(); DBG("Creating command socket %s", ctx->consumer_command_sock_path); @@ -3195,6 +3199,7 @@ end: } } +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -3256,12 +3261,42 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) /* * Allocate and set consumer data hash tables. */ -void lttng_consumer_init(void) +int lttng_consumer_init(void) { consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.channel_ht) { + goto error; + } + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.relayd_ht) { + goto error; + } + consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.stream_list_ht) { + goto error; + } + consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.stream_per_chan_id_ht) { + goto error; + } + + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!data_ht) { + goto error; + } + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!metadata_ht) { + goto error; + } + + return 0; + +error: + return -1; } /*