+
+/*
+ * Try to lock the stream mutex.
+ *
+ * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
+ */
+static int stream_try_lock(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ /*
+ * Try to lock the stream mutex. On failure, we know that the stream is
+ * being used else where hence there is data still being extracted.
+ */
+ ret = pthread_mutex_trylock(&stream->lock);
+ if (ret) {
+ /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
+ ret = 0;
+ goto end;
+ }
+
+ ret = 1;
+
+end:
+ return ret;
+}
+
+/*
+ * Check if for a given session id there is still data needed to be extract
+ * from the buffers.
+ *
+ * Return 1 if data is pending or else 0 meaning ready to be read.
+ */
+int consumer_data_pending(uint64_t id)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ struct consumer_relayd_sock_pair *relayd;
+ int (*data_pending)(struct lttng_consumer_stream *);
+
+ DBG("Consumer data pending command on session id %" PRIu64, id);
+
+ rcu_read_lock();
+ pthread_mutex_lock(&consumer_data.lock);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ data_pending = lttng_kconsumer_data_pending;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ data_pending = lttng_ustconsumer_data_pending;
+ break;
+ default:
+ ERR("Unknown consumer data type");
+ assert(0);
+ }
+
+ /* Ease our life a bit */
+ ht = consumer_data.stream_list_ht;
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
+ ht->match_fct, (void *)((unsigned long) id),
+ &iter.iter, stream, node_session_id.node) {
+ /* If this call fails, the stream is being used hence data pending. */
+ ret = stream_try_lock(stream);
+ if (!ret) {
+ goto data_not_pending;
+ }
+
+ /*
+ * A removed node from the hash table indicates that the stream has
+ * been deleted thus having a guarantee that the buffers are closed
+ * on the consumer side. However, data can still be transmitted
+ * over the network so don't skip the relayd check.
+ */
+ ret = cds_lfht_is_node_deleted(&stream->node.node);
+ if (!ret) {
+ /* Check the stream if there is data in the buffers. */
+ ret = data_pending(stream);
+ if (ret == 1) {
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
+ }
+
+ /* Relayd check */
+ if (stream->net_seq_idx != -1) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (!relayd) {
+ /*
+ * At this point, if the relayd object is not available for the
+ * given stream, it is because the relayd is being cleaned up
+ * so every stream associated with it (for a session id value)
+ * are or will be marked for deletion hence no data pending.
+ */
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ if (stream->metadata_flag) {
+ ret = relayd_quiescent_control(&relayd->control_sock);
+ } else {
+ ret = relayd_data_pending(&relayd->control_sock,
+ stream->relayd_stream_id, stream->next_net_seq_num);
+ }
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret == 1) {
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ /*
+ * Finding _no_ node in the hash table means that the stream(s) have been
+ * removed thus data is guaranteed to be available for analysis from the
+ * trace files. This is *only* true for local consumer and not network
+ * streaming.
+ */
+
+ /* Data is available to be read by a viewer. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+ return 0;
+
+data_not_pending:
+ /* Data is still being extracted from buffers. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+ return 1;
+}