From: David Goulet Date: Fri, 19 Oct 2012 16:48:45 +0000 (-0400) Subject: Relayd data available command support X-Git-Tag: v2.1.0-rc5~9 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=c8f59ee5fc11492ef472dc5cfd2fd2c4926b1787;hp=806e2684ce24d3772af37ee46c5f0500c7a0723f Relayd data available command support Add the data available support on the relayd side and use it if the tracing session is streaming on the network. Signed-off-by: David Goulet --- diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 60908bfed..0f81d556d 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1305,6 +1305,102 @@ end: return ret; } +/* + * Check for data availability for a given stream id from the session daemon. + */ +static +int relay_data_available(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + struct relay_session *session = cmd->session; + struct lttcomm_relayd_data_available msg; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + int ret; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + uint64_t last_net_seq_num, stream_id; + + DBG("Data available command received"); + + if (!session || session->version_check_done == 0) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL); + if (ret < sizeof(msg)) { + ERR("Relay didn't receive valid data_available struct size : %d", ret); + ret = -1; + goto end_no_session; + } + + stream_id = be64toh(msg.stream_id); + last_net_seq_num = be64toh(msg.last_net_seq_num); + + rcu_read_lock(); + lttng_ht_lookup(streams_ht, (void *)((unsigned long) stream_id), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG("Relay stream %" PRIu64 " not found", stream_id); + ret = -1; + goto end_unlock; + } + + stream = caa_container_of(node, struct 
relay_stream, stream_n); + assert(stream); + + DBG("Data available for stream id %" PRIu64 " prev_seq %" PRIu64 + " and last_seq %" PRIu64, stream_id, stream->prev_seq, + last_net_seq_num); + + if (stream->prev_seq == -1UL || stream->prev_seq <= last_net_seq_num) { + /* Data has in fact been written and is available */ + ret = 1; + } else { + /* Data still being streamed. */ + ret = 0; + } + +end_unlock: + rcu_read_unlock(); + + reply.ret_code = htobe32(ret); + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (ret < 0) { + ERR("Relay data available ret code failed"); + } + +end_no_session: + return ret; +} + +/* + * Wait for the control socket to reach a quiescent state. + * + * Note that for now, when receiving this command from the session daemon, this + * means that all subsequent commands or data received on the control socket + * have been handled. This is why we simply return OK here. + */ +static +int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd) +{ + int ret; + struct lttcomm_relayd_generic_reply reply; + + DBG("Checking quiescent state on control socket"); + + reply.ret_code = htobe32(LTTNG_OK); + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (ret < 0) { + ERR("Relay data available ret code failed"); + } + + return ret; +} + /* * relay_process_control: Process the commands received on the control socket */ @@ -1335,6 +1431,12 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_CLOSE_STREAM: ret = relay_close_stream(recv_hdr, cmd, streams_ht); break; + case RELAYD_DATA_AVAILABLE: + ret = relay_data_available(recv_hdr, cmd, streams_ht); + break; + case RELAYD_QUIESCENT_CONTROL: + ret = relay_quiescent_control(recv_hdr, cmd); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 
3a4577ab0..061ec1219 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -720,6 +720,7 @@ int consumer_is_data_available(unsigned int id, DBG3("Consumer data available for id %u", id); + /* Send command for each consumer */ cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, node.node) { /* Code flow error */ diff --git a/src/common/consumer.c b/src/common/consumer.c index 8d1a34025..8d897e5df 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1169,6 +1169,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1264,6 +1266,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( lttng_consumer_sync_trace_file(stream, orig_offset); end: + pthread_mutex_unlock(&stream->lock); /* Unlock only if ctrl socket used */ if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); @@ -1307,6 +1310,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* RCU lock for the relayd pointer */ rcu_read_lock(); + pthread_mutex_lock(&stream->lock); + /* Flag that the current stream if set for network streaming. 
*/ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1446,6 +1451,7 @@ splice_error: } end: + pthread_mutex_unlock(&stream->lock); if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } @@ -2451,6 +2457,7 @@ int consumer_data_available(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; + struct consumer_relayd_sock_pair *relayd; int (*data_available)(struct lttng_consumer_stream *); DBG("Consumer data available command on session id %" PRIu64, id); @@ -2470,10 +2477,13 @@ int consumer_data_available(uint64_t id) assert(0); } + rcu_read_lock(); + /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - cds_lfht_for_each_entry_duplicate(ht->ht, (long unsigned int) ht->hash_fct, + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct((void *)((unsigned long) id), 0x42UL), ht->match_fct, (void *)((unsigned long) id), &iter.iter, stream, node_session_id.node) { /* Check the stream for data. */ @@ -2481,9 +2491,26 @@ int consumer_data_available(uint64_t id) if (ret == 0) { goto data_not_available; } - } - /* TODO: Support to ask the relayd if the streams are remote */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + assert(relayd); + + pthread_mutex_lock(&stream->lock); + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->metadata_flag) { + ret = relayd_quiescent_control(&relayd->control_sock); + } else { + ret = relayd_data_available(&relayd->control_sock, + stream->relayd_stream_id, stream->next_net_seq_num); + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + if (ret == 0) { + goto data_not_available; + } + } + } /* * Finding _no_ node in the hash table means that the stream(s) have been @@ -2494,10 +2521,12 @@ int consumer_data_available(uint64_t id) /* Data is available to be read by a viewer. 
*/ pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); return 1; data_not_available: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); return 0; } diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 46413eda6..4e000fbd1 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -282,8 +282,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_DATA_AVAILABLE: { - rcu_read_unlock(); - return -ENOSYS; + int32_t ret; + uint64_t id = msg.u.data_available.session_id; + + DBG("Kernel consumer data available command for id %" PRIu64, id); + + ret = consumer_data_available(id); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send data available ret code"); + } + break; } default: goto end_nosignal; diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index cf3649397..785d3dc58 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -341,3 +341,95 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id, error: return ret; } + +/* + * Check for data availability for a given stream id. + * + * Return 0 if NOT available, 1 if so and a negative value on error. + */ +int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id, + uint64_t last_net_seq_num) +{ + int ret; + struct lttcomm_relayd_data_available msg; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. 
*/ + assert(sock); + + DBG("Relayd data available for stream id %" PRIu64, stream_id); + + msg.stream_id = htobe64(stream_id); + msg.last_net_seq_num = htobe64(last_net_seq_num); + + /* Send command */ + ret = send_command(sock, RELAYD_DATA_AVAILABLE, (void *) &msg, + sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Receive response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* A reply code at or above LTTNG_OK is logged as an error. */ + if (reply.ret_code >= LTTNG_OK) { + ret = -reply.ret_code; + ERR("Relayd data available replied error %d", ret); + } + + /* At this point, the ret code is either 1 or 0 */ + ret = reply.ret_code; + + DBG("Relayd data is %s available for stream id %" PRIu64, + ret == 1 ? "" : "NOT", stream_id); + +error: + return ret; +} + +/* + * Check on the relayd side for a quiescent state on the control socket. + */ +int relayd_quiescent_control(struct lttcomm_sock *sock) +{ + int ret; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(sock); + + DBG("Relayd checking quiescent control state"); + + /* Send command */ + ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0); + if (ret < 0) { + goto error; + } + + /* Receive response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Any reply other than LTTNG_OK is returned as a negated error code. 
*/ + if (reply.ret_code != LTTNG_OK) { + ret = -reply.ret_code; + ERR("Relayd quiescent control replied error %d", ret); + goto error; + } + + /* Control socket is quiescent */ + return 1; + +error: + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index c6834df61..29903cfae 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -39,5 +39,8 @@ int relayd_start_data(struct lttcomm_sock *sock); int relayd_send_metadata(struct lttcomm_sock *sock, size_t len); int relayd_send_data_hdr(struct lttcomm_sock *sock, struct lttcomm_relayd_data_hdr *hdr, size_t size); +int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id, + uint64_t last_net_seq_num); +int relayd_quiescent_control(struct lttcomm_sock *sock); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 5d4fddf5a..5be2328b9 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -107,4 +107,13 @@ struct lttcomm_relayd_close_stream { uint64_t last_net_seq_num; /* sequence number of last packet */ } __attribute__ ((__packed__)); +/* + * Used to test if for a given stream id the data is available on the relayd + * side for reading. 
+ */ +struct lttcomm_relayd_data_available { + uint64_t stream_id; + uint64_t last_net_seq_num; /* Sequence number of the last packet */ +} __attribute__ ((__packed__)); + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index f60c5d238..5884fb842 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -85,6 +85,8 @@ enum lttcomm_sessiond_command { RELAYD_VERSION, RELAYD_SEND_METADATA, RELAYD_CLOSE_STREAM, + RELAYD_DATA_AVAILABLE, + RELAYD_QUIESCENT_CONTROL, LTTNG_SET_FILTER, LTTNG_HEALTH_CHECK, LTTNG_DATA_AVAILABLE, diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 1bafeee07..c2ad0fdd8 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -537,6 +537,8 @@ int lttng_ustconsumer_data_available(struct lttng_consumer_stream *stream) assert(stream); + DBG("UST consumer checking data availability"); + /* * Try to lock the stream mutex. On failure, we know that the stream is * being used else where hence there is data still being extracted.