X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=53c618067f8d6546b91ccc8f77854d7d2b36ed72;hp=0f0e60ad839a9beab57168634a8d7220a86fae62;hb=ab1027f48fa7e2dd29b3c85548ec42f26d06be25;hpb=3cc2f24a5cdabfbcb1022c0798f6b4845f72b498 diff --git a/src/common/consumer.c b/src/common/consumer.c index 0f0e60ad8..53c618067 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -56,7 +56,7 @@ int consumer_poll_timeout = -1; * Also updated by the signal handler (consumer_should_exit()). Read by the * polling threads. */ -volatile int consumer_quit = 0; +volatile int consumer_quit; /* * The following two hash tables are visible by all threads which are separated @@ -66,8 +66,8 @@ volatile int consumer_quit = 0; * stream element in this ht should only be updated by the metadata poll thread * for the metadata and the data poll thread for the data. */ -struct lttng_ht *metadata_ht = NULL; -struct lttng_ht *data_ht = NULL; +struct lttng_ht *metadata_ht; +struct lttng_ht *data_ht; /* * Find a stream. The consumer_data.lock must be locked during this @@ -281,6 +281,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); assert(!ret); + + /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_session_id.node; + ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); + assert(!ret); rcu_read_unlock(); assert(consumer_data.stream_count > 0); @@ -364,6 +369,7 @@ struct lttng_consumer_stream *consumer_allocate_stream( gid_t gid, int net_index, int metadata_flag, + uint64_t session_id, int *alloc_ret) { struct lttng_consumer_stream *stream; @@ -399,8 +405,10 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->gid = gid; stream->net_seq_idx = net_index; stream->metadata_flag = metadata_flag; + stream->session_id = session_id; strncpy(stream->path_name, path_name, sizeof(stream->path_name)); stream->path_name[sizeof(stream->path_name) - 1] = '\0'; + pthread_mutex_init(&stream->lock, NULL); /* * Index differently the metadata node because the thread is using an @@ -413,6 +421,9 @@ struct lttng_consumer_stream *consumer_allocate_stream( lttng_ht_node_init_ulong(&stream->node, stream->key); } + /* Init session id node with the stream session id */ + lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id); + /* * The cpu number is needed before using any ustctl_* actions. Ignored for * the kernel so the value does not matter. @@ -422,10 +433,10 @@ struct lttng_consumer_stream *consumer_allocate_stream( pthread_mutex_unlock(&consumer_data.lock); DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu," - " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, - stream->shm_fd, stream->wait_fd, + " out_fd %d, net_seq_idx %d, session_id %" PRIu64, + stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, (unsigned long long) stream->mmap_len, stream->out_fd, - stream->net_seq_idx); + stream->net_seq_idx, stream->session_id); return stream; error: @@ -456,6 +467,13 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream, lttng_ht_add_unique_ulong(ht, &stream->node); + /* + * Add stream to the stream_list_ht of the consumer data. No need to steal + * the key since the HT does not use it and we allow to add redundant keys + * into this table. + */ + lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); + /* Check and cleanup relayd */ relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { @@ -905,6 +923,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) if (ret < 0) { PERROR("write consumer quit"); } + + DBG("Consumer flag that it should quit"); } void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, @@ -1066,6 +1086,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) { int ret; + DBG("Consumer destroying it. Closing everything."); + ret = close(ctx->consumer_error_socket); if (ret) { PERROR("close"); @@ -1151,6 +1173,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1250,6 +1274,7 @@ end: if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } + pthread_mutex_unlock(&stream->lock); rcu_read_unlock(); return written; @@ -1289,6 +1314,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1431,6 +1458,7 @@ end: if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } + pthread_mutex_unlock(&stream->lock); rcu_read_unlock(); return written; @@ -1600,6 +1628,11 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); assert(!ret); + + /* Remove node session id from the consumer_data stream ht */ + iter.iter.node = &stream->node_session_id.node; + ret = lttng_ht_del(consumer_data.stream_list_ht, &iter); + assert(!ret); rcu_read_unlock(); if (stream->out_fd >= 0) { @@ -1718,6 +1751,14 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream, consumer_steal_stream_key(stream->key, ht); lttng_ht_add_unique_ulong(ht, &stream->node); + + /* + * Add stream to the stream_list_ht of the consumer data. No need to steal + * the key since the HT does not use it and we allow to add redundant keys + * into this table. + */ + lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id); + rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); @@ -1879,8 +1920,9 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* It's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { - rcu_read_unlock(); - goto end; + /* Clean up stream from consumer and free it. */ + lttng_poll_del(&events, stream->wait_fd); + consumer_del_metadata_stream(stream, metadata_ht); } else if (len > 0) { stream->data_read = 1; } @@ -2047,7 +2089,8 @@ void *consumer_thread_data_poll(void *data) len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { - goto end; + /* Clean the stream and free it. */ + consumer_del_stream(local_stream[i], data_ht); } else if (len > 0) { local_stream[i]->data_read = 1; } @@ -2070,7 +2113,8 @@ void *consumer_thread_data_poll(void *data) len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { - goto end; + /* Clean the stream and free it. */ + consumer_del_stream(local_stream[i], data_ht); } else if (len > 0) { local_stream[i]->data_read = 1; } @@ -2308,6 +2352,7 @@ void lttng_consumer_init(void) { consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); assert(metadata_ht); @@ -2406,3 +2451,88 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, error: return ret; } + +/* + * Check if for a given session id there is still data needed to be extract + * from the buffers. + * + * Return 1 if data is in fact available to be read or else 0. + */ +int consumer_data_available(uint64_t id) +{ + int ret; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + struct consumer_relayd_sock_pair *relayd; + int (*data_available)(struct lttng_consumer_stream *); + + DBG("Consumer data available command on session id %" PRIu64, id); + + rcu_read_lock(); + pthread_mutex_lock(&consumer_data.lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + data_available = lttng_kconsumer_data_available; + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + data_available = lttng_ustconsumer_data_available; + break; + default: + ERR("Unknown consumer data type"); + assert(0); + } + + /* Ease our life a bit */ + ht = consumer_data.stream_list_ht; + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct((void *)((unsigned long) id), 0x42UL), + ht->match_fct, (void *)((unsigned long) id), + &iter.iter, stream, node_session_id.node) { + /* Check the stream for data. */ + ret = data_available(stream); + if (ret == 0) { + goto data_not_available; + } + + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + assert(relayd); + + pthread_mutex_lock(&stream->lock); + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->metadata_flag) { + ret = relayd_quiescent_control(&relayd->control_sock); + } else { + ret = relayd_data_available(&relayd->control_sock, + stream->relayd_stream_id, stream->next_net_seq_num); + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + if (ret == 0) { + goto data_not_available; + } + } + } + + /* + * Finding _no_ node in the hash table means that the stream(s) have been + * removed thus data is guaranteed to be available for analysis from the + * trace files. This is *only* true for local consumer and not network + * streaming. + */ + + /* Data is available to be read by a viewer. */ + pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); + return 1; + +data_not_available: + /* Data is still being extracted from buffers. */ + pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); + return 0; +}