Relayd data available command support
[lttng-tools.git] / src / common / consumer.c
index 8d1a34025687da37b7c3c1555d80441c93d8761d..8d897e5df1e20e20c67ddbc6ed8f99af7e7de2aa 100644 (file)
@@ -1169,6 +1169,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1264,6 +1266,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
 end:
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
 end:
+       pthread_mutex_unlock(&stream->lock);
        /* Unlock only if ctrl socket used */
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        /* Unlock only if ctrl socket used */
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
@@ -1307,6 +1310,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1446,6 +1451,7 @@ splice_error:
        }
 
 end:
        }
 
 end:
+       pthread_mutex_unlock(&stream->lock);
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
@@ -2451,6 +2457,7 @@ int consumer_data_available(uint64_t id)
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
+       struct consumer_relayd_sock_pair *relayd;
        int (*data_available)(struct lttng_consumer_stream *);
 
        DBG("Consumer data available command on session id %" PRIu64, id);
        int (*data_available)(struct lttng_consumer_stream *);
 
        DBG("Consumer data available command on session id %" PRIu64, id);
@@ -2470,10 +2477,13 @@ int consumer_data_available(uint64_t id)
                assert(0);
        }
 
                assert(0);
        }
 
+       rcu_read_lock();
+
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct,
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* Check the stream for data. */
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* Check the stream for data. */
@@ -2481,9 +2491,26 @@ int consumer_data_available(uint64_t id)
                if (ret == 0) {
                        goto data_not_available;
                }
                if (ret == 0) {
                        goto data_not_available;
                }
-       }
 
 
-       /* TODO: Support to ask the relayd if the streams are remote */
+               if (stream->net_seq_idx != -1) {
+                       relayd = consumer_find_relayd(stream->net_seq_idx);
+                       assert(relayd);
+
+                       pthread_mutex_lock(&stream->lock);
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       if (stream->metadata_flag) {
+                               ret = relayd_quiescent_control(&relayd->control_sock);
+                       } else {
+                               ret = relayd_data_available(&relayd->control_sock,
+                                               stream->relayd_stream_id, stream->next_net_seq_num);
+                       }
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       pthread_mutex_unlock(&stream->lock);
+                       if (ret == 0) {
+                               goto data_not_available;
+                       }
+               }
+       }
 
        /*
         * Finding _no_ node in the hash table means that the stream(s) have been
 
        /*
         * Finding _no_ node in the hash table means that the stream(s) have been
@@ -2494,10 +2521,12 @@ int consumer_data_available(uint64_t id)
 
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
 
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
        return 1;
 
 data_not_available:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
        return 1;
 
 data_not_available:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
        return 0;
 }
        return 0;
 }
This page took 0.02445 seconds and 4 git commands to generate.