X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=35df86dd81c922217a6dbf628af93fd79d5b3432;hp=8b43257bd134987a3d0811320276010b724579b3;hb=58b1f4255ea457f2965f31b84205cb0eec21e71f;hpb=4bb94b7597f56f5200ebd6a88e906488172241fb diff --git a/src/common/consumer.c b/src/common/consumer.c index 8b43257bd..35df86dd8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -58,6 +58,17 @@ int consumer_poll_timeout = -1; */ volatile int consumer_quit = 0; +/* + * The following two hash tables are visible by all threads which are separated + * in different source files. + * + * Global hash table containing respectively metadata and data streams. The + * stream element in this ht should only be updated by the metadata poll thread + * for the metadata and the data poll thread for the data. + */ +struct lttng_ht *metadata_ht = NULL; +struct lttng_ht *data_ht = NULL; + /* * Find a stream. The consumer_data.lock must be locked during this * call. @@ -89,7 +100,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key, return stream; } -static void consumer_steal_stream_key(int key, struct lttng_ht *ht) +void consumer_steal_stream_key(int key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; @@ -161,17 +172,6 @@ void consumer_free_stream(struct rcu_head *head) free(stream); } -static -void consumer_free_metadata_stream(struct rcu_head *head) -{ - struct lttng_ht_node_ulong *node = - caa_container_of(head, struct lttng_ht_node_ulong, head); - struct lttng_consumer_stream *stream = - caa_container_of(node, struct lttng_consumer_stream, waitfd_node); - - free(stream); -} - /* * RCU protected relayd socket pair free. */ @@ -406,8 +406,25 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->metadata_flag = metadata_flag; strncpy(stream->path_name, path_name, sizeof(stream->path_name)); stream->path_name[sizeof(stream->path_name) - 1] = '\0'; - lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); - lttng_ht_node_init_ulong(&stream->node, stream->key); + + /* + * Index differently the metadata node because the thread is using an + * internal hash table to match streams in the metadata_ht to the epoll set + * file descriptor. + */ + if (metadata_flag) { + lttng_ht_node_init_ulong(&stream->node, stream->wait_fd); + } else { + lttng_ht_node_init_ulong(&stream->node, stream->key); + } + + /* + * The cpu number is needed before using any ustctl_* actions. Ignored for + * the kernel so the value does not matter. + */ + pthread_mutex_lock(&consumer_data.lock); + stream->cpu = stream->chan->cpucount++; + pthread_mutex_unlock(&consumer_data.lock); DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu," " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, @@ -425,41 +442,24 @@ end: /* * Add a stream to the global list protected by a mutex. */ -int consumer_add_stream(struct lttng_consumer_stream *stream) +static int consumer_add_stream(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) { int ret = 0; struct consumer_relayd_sock_pair *relayd; assert(stream); + assert(ht); DBG3("Adding consumer stream %d", stream->key); pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - stream->cpu = stream->chan->cpucount++; - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->key, consumer_data.stream_ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } - - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + lttng_ht_add_unique_ulong(ht, &stream->node); /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); @@ -485,7 +485,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) consumer_data.stream_count++; consumer_data.need_update = 1; -error: rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -798,9 +797,9 @@ end: * * Returns the number of fds in the structures. */ -int consumer_update_poll_array( +static int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_stream) + struct lttng_consumer_stream **local_stream, struct lttng_ht *ht) { int i = 0; struct lttng_ht_iter iter; @@ -808,8 +807,7 @@ int consumer_update_poll_array( DBG("Updating poll fd array"); rcu_read_lock(); - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, - node.node) { + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) { continue; } @@ -1533,6 +1531,33 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } +/* + * Iterate over all streams of the hashtable and free them properly. + * + * WARNING: *MUST* be used with data stream only. + */ +static void destroy_data_stream_ht(struct lttng_ht *ht) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + ret = lttng_ht_del(ht, &iter); + assert(!ret); + + call_rcu(&stream->node.head, consumer_free_stream); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + /* * Iterate over all streams of the hashtable and free them properly. * @@ -1549,11 +1574,11 @@ static void destroy_stream_ht(struct lttng_ht *ht) } rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, waitfd_node.node) { + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { ret = lttng_ht_del(ht, &iter); assert(!ret); - call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream); + call_rcu(&stream->node.head, consumer_free_stream); } rcu_read_unlock(); @@ -1585,12 +1610,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto free_stream; } - rcu_read_lock(); - iter.iter.node = &stream->waitfd_node.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); - rcu_read_unlock(); - pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -1611,6 +1630,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto end; } + rcu_read_lock(); + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); + rcu_read_unlock(); + if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret) { @@ -1677,7 +1702,7 @@ end: } free_stream: - call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream); + call_rcu(&stream->node.head, consumer_free_stream); } /* @@ -1697,27 +1722,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } - - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->wait_fd, ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } - /* * From here, refcounts are updated so be _careful_ when returning an error * after this point. @@ -1744,10 +1748,12 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, uatomic_dec(&stream->chan->nb_init_streams); } - lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); + + lttng_ht_add_unique_ulong(ht, &stream->node); rcu_read_unlock(); -error: pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -1756,14 +1762,13 @@ error: * Thread polls on metadata file descriptor and write them on disk or on the * network. */ -void *lttng_consumer_thread_poll_metadata(void *data) +void *consumer_thread_metadata_poll(void *data) { int ret, i, pollfd; uint32_t revents, nb_fd; struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; - struct lttng_ht *metadata_ht = NULL; struct lttng_poll_event events; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -1772,11 +1777,6 @@ void *lttng_consumer_thread_poll_metadata(void *data) DBG("Thread metadata poll started"); - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - if (metadata_ht == NULL) { - goto end; - } - /* Size is set to 1 for the consumer_metadata pipe */ ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); if (ret < 0) { @@ -1876,7 +1876,7 @@ restart: assert(node); stream = caa_container_of(node, struct lttng_consumer_stream, - waitfd_node); + node); /* Check for error event */ if (revents & (LPOLLERR | LPOLLHUP)) { @@ -1912,7 +1912,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* It's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { rcu_read_unlock(); goto end; } else if (len > 0) { @@ -1942,26 +1942,21 @@ end: * This thread polls the fds in the set to consume the data and write * it to tracefile if necessary. */ -void *lttng_consumer_thread_poll_fds(void *data) +void *consumer_thread_data_poll(void *data) { int num_rdy, num_hup, high_prio, ret, i; struct pollfd *pollfd = NULL; /* local view of the streams */ - struct lttng_consumer_stream **local_stream = NULL; + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; - pthread_t metadata_thread; - void *status; rcu_register_thread(); - /* Start metadata polling thread */ - ret = pthread_create(&metadata_thread, NULL, - lttng_consumer_thread_poll_metadata, (void *) ctx); - if (ret < 0) { - PERROR("pthread_create metadata thread"); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (data_ht == NULL) { goto end; } @@ -2002,7 +1997,8 @@ void *lttng_consumer_thread_poll_fds(void *data) pthread_mutex_unlock(&consumer_data.lock); goto end; } - ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + ret = consumer_update_poll_array(ctx, &pollfd, local_stream, + data_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2045,13 +2041,35 @@ void *lttng_consumer_thread_poll_fds(void *data) */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; - char tmp; DBG("consumer_poll_pipe wake up"); /* Consume 1 byte of pipe data */ do { - pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1); + pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream, + sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); + + /* + * If the stream is NULL, just ignore it. It's also possible that + * the sessiond poll thread changed the consumer_quit state and is + * waking us up to test it. + */ + if (new_stream == NULL) { + continue; + } + + ret = consumer_add_stream(new_stream, data_ht); + if (ret) { + ERR("Consumer add stream %d failed. Continuing", + new_stream->key); + /* + * At this point, if the add_stream fails, it is not in the + * hash table thus passing the NULL value here. + */ + consumer_del_stream(new_stream, NULL); + } + + /* Continue to update the local streams and handle prio ones */ continue; } @@ -2062,7 +2080,7 @@ void *lttng_consumer_thread_poll_fds(void *data) high_prio = 1; len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { goto end; } else if (len > 0) { local_stream[i]->data_read = 1; @@ -2085,7 +2103,7 @@ void *lttng_consumer_thread_poll_fds(void *data) DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { goto end; } else if (len > 0) { local_stream[i]->data_read = 1; @@ -2113,22 +2131,19 @@ void *lttng_consumer_thread_poll_fds(void *data) if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } @@ -2148,18 +2163,16 @@ end: /* * Close the write side of the pipe so epoll_wait() in - * lttng_consumer_thread_poll_metadata can catch it. The thread is - * monitoring the read side of the pipe. If we close them both, epoll_wait - * strangely does not return and could create a endless wait period if the - * pipe is the only tracked fd in the poll set. The thread will take care - * of closing the read side. + * consumer_thread_metadata_poll can catch it. The thread is monitoring the + * read side of the pipe. If we close them both, epoll_wait strangely does + * not return and could create a endless wait period if the pipe is the + * only tracked fd in the poll set. The thread will take care of closing + * the read side. */ close(ctx->consumer_metadata_pipe[1]); - if (ret) { - ret = pthread_join(metadata_thread, &status); - if (ret < 0) { - PERROR("pthread_join metadata thread"); - } + + if (data_ht) { + destroy_data_stream_ht(data_ht); } rcu_unregister_thread(); @@ -2170,7 +2183,7 @@ end: * This thread listens on the consumerd socket and receives the file * descriptors from the session daemon. */ -void *lttng_consumer_thread_receive_fds(void *data) +void *consumer_thread_sessiond_poll(void *data) { int sock, client_socket, ret; /* @@ -2277,19 +2290,16 @@ end: consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). + * Notify the data poll thread to poll back again and test the + * consumer_quit state to quit gracefully. */ do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); + struct lttng_consumer_stream *null_stream = NULL; + + ret = write(ctx->consumer_poll_pipe[1], &null_stream, + sizeof(null_stream)); } while (ret < 0 && errno == EINTR); + rcu_unregister_thread(); return NULL; } @@ -2333,6 +2343,11 @@ void lttng_consumer_init(void) consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(metadata_ht); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(data_ht); } /*