Fix data pending for inflight streaming
authorDavid Goulet <dgoulet@efficios.com>
Thu, 13 Dec 2012 01:16:33 +0000 (20:16 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Thu, 13 Dec 2012 17:54:30 +0000 (12:54 -0500)
The consumer_data_pending() function call had a bad label naming. The
goto label data_not_pending was actually going to the return value of
pending data (1). So, this patch fixes that by renaming the label to the
right meaning.

Add a missing destroy of the relayd session id mapping hash table.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c
src/common/consumer.c
src/common/consumer.h
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h

index 2a442b0952cbb4b14870434a78f0f0aa3253c105..edd32d6aba3770fb23cc4d720506eabde4062c5c 100644 (file)
@@ -40,6 +40,11 @@ enum connection_type {
  * Represents a session for the relay point of view
  */
 struct relay_session {
+       /*
+        * This session id is used to identify a set of stream to a tracing session
+        * but also make sure we have a unique session id associated with a session
+        * daemon which can provide multiple data source.
+        */
        uint64_t id;
        struct lttcomm_sock *sock;
 };
@@ -58,6 +63,8 @@ struct relay_stream {
        /* Information telling us when to close the stream  */
        unsigned int close_flag:1;
        uint64_t last_net_seq_num;
+       /* Indicate if the stream was initialized for a data pending command. */
+       unsigned int data_pending_check_done:1;
 };
 
 /*
index 74e785e2f52d62e0afd16fb46db0fbe37e9c7151..71753c03d4070888236ebcbca9ff59e7ed9a7013 100644 (file)
@@ -455,6 +455,13 @@ int close_stream_check(struct relay_stream *stream)
 {
 
        if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
+               /*
+                * We are about to close the stream so set the data pending flag to 1
+                * which will make the end data pending command skip the stream which
+                * is now closed and ready. Note that after proceeding to a file close,
+                * the written file is ready for reading.
+                */
+               stream->data_pending_check_done = 1;
                return 1;
        }
        return 0;
@@ -1426,6 +1433,9 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
                ret = 1;
        }
 
+       /* Pending check is now done. */
+       stream->data_pending_check_done = 1;
+
 end_unlock:
        rcu_read_unlock();
 
@@ -1464,6 +1474,141 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
        return ret;
 }
 
+/*
+ * Initialize a data pending command. This means that a client is about to ask
+ * for data pending for each stream he/she holds. Simply iterate over all
+ * streams of a session and set the data_pending_check_done flag.
+ *
+ * This command returns to the client a LTTNG_OK code.
+ */
+static
+int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttcomm_relayd_begin_data_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+       uint64_t session_id;
+
+       assert(recv_hdr);
+       assert(cmd);
+       assert(streams_ht);
+
+       DBG("Init streams for data pending");
+
+       if (!cmd->session || cmd->version_check_done == 0) {
+               ERR("Trying to check for data before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+       if (ret < sizeof(msg)) {
+               ERR("Relay didn't receive valid begin data_pending struct size: %d",
+                               ret);
+               ret = -1;
+               goto end_no_session;
+       }
+
+       session_id = be64toh(msg.session_id);
+
+       /*
+        * Iterate over all streams to set the begin data pending flag. For now, the
+        * streams are indexed by stream handle so we have to iterate over all
+        * streams to find the one associated with the right session_id.
+        */
+       rcu_read_lock();
+       cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+               if (stream->session->id == session_id) {
+                       stream->data_pending_check_done = 0;
+                       DBG("Set begin data pending flag to stream %" PRIu64,
+                                       stream->stream_handle);
+               }
+       }
+       rcu_read_unlock();
+
+       /* All good, send back reply. */
+       reply.ret_code = htobe32(LTTNG_OK);
+
+       ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+       if (ret < 0) {
+               ERR("Relay begin data pending send reply failed");
+       }
+
+end_no_session:
+       return ret;
+}
+
+/*
+ * End data pending command. This will check, for a given session id, if each
+ * stream associated with it has its data_pending_check_done flag set. If not,
+ * this means that the client lost track of the stream but the data is still
+ * being streamed on our side. In this case, we inform the client that data is
+ * inflight.
+ *
+ * Return to the client if there is data in flight or not with a ret_code.
+ */
+static
+int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttcomm_relayd_end_data_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+       uint64_t session_id;
+       uint32_t is_data_inflight = 0;
+
+       assert(recv_hdr);
+       assert(cmd);
+       assert(streams_ht);
+
+       DBG("End data pending command");
+
+       if (!cmd->session || cmd->version_check_done == 0) {
+               ERR("Trying to check for data before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
+       if (ret < sizeof(msg)) {
+               ERR("Relay didn't receive valid end data_pending struct size: %d",
+                               ret);
+               ret = -1;
+               goto end_no_session;
+       }
+
+       session_id = be64toh(msg.session_id);
+
+       /* Iterate over all streams to see if the begin data pending flag is set. */
+       rcu_read_lock();
+       cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
+               if (stream->session->id == session_id &&
+                               !stream->data_pending_check_done) {
+                       is_data_inflight = 1;
+                       DBG("Data is still in flight for stream %" PRIu64,
+                                       stream->stream_handle);
+                       break;
+               }
+       }
+       rcu_read_unlock();
+
+       /* All good, send back reply. */
+       reply.ret_code = htobe32(is_data_inflight);
+
+       ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+       if (ret < 0) {
+               ERR("Relay end data pending send reply failed");
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * relay_process_control: Process the commands received on the control socket
  */
@@ -1498,6 +1643,12 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_QUIESCENT_CONTROL:
                ret = relay_quiescent_control(recv_hdr, cmd);
                break;
+       case RELAYD_BEGIN_DATA_PENDING:
+               ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);
+               break;
+       case RELAYD_END_DATA_PENDING:
+               ret = relay_end_data_pending(recv_hdr, cmd, streams_ht);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
index b5e212f27d61c5f08f53ef32fb19ac91c1e55917..3ecb72e21e52af045b3fdb956e0ec614a4c570dd 100644 (file)
@@ -65,7 +65,8 @@ static struct lttng_ht *data_ht;
 
 /*
  * This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond_id.
+ * and the relayd session id. Element of the ht are indexed by sessiond session
+ * id.
  *
  * Node can be added when a relayd communication is opened in the sessiond
  * thread.
@@ -238,19 +239,21 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 
        DBG("Consumer destroy and close relayd socket pair");
 
+       /* Loockup for a relayd node in the session id map hash table. */
        lttng_ht_lookup(relayd_session_id_ht,
-                       (void *)((unsigned long) relayd->session_id), &iter);
+                       (void *)((unsigned long) relayd->sessiond_session_id), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
                /* We assume the relayd is being or is destroyed */
                return;
        }
 
-       ret = lttng_ht_del(relayd_session_id_ht, &iter);
-       if (ret != 0) {
-               /* We assume the relayd is being or is destroyed */
-               return;
-       }
+       /*
+        * Try to delete it from the relayd session id ht. The return value is of
+        * no importance since either way we are going to try to delete the relayd
+        * from the global relayd_ht.
+        */
+       lttng_ht_del(relayd_session_id_ht, &iter);
 
        iter.iter.node = &relayd->node.node;
        ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
@@ -280,6 +283,8 @@ static void cleanup_relayd_ht(void)
        }
 
        lttng_ht_destroy(consumer_data.relayd_ht);
+       /* The destroy_relayd call makes sure that this ht is empty here. */
+       lttng_ht_destroy(relayd_session_id_ht);
 
        rcu_read_unlock();
 }
@@ -2732,6 +2737,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto error;
                }
+               relayd->sessiond_session_id = (uint64_t) sessiond_id;
        }
 
        /* Poll on consumer socket. */
@@ -2781,7 +2787,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                 * grab the socket lock since the relayd object is not yet visible.
                 */
                ret = relayd_create_session(&relayd->control_sock,
-                               &relayd->session_id);
+                               &relayd->relayd_session_id);
                if (ret < 0) {
                        goto error;
                }
@@ -2793,10 +2799,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                        goto error;
                }
 
-               relayd_id_node->relayd_id = relayd->session_id;
+               relayd_id_node->relayd_id = relayd->relayd_session_id;
                relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
 
-               /* Indexed by session id of the session daemon. */
+               /* Indexed by session id of the sessiond. */
                lttng_ht_node_init_ulong(&relayd_id_node->node,
                                relayd_id_node->sessiond_id);
                rcu_read_lock();
@@ -2878,6 +2884,42 @@ end:
        return ret;
 }
 
+/*
+ * Search for a relayd associated to the session id and return the reference.
+ *
+ * A rcu read side lock MUST be acquire before calling this function and locked
+ * until the relayd object is no longer necessary.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+       struct consumer_relayd_session_id *session_id_map;
+
+       /* Get the session id map. */
+       lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node == NULL) {
+               goto end;
+       }
+
+       session_id_map = caa_container_of(node, struct consumer_relayd_session_id,
+                       node);
+
+       /* Iterate over all relayd since they are indexed by net_seq_idx. */
+       cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+                       node.node) {
+               if (relayd->relayd_session_id == session_id_map->relayd_id) {
+                       /* Found the relayd. There can be only one per id. */
+                       break;
+               }
+       }
+
+end:
+       return relayd;
+}
+
 /*
  * Check if for a given session id there is still data needed to be extract
  * from the buffers.
@@ -2890,7 +2932,7 @@ int consumer_data_pending(uint64_t id)
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
-       struct consumer_relayd_sock_pair *relayd;
+       struct consumer_relayd_sock_pair *relayd = NULL;
        int (*data_pending)(struct lttng_consumer_stream *);
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
@@ -2914,6 +2956,19 @@ int consumer_data_pending(uint64_t id)
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
+       relayd = find_relayd_by_session_id(id);
+       if (relayd) {
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_begin_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       /* Communication error thus the relayd so no data pending. */
+                       goto data_not_pending;
+               }
+       }
+
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
                        ht->match_fct, (void *)((unsigned long) id),
@@ -2921,7 +2976,7 @@ int consumer_data_pending(uint64_t id)
                /* If this call fails, the stream is being used hence data pending. */
                ret = stream_try_lock(stream);
                if (!ret) {
-                       goto data_not_pending;
+                       goto data_pending;
                }
 
                /*
@@ -2936,24 +2991,12 @@ int consumer_data_pending(uint64_t id)
                        ret = data_pending(stream);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_pending;
+                               goto data_pending;
                        }
                }
 
                /* Relayd check */
-               if (stream->net_seq_idx != -1) {
-                       relayd = consumer_find_relayd(stream->net_seq_idx);
-                       if (!relayd) {
-                               /*
-                                * At this point, if the relayd object is not available for the
-                                * given stream, it is because the relayd is being cleaned up
-                                * so every stream associated with it (for a session id value)
-                                * are or will be marked for deletion hence no data pending.
-                                */
-                               pthread_mutex_unlock(&stream->lock);
-                               goto data_not_pending;
-                       }
-
+               if (relayd) {
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock);
@@ -2964,25 +3007,39 @@ int consumer_data_pending(uint64_t id)
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
-                               goto data_not_pending;
+                               goto data_pending;
                        }
                }
                pthread_mutex_unlock(&stream->lock);
        }
 
+       if (relayd) {
+               unsigned int is_data_inflight = 0;
+
+               /* Send init command for data pending. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_end_data_pending(&relayd->control_sock,
+                               relayd->relayd_session_id, &is_data_inflight);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0 || !is_data_inflight) {
+                       /* On error or if NO data inflight, no data is pending. */
+                       goto data_not_pending;
+               }
+       }
+
        /*
-        * Finding _no_ node in the hash table means that the stream(s) have been
-        * removed thus data is guaranteed to be available for analysis from the
-        * trace files. This is *only* true for local consumer and not network
-        * streaming.
+        * Finding _no_ node in the hash table and no inflight data means that the
+        * stream(s) have been removed thus data is guaranteed to be available for
+        * analysis from the trace files.
         */
 
+data_not_pending:
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
        return 0;
 
-data_not_pending:
+data_pending:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
        rcu_read_unlock();
index e9927c7d46556150eab2520af8fdf01f72c77e6e..7f0d0cc2134596ecf9a4debe2303add43579016c 100644 (file)
@@ -187,8 +187,9 @@ struct consumer_relayd_sock_pair {
        struct lttcomm_sock data_sock;
        struct lttng_ht_node_ulong node;
 
-       /* Session id on the relayd side for the sockets. */
-       uint64_t session_id;
+       /* Session id on both sides for the sockets. */
+       uint64_t relayd_session_id;
+       uint64_t sessiond_session_id;
 };
 
 /*
index 56ca98223a383bd6d9a36b288decea7c677505ac..43f91082385a951682f6ebaf6f839abf3e123a33 100644 (file)
@@ -503,3 +503,94 @@ int relayd_quiescent_control(struct lttcomm_sock *sock)
 error:
        return ret;
 }
+
+/*
+ * Begin a data pending command for a specific session id.
+ */
+int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id)
+{
+       int ret;
+       struct lttcomm_relayd_begin_data_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(sock);
+
+       DBG("Relayd begin data pending");
+
+       msg.session_id = htobe64(id);
+
+       /* Send command */
+       ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Recevie response */
+       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -reply.ret_code;
+               ERR("Relayd begin data pending replied error %d", ret);
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return ret;
+}
+
+/*
+ * End a data pending command for a specific session id.
+ *
+ * Return 0 on success and set is_data_inflight to 0 if no data is being
+ * streamed or 1 if it is the case.
+ */
+int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+               unsigned int *is_data_inflight)
+{
+       int ret;
+       struct lttcomm_relayd_end_data_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(sock);
+
+       DBG("Relayd end data pending");
+
+       msg.session_id = htobe64(id);
+
+       /* Send command */
+       ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Recevie response */
+       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+       if (reply.ret_code < 0) {
+               ret = reply.ret_code;
+               goto error;
+       }
+
+       *is_data_inflight = reply.ret_code;
+
+       DBG("Relayd end data pending is data inflight: %d", reply.ret_code);
+
+       return 0;
+
+error:
+       return ret;
+}
index 9d65c3b8a2e71182c3400997530d78a829197d9d..2a46cd3eb117f3b4db23f308260953dfe97943e0 100644 (file)
@@ -39,5 +39,8 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
 int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
                uint64_t last_net_seq_num);
 int relayd_quiescent_control(struct lttcomm_sock *sock);
+int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id);
+int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+               unsigned int *is_data_inflight);
 
 #endif /* _RELAYD_H */
index 6cd9a21f4ca922b81cd8df4dd1ea41fb276a3321..525ec1a5c0271e1002ffb65be3b55bdf39657b8e 100644 (file)
@@ -125,4 +125,12 @@ struct lttcomm_relayd_data_pending {
        uint64_t last_net_seq_num; /* Sequence number of the last packet */
 } __attribute__ ((__packed__));
 
+struct lttcomm_relayd_begin_data_pending {
+       uint64_t session_id;
+} __attribute__ ((__packed__));
+
+struct lttcomm_relayd_end_data_pending {
+       uint64_t session_id;
+} __attribute__ ((__packed__));
+
 #endif /* _RELAYD_COMM */
index 2cd1d5407ea06b09d9fbbf685e973e0a08e42b9d..3842558e5a4b99bd524d50bdec37937e64aa7f0b 100644 (file)
@@ -91,6 +91,8 @@ enum lttcomm_sessiond_command {
        LTTNG_ENABLE_EVENT_WITH_FILTER      = 32,
        LTTNG_HEALTH_CHECK                  = 33,
        LTTNG_DATA_PENDING                  = 34,
+       RELAYD_BEGIN_DATA_PENDING           = 35,
+       RELAYD_END_DATA_PENDING             = 36,
 };
 
 /*
This page took 0.050285 seconds and 4 git commands to generate.