Relayd data available command support
authorDavid Goulet <dgoulet@efficios.com>
Fri, 19 Oct 2012 16:48:45 +0000 (12:48 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 19 Oct 2012 18:55:15 +0000 (14:55 -0400)
Add the data available support on the relayd side and use it if the
tracing session is streaming on the network.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/consumer.c
src/common/consumer.c
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 60908bfedc5aad85ed309b6be98d3560e831b18e..0f81d556dd4df73a910471756b75cf6567581a36 100644 (file)
@@ -1305,6 +1305,102 @@ end:
        return ret;
 }
 
        return ret;
 }
 
+/*
+ * Check for data availability for a given stream id from the session daemon.
+ */
+static
+int relay_data_available(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+       struct relay_session *session = cmd->session;
+       struct lttcomm_relayd_data_available msg;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+       int ret;
+       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_iter iter;
+       uint64_t last_net_seq_num, stream_id;
+
+       DBG("Data available command received");
+
+       if (!session || session->version_check_done == 0) {
+               ERR("Trying to check for data before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL);
+       if (ret < sizeof(msg)) {
+               ERR("Relay didn't receive valid data_available struct size : %d", ret);
+               ret = -1;
+               goto end_no_session;
+       }
+
+       stream_id = be64toh(msg.stream_id);
+       last_net_seq_num = be64toh(msg.last_net_seq_num);
+
+       rcu_read_lock();
+       lttng_ht_lookup(streams_ht, (void *)((unsigned long) stream_id), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node == NULL) {
+               DBG("Relay stream %" PRIu64 " not found", stream_id);
+               ret = -1;
+               goto end_unlock;
+       }
+
+       stream = caa_container_of(node, struct relay_stream, stream_n);
+       assert(stream);
+
+       DBG("Data available for stream id %" PRIu64 " prev_seq %" PRIu64
+                       " and last_seq %" PRIu64, stream_id, stream->prev_seq,
+                       last_net_seq_num);
+
+       if (stream->prev_seq == -1UL || stream->prev_seq <= last_net_seq_num) {
+               /* Data has in fact been written and is available */
+               ret = 1;
+       } else {
+               /* Data still being streamed. */
+               ret = 0;
+       }
+
+end_unlock:
+       rcu_read_unlock();
+
+       reply.ret_code = htobe32(ret);
+       ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+       if (ret < 0) {
+               ERR("Relay data available ret code failed");
+       }
+
+end_no_session:
+       return ret;
+}
+
+/*
+ * Wait for the control socket to reach a quiescent state.
+ *
+ * Note that for now, when receiving this command from the session daemon, this
+ * means that every subsequent commands or data received on the control socket
+ * has been handled. So, this is why we simply return OK here.
+ */
+static
+int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_command *cmd)
+{
+       int ret;
+       struct lttcomm_relayd_generic_reply reply;
+
+       DBG("Checking quiescent state on control socket");
+
+       reply.ret_code = htobe32(LTTNG_OK);
+       ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+       if (ret < 0) {
+               ERR("Relay data available ret code failed");
+       }
+
+       return ret;
+}
+
 /*
  * relay_process_control: Process the commands received on the control socket
  */
 /*
  * relay_process_control: Process the commands received on the control socket
  */
@@ -1335,6 +1431,12 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_CLOSE_STREAM:
                ret = relay_close_stream(recv_hdr, cmd, streams_ht);
                break;
        case RELAYD_CLOSE_STREAM:
                ret = relay_close_stream(recv_hdr, cmd, streams_ht);
                break;
+       case RELAYD_DATA_AVAILABLE:
+               ret = relay_data_available(recv_hdr, cmd, streams_ht);
+               break;
+       case RELAYD_QUIESCENT_CONTROL:
+               ret = relay_quiescent_control(recv_hdr, cmd);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
index 3a4577ab0bb36dfa98aa655447526bfdf8717131..061ec1219c90ad5ef3bd0ea4e217424927864358 100644 (file)
@@ -720,6 +720,7 @@ int consumer_is_data_available(unsigned int id,
 
        DBG3("Consumer data available for id %u", id);
 
 
        DBG3("Consumer data available for id %u", id);
 
+       /* Send command for each consumer */
        cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
                        node.node) {
                /* Code flow error */
        cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
                        node.node) {
                /* Code flow error */
index 8d1a34025687da37b7c3c1555d80441c93d8761d..8d897e5df1e20e20c67ddbc6ed8f99af7e7de2aa 100644 (file)
@@ -1169,6 +1169,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1264,6 +1266,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
 end:
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
 end:
+       pthread_mutex_unlock(&stream->lock);
        /* Unlock only if ctrl socket used */
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        /* Unlock only if ctrl socket used */
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
@@ -1307,6 +1310,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
+       pthread_mutex_lock(&stream->lock);
+
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != -1) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -1446,6 +1451,7 @@ splice_error:
        }
 
 end:
        }
 
 end:
+       pthread_mutex_unlock(&stream->lock);
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
        if (relayd && stream->metadata_flag) {
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
@@ -2451,6 +2457,7 @@ int consumer_data_available(uint64_t id)
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht;
        struct lttng_consumer_stream *stream;
+       struct consumer_relayd_sock_pair *relayd;
        int (*data_available)(struct lttng_consumer_stream *);
 
        DBG("Consumer data available command on session id %" PRIu64, id);
        int (*data_available)(struct lttng_consumer_stream *);
 
        DBG("Consumer data available command on session id %" PRIu64, id);
@@ -2470,10 +2477,13 @@ int consumer_data_available(uint64_t id)
                assert(0);
        }
 
                assert(0);
        }
 
+       rcu_read_lock();
+
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
        /* Ease our life a bit */
        ht = consumer_data.stream_list_ht;
 
-       cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct,
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct((void *)((unsigned long) id), 0x42UL),
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* Check the stream for data. */
                        ht->match_fct, (void *)((unsigned long) id),
                        &iter.iter, stream, node_session_id.node) {
                /* Check the stream for data. */
@@ -2481,9 +2491,26 @@ int consumer_data_available(uint64_t id)
                if (ret == 0) {
                        goto data_not_available;
                }
                if (ret == 0) {
                        goto data_not_available;
                }
-       }
 
 
-       /* TODO: Support to ask the relayd if the streams are remote */
+               if (stream->net_seq_idx != -1) {
+                       relayd = consumer_find_relayd(stream->net_seq_idx);
+                       assert(relayd);
+
+                       pthread_mutex_lock(&stream->lock);
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       if (stream->metadata_flag) {
+                               ret = relayd_quiescent_control(&relayd->control_sock);
+                       } else {
+                               ret = relayd_data_available(&relayd->control_sock,
+                                               stream->relayd_stream_id, stream->next_net_seq_num);
+                       }
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       pthread_mutex_unlock(&stream->lock);
+                       if (ret == 0) {
+                               goto data_not_available;
+                       }
+               }
+       }
 
        /*
         * Finding _no_ node in the hash table means that the stream(s) have been
 
        /*
         * Finding _no_ node in the hash table means that the stream(s) have been
@@ -2494,10 +2521,12 @@ int consumer_data_available(uint64_t id)
 
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
 
        /* Data is available to be read by a viewer. */
        pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
        return 1;
 
 data_not_available:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
        return 1;
 
 data_not_available:
        /* Data is still being extracted from buffers. */
        pthread_mutex_unlock(&consumer_data.lock);
+       rcu_read_unlock();
        return 0;
 }
        return 0;
 }
index 46413eda6e710d9e2f934df9e49336a5fd6fcef0..4e000fbd167e294c6b9139b6b7bdd2cf0098d6f1 100644 (file)
@@ -282,8 +282,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_DATA_AVAILABLE:
        {
        }
        case LTTNG_CONSUMER_DATA_AVAILABLE:
        {
-               rcu_read_unlock();
-               return -ENOSYS;
+               int32_t ret;
+               uint64_t id = msg.u.data_available.session_id;
+
+               DBG("Kernel consumer data available command for id %" PRIu64, id);
+
+               ret = consumer_data_available(id);
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               if (ret < 0) {
+                       PERROR("send data available ret code");
+               }
+               break;
        }
        default:
                goto end_nosignal;
        }
        default:
                goto end_nosignal;
index cf3649397425840b9c0c0e0ae07d22073255f0fa..785d3dc584bd66d2d825a0df09f9e63d0e49cc02 100644 (file)
@@ -341,3 +341,95 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
 error:
        return ret;
 }
 error:
        return ret;
 }
+
+/*
+ * Check for data availability for a given stream id.
+ *
+ * Return 0 if NOT available, 1 if so and a negative value on error.
+ */
+int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id,
+               uint64_t last_net_seq_num)
+{
+       int ret;
+       struct lttcomm_relayd_data_available msg;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(sock);
+
+       DBG("Relayd data available for stream id %" PRIu64, stream_id);
+
+       msg.stream_id = htobe64(stream_id);
+       msg.last_net_seq_num = htobe64(last_net_seq_num);
+
+       /* Send command */
+       ret = send_command(sock, RELAYD_DATA_AVAILABLE, (void *) &msg,
+                       sizeof(msg), 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Recevie response */
+       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code >= LTTNG_OK) {
+               ret = -reply.ret_code;
+               ERR("Relayd data available replied error %d", ret);
+       }
+
+       /* At this point, the ret code is either 1 or 0 */
+       ret = reply.ret_code;
+
+       DBG("Relayd data is %s available for stream id %" PRIu64,
+                       ret == 1 ? "" : "NOT", stream_id);
+
+error:
+       return ret;
+}
+
+/*
+ * Check on the relayd side for a quiescent state on the control socket.
+ */
+int relayd_quiescent_control(struct lttcomm_sock *sock)
+{
+       int ret;
+       struct lttcomm_relayd_generic_reply reply;
+
+       /* Code flow error. Safety net. */
+       assert(sock);
+
+       DBG("Relayd checking quiescent control state");
+
+       /* Send command */
+       ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Recevie response */
+       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -reply.ret_code;
+               ERR("Relayd quiescent control replied error %d", ret);
+               goto error;
+       }
+
+       /* Control socket is quiescent */
+       return 1;
+
+error:
+       return ret;
+}
index c6834df6114491f78c4e4c7960c54090d44b2831..29903cfae679e6bdb76017b4f23e7e2cc41d17a6 100644 (file)
@@ -39,5 +39,8 @@ int relayd_start_data(struct lttcomm_sock *sock);
 int relayd_send_metadata(struct lttcomm_sock *sock, size_t len);
 int relayd_send_data_hdr(struct lttcomm_sock *sock,
                struct lttcomm_relayd_data_hdr *hdr, size_t size);
 int relayd_send_metadata(struct lttcomm_sock *sock, size_t len);
 int relayd_send_data_hdr(struct lttcomm_sock *sock,
                struct lttcomm_relayd_data_hdr *hdr, size_t size);
+int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id,
+               uint64_t last_net_seq_num);
+int relayd_quiescent_control(struct lttcomm_sock *sock);
 
 #endif /* _RELAYD_H */
 
 #endif /* _RELAYD_H */
index 5d4fddf5a50ff06a4cabaad185c0a3642173decf..5be2328b9794676462f1c0cd96a588a0e97fc406 100644 (file)
@@ -107,4 +107,13 @@ struct lttcomm_relayd_close_stream {
        uint64_t last_net_seq_num;      /* sequence number of last packet */
 } __attribute__ ((__packed__));
 
        uint64_t last_net_seq_num;      /* sequence number of last packet */
 } __attribute__ ((__packed__));
 
+/*
+ * Used to test if for a given stream id the data is available on the relayd
+ * side for reading.
+ */
+struct lttcomm_relayd_data_available {
+       uint64_t stream_id;
+       uint64_t last_net_seq_num; /* Sequence number of the last packet */
+} __attribute__ ((__packed__));
+
 #endif /* _RELAYD_COMM */
 #endif /* _RELAYD_COMM */
index f60c5d238a364037fa21a75b42ac128910ce3d61..5884fb8423006c3b1ad2474d2689625b5e6b7018 100644 (file)
@@ -85,6 +85,8 @@ enum lttcomm_sessiond_command {
        RELAYD_VERSION,
        RELAYD_SEND_METADATA,
        RELAYD_CLOSE_STREAM,
        RELAYD_VERSION,
        RELAYD_SEND_METADATA,
        RELAYD_CLOSE_STREAM,
+       RELAYD_DATA_AVAILABLE,
+       RELAYD_QUIESCENT_CONTROL,
        LTTNG_SET_FILTER,
        LTTNG_HEALTH_CHECK,
        LTTNG_DATA_AVAILABLE,
        LTTNG_SET_FILTER,
        LTTNG_HEALTH_CHECK,
        LTTNG_DATA_AVAILABLE,
index 1bafeee07daccb79ef8f5377256ec6ed78257018..c2ad0fdd8cef613f170f4c43673d5d356fde8f93 100644 (file)
@@ -537,6 +537,8 @@ int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
 
        assert(stream);
 
+       DBG("UST consumer checking data availability");
+
        /*
         * Try to lock the stream mutex. On failure, we know that the stream is
         * being used else where hence there is data still being extracted.
        /*
         * Try to lock the stream mutex. On failure, we know that the stream is
         * being used else where hence there is data still being extracted.
This page took 0.046002 seconds and 4 git commands to generate.