Fix data pending for inflight streaming
[lttng-tools.git] / src / common / consumer.c
index b5e212f27d61c5f08f53ef32fb19ac91c1e55917..3ecb72e21e52af045b3fdb956e0ec614a4c570dd 100644 (file)
@@ -65,7 +65,8 @@ static struct lttng_ht *data_ht;
 
 /*
  * This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond_id.
+ * and the relayd session id. Elements of the ht are indexed by sessiond session
+ * id.
  *
  * Node can be added when a relayd communication is opened in the sessiond
  * thread.
@@ -238,19 +239,21 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 
        DBG("Consumer destroy and close relayd socket pair");
 
+       /* Lookup for a relayd node in the session id map hash table. */
        lttng_ht_lookup(relayd_session_id_ht,
-                       (void *)((unsigned long) relayd->session_id), &iter);
+                       (void *)((unsigned long) relayd->sessiond_session_id), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
                /* We assume the relayd is being or is destroyed */
                return;
        }
 
-       ret = lttng_ht_del(relayd_session_id_ht, &iter);
-       if (ret != 0) {
-               /* We assume the relayd is being or is destroyed */
-               return;
-       }
+       /*
+        * Try to delete it from the relayd session id ht. The return value is of
+        * no importance since either way we are going to try to delete the relayd
+        * from the global relayd_ht.
+        */
+       lttng_ht_del(relayd_session_id_ht, &iter);
 
        iter.iter.node = &relayd->node.node;
        ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
@@ -280,6 +283,8 @@ static void cleanup_relayd_ht(void)
        }
 
        lttng_ht_destroy(consumer_data.relayd_ht);
+       /* The destroy_relayd call makes sure that this ht is empty here. */
+       lttng_ht_destroy(relayd_session_id_ht);
 
        rcu_read_unlock();
 }
@@ -2732,6 +2737,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto error;
                }
+               relayd->sessiond_session_id = (uint64_t) sessiond_id;
        }
 
        /* Poll on consumer socket. */
@@ -2781,7 +2787,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                 * grab the socket lock since the relayd object is not yet visible.
                 */
                ret = relayd_create_session(&relayd->control_sock,
-                               &relayd->session_id);
+                               &relayd->relayd_session_id);
                if (ret < 0) {
                        goto error;
                }
@@ -2793,10 +2799,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                        goto error;
                }
 
-               relayd_id_node->relayd_id = relayd->session_id;
+               relayd_id_node->relayd_id = relayd->relayd_session_id;
                relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
 
-               /* Indexed by session id of the session daemon. */
+               /* Indexed by session id of the sessiond. */
                lttng_ht_node_init_ulong(&relayd_id_node->node,
                                relayd_id_node->sessiond_id);
                rcu_read_lock();
@@ -2878,6 +2884,42 @@ end:
        return ret;
 }
 
+/*
+ * Search for the relayd associated to the sessiond session id and return a
+ * reference to it, or NULL when no relayd matches that session id.
+ * A rcu read side lock MUST be acquired before calling this function and held
+ * until the relayd object is no longer necessary.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct consumer_relayd_sock_pair *relayd;
+       struct consumer_relayd_session_id *session_id_map;
+
+       /* Get the session id map. */
+       lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node == NULL) {
+               return NULL;
+       }
+
+       session_id_map = caa_container_of(node, struct consumer_relayd_session_id,
+                       node);
+
+       /* Iterate over all relayd since they are indexed by net_seq_idx. */
+       cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+                       node.node) {
+               if (relayd->relayd_session_id == session_id_map->relayd_id) {
+                       /* Found the relayd. There can be only one per id. */
+                       return relayd;
+               }
+       }
+
+       /* No match: the cursor is not a valid relayd after a full iteration. */
+       return NULL;
+}
+
 /*
  * Check if for a given session id there is still data needed to be extract
  * from the buffers.
@@ -2890,7 +2932,7 @@ int consumer_data_pending(uint64_t id)
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
-       struct consumer_relayd_sock_pair *relayd;
+       struct consumer_relayd_sock_pair *relayd = NULL;
        int (*data_pending)(struct lttng_consumer_stream *);
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
@@ -2914,6 +2956,19 @@ int consumer_data_pending(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
+       relayd = find_relayd_by_session_id(id);
+       if (relayd) {
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_begin_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       /* Communication error with the relayd, thus no data pending. */
+                       goto data_not_pending;
+               }
+       }
+
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
                        ht->match_fct, (void *)((unsigned long) id),
@@ -2921,7 +2976,7 @@ int consumer_data_pending(uint64_t id)
                /* If this call fails, the stream is being used hence data pending. */
                ret = stream_try_lock(stream);
                if (!ret) {
-                       goto data_not_pending;
+                       goto data_pending;
                }
 
                /*
@@ -2936,24 +2991,12 @@ int consumer_data_pending(uint64_t id)
                        ret = data_pending(stream);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_pending;
+                               goto data_pending;
                        }
                }
 
                /* Relayd check */
-               if (stream->net_seq_idx != -1) {
-                       relayd = consumer_find_relayd(stream->net_seq_idx);
-                       if (!relayd) {
-                               /*
-                                * At this point, if the relayd object is not available for the
-                                * given stream, it is because the relayd is being cleaned up
-                                * so every stream associated with it (for a session id value)
-                                * are or will be marked for deletion hence no data pending.
-                                */
-                               pthread_mutex_unlock(&stream->lock);
-                               goto data_not_pending;
-                       }
-
+               if (relayd) {
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock);
@@ -2964,25 +3007,39 @@ int consumer_data_pending(uint64_t id)
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_pending;
+                               goto data_pending;
                        }
                }
                pthread_mutex_unlock(&stream->lock);
        }
 
+       if (relayd) {
+               unsigned int is_data_inflight = 0;
+
+               /* Send end command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_end_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id, &is_data_inflight);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0 || !is_data_inflight) {
+                       /* On error or if NO data inflight, no data is pending. */
+                       goto data_not_pending;
+               }
+       }
+
        /*
-        * Finding _no_ node in the hash table means that the stream(s) have been
-        * removed thus data is guaranteed to be available for analysis from the
-        * trace files. This is *only* true for local consumer and not network
-        * streaming.
+        * Finding _no_ node in the hash table and no inflight data means that the
+        * stream(s) have been removed thus data is guaranteed to be available for
+        * analysis from the trace files.
         */
 
+data_not_pending:
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
        return 0;
 
-data_not_pending:
+data_pending:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
This page took 0.025898 seconds and 4 git commands to generate.