Rename data_available to data_pending
[lttng-tools.git] / src / common / consumer.c
index d245ed7eecb385cb72b76841a14944810f9a72c1..41f8ae53fe67adde7552b54dfb31aa89d4abedcb 100644 (file)
@@ -2671,33 +2671,61 @@ error:
        return ret;
 }
 
+/*
+ * Try to lock the stream mutex.
+ *
+ * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
+ */
+static int stream_try_lock(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+
+       /*
+        * Try to lock the stream mutex. On failure, we know that the stream is
+        * being used else where hence there is data still being extracted.
+        */
+       ret = pthread_mutex_trylock(&stream->lock);
+       if (ret) {
+               /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
+               ret = 0;
+               goto end;
+       }
+
+       ret = 1;
+
+end:
+       return ret;
+}
+
 /*
  * Check if for a given session id there is still data needed to be extract
  * from the buffers.
  *
- * Return 1 if data is in fact available to be read or else 0.
+ * Return 1 if data is pending or else 0 meaning ready to be read.
  */
-int consumer_data_available(uint64_t id)
+int consumer_data_pending(uint64_t id)
 {
        int ret;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
        struct consumer_relayd_sock_pair *relayd;
-       int (*data_available)(struct lttng_consumer_stream *);
+       int (*data_pending)(struct lttng_consumer_stream *);
 
-       DBG("Consumer data available command on session id %" PRIu64, id);
+       DBG("Consumer data pending command on session id %" PRIu64, id);
 
        rcu_read_lock();
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               data_available = lttng_kconsumer_data_available;
+               data_pending = lttng_kconsumer_data_pending;
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               data_available = lttng_ustconsumer_data_available;
+               data_pending = lttng_ustconsumer_data_pending;
                break;
        default:
                ERR("Unknown consumer data type");
@@ -2711,30 +2739,56 @@ int consumer_data_available(uint64_t id)
                        ht->hash_fct((void *)((unsigned long) id), 0x42UL),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
-               /* Check the stream for data. */
-               ret = data_available(stream);
-               if (ret == 0) {
-                       goto data_not_available;
+               /* If this call fails, the stream is being used hence data pending. */
+               ret = stream_try_lock(stream);
+               if (!ret) {
+                       goto data_not_pending;
+               }
+
+               /*
+                * A removed node from the hash table indicates that the stream has
+                * been deleted thus having a guarantee that the buffers are closed
+                * on the consumer side. However, data can still be transmitted
+                * over the network so don't skip the relayd check.
+                */
+               ret = cds_lfht_is_node_deleted(&stream->node.node);
+               if (!ret) {
+                       /* Check the stream if there is data in the buffers. */
+                       ret = data_pending(stream);
+                       if (ret == 1) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_pending;
+                       }
                }
 
+               /* Relayd check */
                if (stream->net_seq_idx != -1) {
                        relayd = consumer_find_relayd(stream->net_seq_idx);
-                       assert(relayd);
+                       if (!relayd) {
+                               /*
+                                * At this point, if the relayd object is not available for the
+                                * given stream, it is because the relayd is being cleaned up
+                                * so every stream associated with it (for a session id value)
+                                * are or will be marked for deletion hence no data pending.
+                                */
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_pending;
+                       }
 
-                       pthread_mutex_lock(&stream->lock);
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock);
                        } else {
-                               ret = relayd_data_available(&relayd->control_sock,
+                               ret = relayd_data_pending(&relayd->control_sock,
                                                stream->relayd_stream_id, stream->next_net_seq_num);
                        }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-                       pthread_mutex_unlock(&stream->lock);
-                       if (ret == 0) {
-                               goto data_not_available;
+                       if (ret == 1) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_pending;
                        }
                }
+               pthread_mutex_unlock(&stream->lock);
        }
 
        /*
@@ -2747,11 +2801,11 @@ int consumer_data_available(uint64_t id)
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
-       return 1;
+       return 0;
 
-data_not_available:
+data_not_pending:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
-       return 0;
+       return 1;
 }
This page took 0.025286 seconds and 4 git commands to generate.