iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
+
+ /* Remove node session id from the consumer_data stream ht */
+ iter.iter.node = &stream->node_session_id.node;
+ ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ assert(!ret);
rcu_read_unlock();
assert(consumer_data.stream_count > 0);
gid_t gid,
int net_index,
int metadata_flag,
+ uint64_t session_id,
int *alloc_ret)
{
struct lttng_consumer_stream *stream;
stream->gid = gid;
stream->net_seq_idx = net_index;
stream->metadata_flag = metadata_flag;
+ stream->session_id = session_id;
strncpy(stream->path_name, path_name, sizeof(stream->path_name));
stream->path_name[sizeof(stream->path_name) - 1] = '\0';
+ pthread_mutex_init(&stream->lock, NULL);
/*
* Index differently the metadata node because the thread is using an
lttng_ht_node_init_ulong(&stream->node, stream->key);
}
+ /* Init session id node with the stream session id */
+ lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+
/*
* The cpu number is needed before using any ustctl_* actions. Ignored for
* the kernel so the value does not matter.
pthread_mutex_unlock(&consumer_data.lock);
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
- " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
- stream->shm_fd, stream->wait_fd,
+ " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
+ stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
- stream->net_seq_idx);
+ stream->net_seq_idx, stream->session_id);
return stream;
error:
lttng_ht_add_unique_ulong(ht, &stream->node);
+ /*
+ * Add stream to the stream_list_ht of the consumer data. No need to steal
+ * the key since the HT does not use it and we allow to add redundant keys
+ * into this table.
+ */
+ lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
lttng_consumer_sync_trace_file(stream, orig_offset);
end:
+ pthread_mutex_unlock(&stream->lock);
/* Unlock only if ctrl socket used */
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
}
end:
+ pthread_mutex_unlock(&stream->lock);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
+
+ /* Remove node session id from the consumer_data stream ht */
+ iter.iter.node = &stream->node_session_id.node;
+ ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ assert(!ret);
rcu_read_unlock();
if (stream->out_fd >= 0) {
consumer_steal_stream_key(stream->key, ht);
lttng_ht_add_unique_ulong(ht, &stream->node);
+
+ /*
+ * Add stream to the stream_list_ht of the consumer data. No need to steal
+ * the key since the HT does not use it and we allow to add redundant keys
+ * into this table.
+ */
+ lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
{
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
assert(metadata_ht);
error:
return ret;
}
+
+/*
+ * Check if for a given session id there is still data needed to be extract
+ * from the buffers.
+ *
+ * Return 1 if data is in fact available to be read or else 0.
+ */
+int consumer_data_available(uint64_t id)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ struct consumer_relayd_sock_pair *relayd;
+ int (*data_available)(struct lttng_consumer_stream *);
+
+ DBG("Consumer data available command on session id %" PRIu64, id);
+
+ pthread_mutex_lock(&consumer_data.lock);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ data_available = lttng_kconsumer_data_available;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ data_available = lttng_ustconsumer_data_available;
+ break;
+ default:
+ ERR("Unknown consumer data type");
+ assert(0);
+ }
+
+ rcu_read_lock();
+
+ /* Ease our life a bit */
+ ht = consumer_data.stream_list_ht;
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+ ht->match_fct, (void *)((unsigned long) id),
+ &iter.iter, stream, node_session_id.node) {
+ /* Check the stream for data. */
+ ret = data_available(stream);
+ if (ret == 0) {
+ goto data_not_available;
+ }
+
+ if (stream->net_seq_idx != -1) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ assert(relayd);
+
+ pthread_mutex_lock(&stream->lock);
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ if (stream->metadata_flag) {
+ ret = relayd_quiescent_control(&relayd->control_sock);
+ } else {
+ ret = relayd_data_available(&relayd->control_sock,
+ stream->relayd_stream_id, stream->next_net_seq_num);
+ }
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret == 0) {
+ goto data_not_available;
+ }
+ }
+ }
+
+ /*
+ * Finding _no_ node in the hash table means that the stream(s) have been
+ * removed thus data is guaranteed to be available for analysis from the
+ * trace files. This is *only* true for local consumer and not network
+ * streaming.
+ */
+
+ /* Data is available to be read by a viewer. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+ return 1;
+
+data_not_available:
+ /* Data is still being extracted from buffers. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+ return 0;
+}