X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=8d897e5df1e20e20c67ddbc6ed8f99af7e7de2aa;hp=8b43257bd134987a3d0811320276010b724579b3;hb=c8f59ee5fc11492ef472dc5cfd2fd2c4926b1787;hpb=4bb94b7597f56f5200ebd6a88e906488172241fb diff --git a/src/common/consumer.c b/src/common/consumer.c index 8b43257bd..8d897e5df 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -58,6 +58,17 @@ int consumer_poll_timeout = -1; */ volatile int consumer_quit = 0; +/* + * The following two hash tables are visible by all threads which are separated + * in different source files. + * + * Global hash table containing respectively metadata and data streams. The + * stream element in this ht should only be updated by the metadata poll thread + * for the metadata and the data poll thread for the data. + */ +struct lttng_ht *metadata_ht = NULL; +struct lttng_ht *data_ht = NULL; + /* * Find a stream. The consumer_data.lock must be locked during this * call. @@ -89,7 +100,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key, return stream; } -static void consumer_steal_stream_key(int key, struct lttng_ht *ht) +void consumer_steal_stream_key(int key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; @@ -161,17 +172,6 @@ void consumer_free_stream(struct rcu_head *head) free(stream); } -static -void consumer_free_metadata_stream(struct rcu_head *head) -{ - struct lttng_ht_node_ulong *node = - caa_container_of(head, struct lttng_ht_node_ulong, head); - struct lttng_consumer_stream *stream = - caa_container_of(node, struct lttng_consumer_stream, waitfd_node); - - free(stream); -} - /* * RCU protected relayd socket pair free. */ @@ -282,15 +282,15 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, ret = lttng_ht_del(ht, &iter); assert(!ret); + /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_session_id.node; + ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); + assert(!ret); rcu_read_unlock(); - if (consumer_data.stream_count <= 0) { - goto end; - } + assert(consumer_data.stream_count > 0); consumer_data.stream_count--; - if (!stream) { - goto end; - } + if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret) { @@ -369,6 +369,7 @@ struct lttng_consumer_stream *consumer_allocate_stream( gid_t gid, int net_index, int metadata_flag, + uint64_t session_id, int *alloc_ret) { struct lttng_consumer_stream *stream; @@ -404,16 +405,38 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->gid = gid; stream->net_seq_idx = net_index; stream->metadata_flag = metadata_flag; + stream->session_id = session_id; strncpy(stream->path_name, path_name, sizeof(stream->path_name)); stream->path_name[sizeof(stream->path_name) - 1] = '\0'; - lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); - lttng_ht_node_init_ulong(&stream->node, stream->key); + pthread_mutex_init(&stream->lock, NULL); + + /* + * Index differently the metadata node because the thread is using an + * internal hash table to match streams in the metadata_ht to the epoll set + * file descriptor. + */ + if (metadata_flag) { + lttng_ht_node_init_ulong(&stream->node, stream->wait_fd); + } else { + lttng_ht_node_init_ulong(&stream->node, stream->key); + } + + /* Init session id node with the stream session id */ + lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id); + + /* + * The cpu number is needed before using any ustctl_* actions. Ignored for + * the kernel so the value does not matter. + */ + pthread_mutex_lock(&consumer_data.lock); + stream->cpu = stream->chan->cpucount++; + pthread_mutex_unlock(&consumer_data.lock); DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu," - " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, - stream->shm_fd, stream->wait_fd, + " out_fd %d, net_seq_idx %d, session_id %" PRIu64, + stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, (unsigned long long) stream->mmap_len, stream->out_fd, - stream->net_seq_idx); + stream->net_seq_idx, stream->session_id); return stream; error: @@ -425,41 +448,31 @@ end: /* * Add a stream to the global list protected by a mutex. */ -int consumer_add_stream(struct lttng_consumer_stream *stream) +static int consumer_add_stream(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) { int ret = 0; struct consumer_relayd_sock_pair *relayd; assert(stream); + assert(ht); DBG3("Adding consumer stream %d", stream->key); pthread_mutex_lock(&consumer_data.lock); rcu_read_lock(); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - stream->cpu = stream->chan->cpucount++; - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->key, consumer_data.stream_ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } + lttng_ht_add_unique_ulong(ht, &stream->node); - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + /* + * Add stream to the stream_list_ht of the consumer data. No need to steal + * the key since the HT does not use it and we allow to add redundant keys + * into this table. + */ + lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); @@ -485,7 +498,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) consumer_data.stream_count++; consumer_data.need_update = 1; -error: rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -629,23 +641,6 @@ error: return outfd; } -/* - * Update a stream according to what we just received. - */ -void consumer_change_stream_state(int stream_key, - enum lttng_consumer_stream_state state) -{ - struct lttng_consumer_stream *stream; - - pthread_mutex_lock(&consumer_data.lock); - stream = consumer_find_stream(stream_key, consumer_data.stream_ht); - if (stream) { - stream->state = state; - } - consumer_data.need_update = 1; - pthread_mutex_unlock(&consumer_data.lock); -} - static void consumer_free_channel(struct rcu_head *head) { @@ -798,9 +793,9 @@ end: * * Returns the number of fds in the structures. */ -int consumer_update_poll_array( +static int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_stream) + struct lttng_consumer_stream **local_stream, struct lttng_ht *ht) { int i = 0; struct lttng_ht_iter iter; @@ -808,8 +803,7 @@ int consumer_update_poll_array( DBG("Updating poll fd array"); rcu_read_lock(); - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, - node.node) { + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) { continue; } @@ -822,10 +816,10 @@ int consumer_update_poll_array( rcu_read_unlock(); /* - * Insert the consumer_poll_pipe at the end of the array and don't + * Insert the consumer_data_pipe at the end of the array and don't * increment i so nb_fd is the number of real FD. */ - (*pollfd)[i].fd = ctx->consumer_poll_pipe[0]; + (*pollfd)[i].fd = ctx->consumer_data_pipe[0]; (*pollfd)[i].events = POLLIN | POLLPRI; return i; } @@ -904,17 +898,6 @@ void lttng_consumer_cleanup(void) rcu_read_lock(); - /* - * close all outfd. Called when there are no more threads running (after - * joining on the threads), no need to protect list iteration with mutex. - */ - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node, - node) { - struct lttng_consumer_stream *stream = - caa_container_of(node, struct lttng_consumer_stream, node); - consumer_del_stream(stream, consumer_data.stream_ht); - } - cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node, node) { struct lttng_consumer_channel *channel = @@ -924,7 +907,6 @@ void lttng_consumer_cleanup(void) rcu_read_unlock(); - lttng_ht_destroy(consumer_data.stream_ht); lttng_ht_destroy(consumer_data.channel_ht); } @@ -1022,21 +1004,21 @@ struct lttng_consumer_local_data *lttng_consumer_create( ctx->on_recv_stream = recv_stream; ctx->on_update_stream = update_stream; - ret = pipe(ctx->consumer_poll_pipe); + ret = pipe(ctx->consumer_data_pipe); if (ret < 0) { PERROR("Error creating poll pipe"); goto error_poll_pipe; } /* set read end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK); + ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK); if (ret < 0) { PERROR("fcntl O_NONBLOCK"); goto error_poll_fcntl; } /* set write end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK); + ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK); if (ret < 0) { PERROR("fcntl O_NONBLOCK"); goto error_poll_fcntl; @@ -1084,7 +1066,7 @@ error_quit_pipe: for (i = 0; i < 2; i++) { int err; - err = close(ctx->consumer_poll_pipe[i]); + err = close(ctx->consumer_data_pipe[i]); if (err) { PERROR("close"); } @@ -1114,11 +1096,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } - ret = close(ctx->consumer_poll_pipe[0]); + ret = close(ctx->consumer_data_pipe[0]); if (ret) { PERROR("close"); } - ret = close(ctx->consumer_poll_pipe[1]); + ret = close(ctx->consumer_data_pipe[1]); if (ret) { PERROR("close"); } @@ -1187,6 +1169,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1282,6 +1266,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( lttng_consumer_sync_trace_file(stream, orig_offset); end: + pthread_mutex_unlock(&stream->lock); /* Unlock only if ctrl socket used */ if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); @@ -1325,6 +1310,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1464,6 +1451,7 @@ splice_error: } end: + pthread_mutex_unlock(&stream->lock); if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } @@ -1533,6 +1521,33 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } +/* + * Iterate over all streams of the hashtable and free them properly. + * + * WARNING: *MUST* be used with data stream only. + */ +static void destroy_data_stream_ht(struct lttng_ht *ht) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + + if (ht == NULL) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { + ret = lttng_ht_del(ht, &iter); + assert(!ret); + + call_rcu(&stream->node.head, consumer_free_stream); + } + rcu_read_unlock(); + + lttng_ht_destroy(ht); +} + /* * Iterate over all streams of the hashtable and free them properly. * @@ -1549,11 +1564,11 @@ static void destroy_stream_ht(struct lttng_ht *ht) } rcu_read_lock(); - cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, waitfd_node.node) { + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { ret = lttng_ht_del(ht, &iter); assert(!ret); - call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream); + call_rcu(&stream->node.head, consumer_free_stream); } rcu_read_unlock(); @@ -1585,12 +1600,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto free_stream; } - rcu_read_lock(); - iter.iter.node = &stream->waitfd_node.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); - rcu_read_unlock(); - pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -1611,6 +1620,17 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, goto end; } + rcu_read_lock(); + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); + + /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_session_id.node; + ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); + assert(!ret); + rcu_read_unlock(); + if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret) { @@ -1677,7 +1697,7 @@ end: } free_stream: - call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream); + call_rcu(&stream->node.head, consumer_free_stream); } /* @@ -1697,27 +1717,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_add_stream(stream); - if (ret) { - ret = -EINVAL; - goto error; - } - - /* Steal stream identifier only for UST */ - consumer_steal_stream_key(stream->wait_fd, ht); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - goto error; - } - /* * From here, refcounts are updated so be _careful_ when returning an error * after this point. @@ -1744,10 +1743,20 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, uatomic_dec(&stream->chan->nb_init_streams); } - lttng_ht_add_unique_ulong(ht, &stream->waitfd_node); + /* Steal stream identifier to avoid having streams with the same key */ + consumer_steal_stream_key(stream->key, ht); + + lttng_ht_add_unique_ulong(ht, &stream->node); + + /* + * Add stream to the stream_list_ht of the consumer data. No need to steal + * the key since the HT does not use it and we allow to add redundant keys + * into this table. + */ + lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); + rcu_read_unlock(); -error: pthread_mutex_unlock(&consumer_data.lock); return ret; } @@ -1756,14 +1765,13 @@ error: * Thread polls on metadata file descriptor and write them on disk or on the * network. */ -void *lttng_consumer_thread_poll_metadata(void *data) +void *consumer_thread_metadata_poll(void *data) { int ret, i, pollfd; uint32_t revents, nb_fd; struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; - struct lttng_ht *metadata_ht = NULL; struct lttng_poll_event events; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -1772,11 +1780,6 @@ void *lttng_consumer_thread_poll_metadata(void *data) DBG("Thread metadata poll started"); - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - if (metadata_ht == NULL) { - goto end; - } - /* Size is set to 1 for the consumer_metadata pipe */ ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); if (ret < 0) { @@ -1876,7 +1879,7 @@ restart: assert(node); stream = caa_container_of(node, struct lttng_consumer_stream, - waitfd_node); + node); /* Check for error event */ if (revents & (LPOLLERR | LPOLLHUP)) { @@ -1912,7 +1915,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* It's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { rcu_read_unlock(); goto end; } else if (len > 0) { @@ -1942,26 +1945,21 @@ end: * This thread polls the fds in the set to consume the data and write * it to tracefile if necessary. */ -void *lttng_consumer_thread_poll_fds(void *data) +void *consumer_thread_data_poll(void *data) { int num_rdy, num_hup, high_prio, ret, i; struct pollfd *pollfd = NULL; /* local view of the streams */ - struct lttng_consumer_stream **local_stream = NULL; + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; - pthread_t metadata_thread; - void *status; rcu_register_thread(); - /* Start metadata polling thread */ - ret = pthread_create(&metadata_thread, NULL, - lttng_consumer_thread_poll_metadata, (void *) ctx); - if (ret < 0) { - PERROR("pthread_create metadata thread"); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (data_ht == NULL) { goto end; } @@ -1986,7 +1984,7 @@ void *lttng_consumer_thread_poll_fds(void *data) local_stream = NULL; } - /* allocate for all fds + 1 for the consumer_poll_pipe */ + /* allocate for all fds + 1 for the consumer_data_pipe */ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); @@ -1994,7 +1992,7 @@ void *lttng_consumer_thread_poll_fds(void *data) goto end; } - /* allocate for all fds + 1 for the consumer_poll_pipe */ + /* allocate for all fds + 1 for the consumer_data_pipe */ local_stream = zmalloc((consumer_data.stream_count + 1) * sizeof(struct lttng_consumer_stream)); if (local_stream == NULL) { @@ -2002,7 +2000,8 @@ void *lttng_consumer_thread_poll_fds(void *data) pthread_mutex_unlock(&consumer_data.lock); goto end; } - ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + ret = consumer_update_poll_array(ctx, &pollfd, local_stream, + data_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2039,19 +2038,41 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* - * If the consumer_poll_pipe triggered poll go directly to the + * If the consumer_data_pipe triggered poll go directly to the * beginning of the loop to update the array. We want to prioritize * array update over low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; - char tmp; - DBG("consumer_poll_pipe wake up"); + DBG("consumer_data_pipe wake up"); /* Consume 1 byte of pipe data */ do { - pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1); + pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, + sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); + + /* + * If the stream is NULL, just ignore it. It's also possible that + * the sessiond poll thread changed the consumer_quit state and is + * waking us up to test it. + */ + if (new_stream == NULL) { + continue; + } + + ret = consumer_add_stream(new_stream, data_ht); + if (ret) { + ERR("Consumer add stream %d failed. Continuing", + new_stream->key); + /* + * At this point, if the add_stream fails, it is not in the + * hash table thus passing the NULL value here. + */ + consumer_del_stream(new_stream, NULL); + } + + /* Continue to update the local streams and handle prio ones */ continue; } @@ -2062,7 +2083,7 @@ void *lttng_consumer_thread_poll_fds(void *data) high_prio = 1; len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { goto end; } else if (len > 0) { local_stream[i]->data_read = 1; @@ -2085,7 +2106,7 @@ void *lttng_consumer_thread_poll_fds(void *data) DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN) { + if (len < 0 && len != -EAGAIN && len != -ENODATA) { goto end; } else if (len > 0) { local_stream[i]->data_read = 1; @@ -2113,22 +2134,19 @@ void *lttng_consumer_thread_poll_fds(void *data) if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { - consumer_del_stream(local_stream[i], - consumer_data.stream_ht); + consumer_del_stream(local_stream[i], data_ht); num_hup++; } } @@ -2148,18 +2166,16 @@ end: /* * Close the write side of the pipe so epoll_wait() in - * lttng_consumer_thread_poll_metadata can catch it. The thread is - * monitoring the read side of the pipe. If we close them both, epoll_wait - * strangely does not return and could create a endless wait period if the - * pipe is the only tracked fd in the poll set. The thread will take care - * of closing the read side. + * consumer_thread_metadata_poll can catch it. The thread is monitoring the + * read side of the pipe. If we close them both, epoll_wait strangely does + * not return and could create a endless wait period if the pipe is the + * only tracked fd in the poll set. The thread will take care of closing + * the read side. */ close(ctx->consumer_metadata_pipe[1]); - if (ret) { - ret = pthread_join(metadata_thread, &status); - if (ret < 0) { - PERROR("pthread_join metadata thread"); - } + + if (data_ht) { + destroy_data_stream_ht(data_ht); } rcu_unregister_thread(); @@ -2170,7 +2186,7 @@ end: * This thread listens on the consumerd socket and receives the file * descriptors from the session daemon. */ -void *lttng_consumer_thread_receive_fds(void *data) +void *consumer_thread_sessiond_poll(void *data) { int sock, client_socket, ret; /* @@ -2277,19 +2293,16 @@ end: consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT; /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). + * Notify the data poll thread to poll back again and test the + * consumer_quit state to quit gracefully. */ do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); + struct lttng_consumer_stream *null_stream = NULL; + + ret = write(ctx->consumer_data_pipe[1], &null_stream, + sizeof(null_stream)); } while (ret < 0 && errno == EINTR); + rcu_unregister_thread(); return NULL; } @@ -2330,9 +2343,14 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) */ void lttng_consumer_init(void) { - consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(metadata_ht); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + assert(data_ht); } /* @@ -2426,3 +2444,89 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, error: return ret; } + +/* + * Check if for a given session id there is still data needed to be extract + * from the buffers. + * + * Return 1 if data is in fact available to be read or else 0. + */ +int consumer_data_available(uint64_t id) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + struct consumer_relayd_sock_pair *relayd; + int (*data_available)(struct lttng_consumer_stream *); + + DBG("Consumer data available command on session id %" PRIu64, id); + + pthread_mutex_lock(&consumer_data.lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + data_available = lttng_kconsumer_data_available; + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + data_available = lttng_ustconsumer_data_available; + break; + default: + ERR("Unknown consumer data type"); + assert(0); + } + + rcu_read_lock(); + + /* Ease our life a bit */ + ht = consumer_data.stream_list_ht; + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct((void *)((unsigned long) id), 0x42UL), + ht->match_fct, (void *)((unsigned long) id), + &iter.iter, stream, node_session_id.node) { + /* Check the stream for data. */ + ret = data_available(stream); + if (ret == 0) { + goto data_not_available; + } + + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + assert(relayd); + + pthread_mutex_lock(&stream->lock); + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->metadata_flag) { + ret = relayd_quiescent_control(&relayd->control_sock); + } else { + ret = relayd_data_available(&relayd->control_sock, + stream->relayd_stream_id, stream->next_net_seq_num); + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + if (ret == 0) { + goto data_not_available; + } + } + } + + /* + * Finding _no_ node in the hash table means that the stream(s) have been + * removed thus data is guaranteed to be available for analysis from the + * trace files. This is *only* true for local consumer and not network + * streaming. + */ + + /* Data is available to be read by a viewer. */ + pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); + return 1; + +data_not_available: + /* Data is still being extracted from buffers. */ + pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); + return 0; +}