Relayd data available command support
[lttng-tools.git] / src / common / consumer.c
index fb0d7d05afb182992c97c789dcdd54544bb2fcea..8d897e5df1e20e20c67ddbc6ed8f99af7e7de2aa 100644 (file)
@@ -281,6 +281,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        iter.iter.node = &stream->node.node;
        ret = lttng_ht_del(ht, &iter);
        assert(!ret);
+
+       /* Remove node session id from the consumer_data stream ht */
+       iter.iter.node = &stream->node_session_id.node;
+       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+       assert(!ret);
        rcu_read_unlock();
 
        assert(consumer_data.stream_count > 0);
@@ -364,6 +369,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                gid_t gid,
                int net_index,
                int metadata_flag,
+               uint64_t session_id,
                int *alloc_ret)
 {
        struct lttng_consumer_stream *stream;
@@ -399,8 +405,10 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        stream->gid = gid;
        stream->net_seq_idx = net_index;
        stream->metadata_flag = metadata_flag;
+       stream->session_id = session_id;
        strncpy(stream->path_name, path_name, sizeof(stream->path_name));
        stream->path_name[sizeof(stream->path_name) - 1] = '\0';
+       pthread_mutex_init(&stream->lock, NULL);
 
        /*
         * Index differently the metadata node because the thread is using an
@@ -413,6 +421,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                lttng_ht_node_init_ulong(&stream->node, stream->key);
        }
 
+       /* Init session id node with the stream session id */
+       lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+
        /*
         * The cpu number is needed before using any ustctl_* actions. Ignored for
         * the kernel so the value does not matter.
@@ -422,10 +433,10 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        pthread_mutex_unlock(&consumer_data.lock);
 
        DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
-                       " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
-                       stream->shm_fd, stream->wait_fd,
+                       " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
+                       stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
                        (unsigned long long) stream->mmap_len, stream->out_fd,
-                       stream->net_seq_idx);
+                       stream->net_seq_idx, stream->session_id);
        return stream;
 
 error:
@@ -456,6 +467,13 @@ static int consumer_add_stream(struct lttng_consumer_stream *stream,
 
        lttng_ht_add_unique_ulong(ht, &stream->node);
 
+       /*
+        * Add stream to the stream_list_ht of the consumer data. No need to steal
+        * the key since the HT does not use it and we allow to add redundant keys
+        * into this table.
+        */
+       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
        /* Check and cleanup relayd */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
@@ -623,23 +641,6 @@ error:
        return outfd;
 }
 
-/*
- * Update a stream according to what we just received.
- */
-void consumer_change_stream_state(int stream_key,
-               enum lttng_consumer_stream_state state)
-{
-       struct lttng_consumer_stream *stream;
-
-       pthread_mutex_lock(&consumer_data.lock);
-       stream = consumer_find_stream(stream_key, consumer_data.stream_ht);
-       if (stream) {
-               stream->state = state;
-       }
-       consumer_data.need_update = 1;
-       pthread_mutex_unlock(&consumer_data.lock);
-}
-
 static
 void consumer_free_channel(struct rcu_head *head)
 {
@@ -897,17 +898,6 @@ void lttng_consumer_cleanup(void)
 
        rcu_read_lock();
 
-       /*
-        * close all outfd. Called when there are no more threads running (after
-        * joining on the threads), no need to protect list iteration with mutex.
-        */
-       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
-                       node) {
-               struct lttng_consumer_stream *stream =
-                       caa_container_of(node, struct lttng_consumer_stream, node);
-               consumer_del_stream(stream, consumer_data.stream_ht);
-       }
-
        cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
                        node) {
                struct lttng_consumer_channel *channel =
@@ -917,7 +907,6 @@ void lttng_consumer_cleanup(void)
 
        rcu_read_unlock();
 
-       lttng_ht_destroy(consumer_data.stream_ht);
        lttng_ht_destroy(consumer_data.channel_ht);
 }
 
@@ -1180,6 +1169,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1275,6 +1266,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
 end:
+       pthread_mutex_unlock(&stream->lock);
        /* Unlock only if ctrl socket used */
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
@@ -1318,6 +1310,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1457,6 +1451,7 @@ splice_error:
        }
 
 end:
+       pthread_mutex_unlock(&stream->lock);
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
@@ -1629,6 +1624,11 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        iter.iter.node = &stream->node.node;
        ret = lttng_ht_del(ht, &iter);
        assert(!ret);
+
+       /* Remove node session id from the consumer_data stream ht */
+       iter.iter.node = &stream->node_session_id.node;
+       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+       assert(!ret);
        rcu_read_unlock();
 
        if (stream->out_fd >= 0) {
@@ -1747,6 +1747,14 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        consumer_steal_stream_key(stream->key, ht);
 
        lttng_ht_add_unique_ulong(ht, &stream->node);
+
+       /*
+        * Add stream to the stream_list_ht of the consumer data. No need to steal
+        * the key since the HT does not use it and we allow to add redundant keys
+        * into this table.
+        */
+       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
        rcu_read_unlock();
 
        pthread_mutex_unlock(&consumer_data.lock);
@@ -2335,9 +2343,9 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
  */
 void lttng_consumer_init(void)
 {
-       consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 
        metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        assert(metadata_ht);
@@ -2436,3 +2444,89 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 error:
        return ret;
 }
+
+/*
+ * Check if for a given session id there is still data needed to be extract
+ * from the buffers.
+ *
+ * Return 1 if data is in fact available to be read or else 0.
+ */
+int consumer_data_available(uint64_t id)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht;
+       struct lttng_consumer_stream *stream;
+       struct consumer_relayd_sock_pair *relayd;
+       int (*data_available)(struct lttng_consumer_stream *);
+
+       DBG("Consumer data available command on session id %" PRIu64, id);
+
+       pthread_mutex_lock(&consumer_data.lock);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               data_available = lttng_kconsumer_data_available;
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               data_available = lttng_ustconsumer_data_available;
+               break;
+       default:
+               ERR("Unknown consumer data type");
+               assert(0);
+       }
+
+       rcu_read_lock();
+
+       /* Ease our life a bit */
+       ht = consumer_data.stream_list_ht;
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+                       ht->match_fct, (void *)((unsigned long) id),
+                       &iter.iter, stream, node_session_id.node) {
+               /* Check the stream for data. */
+               ret = data_available(stream);
+               if (ret == 0) {
+                       goto data_not_available;
+               }
+
+               if (stream->net_seq_idx != -1) {
+                       relayd = consumer_find_relayd(stream->net_seq_idx);
+                       assert(relayd);
+
+                       pthread_mutex_lock(&stream->lock);
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       if (stream->metadata_flag) {
+                               ret = relayd_quiescent_control(&relayd->control_sock);
+                       } else {
+                               ret = relayd_data_available(&relayd->control_sock,
+                                               stream->relayd_stream_id, stream->next_net_seq_num);
+                       }
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       pthread_mutex_unlock(&stream->lock);
+                       if (ret == 0) {
+                               goto data_not_available;
+                       }
+               }
+       }
+
+       /*
+        * Finding _no_ node in the hash table means that the stream(s) have been
+        * removed thus data is guaranteed to be available for analysis from the
+        * trace files. This is *only* true for local consumer and not network
+        * streaming.
+        */
+
+       /* Data is available to be read by a viewer. */
+       pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
+       return 1;
+
+data_not_available:
+       /* Data is still being extracted from buffers. */
+       pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
+       return 0;
+}
This page took 0.026122 seconds and 4 git commands to generate.