X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=8d897e5df1e20e20c67ddbc6ed8f99af7e7de2aa;hp=8d1a34025687da37b7c3c1555d80441c93d8761d;hb=c8f59ee5fc11492ef472dc5cfd2fd2c4926b1787;hpb=ca22feea083301934d1c8511851c86fb008c0697 diff --git a/src/common/consumer.c b/src/common/consumer.c index 8d1a34025..8d897e5df 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1169,6 +1169,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1264,6 +1266,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( lttng_consumer_sync_trace_file(stream, orig_offset); end: + pthread_mutex_unlock(&stream->lock); /* Unlock only if ctrl socket used */ if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); @@ -1307,6 +1310,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1446,6 +1451,7 @@ splice_error: } end: + pthread_mutex_unlock(&stream->lock); if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } @@ -2451,6 +2457,7 @@ int consumer_data_available(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; + struct consumer_relayd_sock_pair *relayd; int (*data_available)(struct lttng_consumer_stream *); DBG("Consumer data available command on session id %" PRIu64, id); @@ -2470,10 +2477,13 @@ int consumer_data_available(uint64_t id) assert(0); } + rcu_read_lock(); + /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct, + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct((void *)((unsigned long) id), 0x42UL), ht->match_fct, (void *)((unsigned long) id), &iter.iter, stream, node_session_id.node) { /* Check the stream for data. */ @@ -2481,9 +2491,26 @@ int consumer_data_available(uint64_t id) if (ret == 0) { goto data_not_available; } - } - /* TODO: Support to ask the relayd if the streams are remote */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + assert(relayd); + + pthread_mutex_lock(&stream->lock); + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->metadata_flag) { + ret = relayd_quiescent_control(&relayd->control_sock); + } else { + ret = relayd_data_available(&relayd->control_sock, + stream->relayd_stream_id, stream->next_net_seq_num); + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + if (ret == 0) { + goto data_not_available; + } + } + } /* * Finding _no_ node in the hash table means that the stream(s) have been @@ -2494,10 +2521,12 @@ int consumer_data_available(uint64_t id) /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); return 1; data_not_available: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); return 0; }