X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=35df86dd81c922217a6dbf628af93fd79d5b3432;hp=be78e256f489661eda948fc8b143a25b69c23a0c;hb=58b1f4255ea457f2965f31b84205cb0eec21e71f;hpb=c869f647b0c4476645ab9ee01e362401fb8c1e42 diff --git a/src/common/consumer.c b/src/common/consumer.c index be78e256f..35df86dd8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -58,6 +58,17 @@ int consumer_poll_timeout = -1; */ volatile int consumer_quit = 0; +/* + * The following two hash tables are visible by all threads which are separated + * in different source files. + * + * Global hash table containing respectively metadata and data streams. The + * stream element in this ht should only be updated by the metadata poll thread + * for the metadata and the data poll thread for the data. + */ +struct lttng_ht *metadata_ht = NULL; +struct lttng_ht *data_ht = NULL; + /* * Find a stream. The consumer_data.lock must be locked during this * call. @@ -161,17 +172,6 @@ void consumer_free_stream(struct rcu_head *head) free(stream); } -static -void consumer_free_metadata_stream(struct rcu_head *head) -{ - struct lttng_ht_node_ulong *node = - caa_container_of(head, struct lttng_ht_node_ulong, head); - struct lttng_consumer_stream *stream = - caa_container_of(node, struct lttng_consumer_stream, waitfd_node); - - free(stream); -} - /* * RCU protected relayd socket pair free. */ @@ -406,8 +406,17 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->metadata_flag = metadata_flag; strncpy(stream->path_name, path_name, sizeof(stream->path_name)); stream->path_name[sizeof(stream->path_name) - 1] = '\0'; - lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); - lttng_ht_node_init_ulong(&stream->node, stream->key); + + /* + * Index differently the metadata node because the thread is using an + * internal hash table to match streams in the metadata_ht to the epoll set + * file descriptor. + */ + if (metadata_flag) { + lttng_ht_node_init_ulong(&stream->node, stream->wait_fd); + } else { + lttng_ht_node_init_ulong(&stream->node, stream->key); + } /* * The cpu number is needed before using any ustctl_* actions. Ignored for @@ -433,19 +442,24 @@ end: /* * Add a stream to the global list protected by a mutex. */ -int consumer_add_stream(struct lttng_consumer_stream *stream) +static int consumer_add_stream(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) { int ret = 0; struct consumer_relayd_sock_pair *relayd; assert(stream); + assert(ht); DBG3("Adding consumer stream %d", stream->key); pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); + + lttng_ht_add_unique_ulong(ht, &stream->node); /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); @@ -783,9 +797,9 @@ end: * * Returns the number of fds in the structures. */ -int consumer_update_poll_array( +static int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_stream) + struct lttng_consumer_stream **local_stream, struct lttng_ht *ht) { int i = 0; struct lttng_ht_iter iter; @@ -793,8 +807,7 @@ int consumer_update_poll_array( DBG("Updating poll fd array"); rcu_read_lock(); - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, - node.node) { + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) { continue; } @@ -1518,6 +1531,33 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } +/* + * Iterate over all streams of the hashtable and free them properly. + * + * WARNING: *MUST* be used with data stream only. + */ +static void destroy_data_stream_ht(struct lttng_ht *ht) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + ret = lttng_ht_del(ht, &iter); + assert(!ret); + + call_rcu(&stream->node.head, consumer_free_stream); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + /* * Iterate over all streams of the hashtable and free them properly. * @@ -1534,11 +1574,11 @@ static void destroy_stream_ht(struct lttng_ht *ht) } rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, waitfd_node.node) { + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { ret = lttng_ht_del(ht, &iter); assert(!ret); - call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream); + call_rcu(&stream->node.head, consumer_free_stream); } rcu_read_unlock(); @@ -1591,7 +1631,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, } rcu_read_lock(); - iter.iter.node = &stream->waitfd_node.node; + iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); assert(!ret); rcu_read_unlock(); @@ -1662,7 +1702,7 @@ end: } free_stream: - call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream); + call_rcu(&stream->node.head, consumer_free_stream); } /* @@ -1708,7 +1748,10 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, uatomic_dec(&stream->chan->nb_init_streams); } - lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); + + lttng_ht_add_unique_ulong(ht, &stream->node); rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -1726,7 +1769,6 @@ void *consumer_thread_metadata_poll(void *data) struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; - struct lttng_ht *metadata_ht = NULL; struct lttng_poll_event events; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -1735,11 +1777,6 @@ void *consumer_thread_metadata_poll(void *data) DBG("Thread metadata poll started"); - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - if (metadata_ht == NULL) { - goto end; - } - /* Size is set to 1 for the consumer_metadata pipe */ ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); if (ret < 0) { @@ -1839,7 +1876,7 @@ restart: assert(node); stream = caa_container_of(node, struct lttng_consumer_stream, - waitfd_node); + node); /* Check for error event */ if (revents & (LPOLLERR | LPOLLHUP)) { @@ -1918,6 +1955,11 @@ void *consumer_thread_data_poll(void *data) rcu_register_thread(); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (data_ht == NULL) { + goto end; + } + local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { @@ -1955,7 +1997,8 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); goto end; } - ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + ret = consumer_update_poll_array(ctx, &pollfd, local_stream, + data_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2015,7 +2058,7 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = consumer_add_stream(new_stream); + ret = consumer_add_stream(new_stream, data_ht); if (ret) { ERR("Consumer add stream %d failed. Continuing", new_stream->key); @@ -2088,22 +2131,19 @@ void *consumer_thread_data_poll(void *data) if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } @@ -2131,6 +2171,10 @@ end: */ close(ctx->consumer_metadata_pipe[1]); + if (data_ht) { + destroy_data_stream_ht(data_ht); + } + rcu_unregister_thread(); return NULL; } @@ -2299,6 +2343,11 @@ void lttng_consumer_init(void) consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(metadata_ht); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(data_ht); } /*