X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=cbcb2f74d27f8dfb8297b8c37f04b67112f6806e;hb=282dadbc28bdf6c8565bd0cabd6a2fbffb6b9396;hp=152064078d34b354263a79992e93d965a5b738a4;hpb=0c759fc95033a3d6d7cb939f39dd643ce7e127ee;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 152064078..cbcb2f74d 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -768,6 +768,44 @@ end: return ret; } +/* + * Find a relayd and send the streams sent message + * + * Returns 0 on success, < 0 on error + */ +int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) +{ + int ret = 0; + struct consumer_relayd_sock_pair *relayd; + + assert(net_seq_idx != -1ULL); + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_streams_sent(&relayd->control_sock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else { + ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", + net_seq_idx); + ret = -1; + goto end; + } + + ret = 0; + DBG("All streams sent relayd id %" PRIu64, net_seq_idx); + +end: + rcu_read_unlock(); + return ret; +} + /* * Find a relayd and close the stream */ @@ -1273,6 +1311,57 @@ error: return NULL; } +/* + * Iterate over all streams of the hashtable and free them properly. + */ +static void destroy_data_stream_ht(struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + +/* + * Iterate over all streams of the metadata hashtable and free them + * properly. + */ +static void destroy_metadata_stream_ht(struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + /* * Close all fds associated with the instance and free the context. */ @@ -1282,6 +1371,9 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) DBG("Consumer destroying it. Closing everything."); + destroy_data_stream_ht(data_ht); + destroy_metadata_stream_ht(metadata_ht); + ret = close(ctx->consumer_error_socket); if (ret) { PERROR("close"); @@ -1354,7 +1446,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding, - struct lttng_packet_index *index) + struct ctf_packet_index *index) { unsigned long mmap_offset; void *mmap_base; @@ -1562,7 +1654,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding, - struct lttng_packet_index *index) + struct ctf_packet_index *index) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1856,60 +1948,6 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } -/* - * Iterate over all streams of the hashtable and free them properly. - * - * WARNING: *MUST* be used with data stream only. - */ -static void destroy_data_stream_ht(struct lttng_ht *ht) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - - if (ht == NULL) { - return; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); - } - rcu_read_unlock(); - - lttng_ht_destroy(ht); -} - -/* - * Iterate over all streams of the hashtable and free them properly. - * - * XXX: Should not be only for metadata stream or else use an other name. - */ -static void destroy_stream_ht(struct lttng_ht *ht) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - - if (ht == NULL) { - return; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); - } - rcu_read_unlock(); - - lttng_ht_destroy(ht); -} - void lttng_consumer_close_metadata(void) { switch (consumer_data.type) { @@ -2218,12 +2256,6 @@ void *consumer_thread_metadata_poll(void *data) health_code_update(); - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!metadata_ht) { - /* ENOMEM at this point. Better to bail out. */ - goto end_ht; - } - DBG("Thread metadata poll started"); /* Size is set to 1 for the consumer_metadata pipe */ @@ -2396,8 +2428,6 @@ end: lttng_poll_clean(&events); end_poll: - destroy_stream_ht(metadata_ht); -end_ht: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -2428,12 +2458,6 @@ void *consumer_thread_data_poll(void *data) health_code_update(); - data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (data_ht == NULL) { - /* ENOMEM at this point. Better to bail out. */ - goto end; - } - local_stream = zmalloc(sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2663,8 +2687,6 @@ end: */ (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe); - destroy_data_stream_ht(data_ht); - if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -3218,12 +3240,42 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) /* * Allocate and set consumer data hash tables. */ -void lttng_consumer_init(void) +int lttng_consumer_init(void) { consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.channel_ht) { + goto error; + } + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.relayd_ht) { + goto error; + } + consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.stream_list_ht) { + goto error; + } + consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!consumer_data.stream_per_chan_id_ht) { + goto error; + } + + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!data_ht) { + goto error; + } + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!metadata_ht) { + goto error; + } + + return 0; + +error: + return -1; } /*