From: David Goulet Date: Thu, 13 Dec 2012 01:16:33 +0000 (-0500) Subject: Fix data pending for inflight streaming X-Git-Tag: v2.1.0~60 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=f7079f6790ccfb78ec7115ccb5b1706f5c18ebfe Fix data pending for inflight streaming The consumer_data_pending() function call had a bad label naming. The goto label data_not_pending was actually going to the return value of pending data (1). So, this patch fixes that by renaming the label to the right meaning. Add a missing destroy of the relayd session id mapping hash table. Acked-by: Mathieu Desnoyers Signed-off-by: David Goulet --- diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index 2a442b095..edd32d6ab 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -40,6 +40,11 @@ enum connection_type { * Represents a session for the relay point of view */ struct relay_session { + /* + * This session id is used to identify a set of stream to a tracing session + * but also make sure we have a unique session id associated with a session + * daemon which can provide multiple data source. + */ uint64_t id; struct lttcomm_sock *sock; }; @@ -58,6 +63,8 @@ struct relay_stream { /* Information telling us when to close the stream */ unsigned int close_flag:1; uint64_t last_net_seq_num; + /* Indicate if the stream was initialized for a data pending command. */ + unsigned int data_pending_check_done:1; }; /* diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 74e785e2f..71753c03d 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -455,6 +455,13 @@ int close_stream_check(struct relay_stream *stream) { if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) { + /* + * We are about to close the stream so set the data pending flag to 1 + * which will make the end data pending command skip the stream which + * is now closed and ready. Note that after proceeding to a file close, + * the written file is ready for reading. + */ + stream->data_pending_check_done = 1; return 1; } return 0; @@ -1426,6 +1433,9 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, ret = 1; } + /* Pending check is now done. */ + stream->data_pending_check_done = 1; + end_unlock: rcu_read_unlock(); @@ -1464,6 +1474,141 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, return ret; } +/* + * Initialize a data pending command. This means that a client is about to ask + * for data pending for each stream he/she holds. Simply iterate over all + * streams of a session and set the data_pending_check_done flag. + * + * This command returns to the client a LTTNG_OK code. + */ +static +int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + int ret; + struct lttng_ht_iter iter; + struct lttcomm_relayd_begin_data_pending msg; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + uint64_t session_id; + + assert(recv_hdr); + assert(cmd); + assert(streams_ht); + + DBG("Init streams for data pending"); + + if (!cmd->session || cmd->version_check_done == 0) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); + if (ret < sizeof(msg)) { + ERR("Relay didn't receive valid begin data_pending struct size: %d", + ret); + ret = -1; + goto end_no_session; + } + + session_id = be64toh(msg.session_id); + + /* + * Iterate over all streams to set the begin data pending flag. For now, the + * streams are indexed by stream handle so we have to iterate over all + * streams to find the one associated with the right session_id. + */ + rcu_read_lock(); + cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + if (stream->session->id == session_id) { + stream->data_pending_check_done = 0; + DBG("Set begin data pending flag to stream %" PRIu64, + stream->stream_handle); + } + } + rcu_read_unlock(); + + /* All good, send back reply. */ + reply.ret_code = htobe32(LTTNG_OK); + + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (ret < 0) { + ERR("Relay begin data pending send reply failed"); + } + +end_no_session: + return ret; +} + +/* + * End data pending command. This will check, for a given session id, if each + * stream associated with it has its data_pending_check_done flag set. If not, + * this means that the client lost track of the stream but the data is still + * being streamed on our side. In this case, we inform the client that data is + * inflight. + * + * Return to the client if there is data in flight or not with a ret_code. + */ +static +int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + int ret; + struct lttng_ht_iter iter; + struct lttcomm_relayd_end_data_pending msg; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + uint64_t session_id; + uint32_t is_data_inflight = 0; + + assert(recv_hdr); + assert(cmd); + assert(streams_ht); + + DBG("End data pending command"); + + if (!cmd->session || cmd->version_check_done == 0) { + ERR("Trying to check for data before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); + if (ret < sizeof(msg)) { + ERR("Relay didn't receive valid end data_pending struct size: %d", + ret); + ret = -1; + goto end_no_session; + } + + session_id = be64toh(msg.session_id); + + /* Iterate over all streams to see if the begin data pending flag is set. */ + rcu_read_lock(); + cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + if (stream->session->id == session_id && + !stream->data_pending_check_done) { + is_data_inflight = 1; + DBG("Data is still in flight for stream %" PRIu64, + stream->stream_handle); + break; + } + } + rcu_read_unlock(); + + /* All good, send back reply. */ + reply.ret_code = htobe32(is_data_inflight); + + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (ret < 0) { + ERR("Relay end data pending send reply failed"); + } + +end_no_session: + return ret; +} + /* * relay_process_control: Process the commands received on the control socket */ @@ -1498,6 +1643,12 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_QUIESCENT_CONTROL: ret = relay_quiescent_control(recv_hdr, cmd); break; + case RELAYD_BEGIN_DATA_PENDING: + ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht); + break; + case RELAYD_END_DATA_PENDING: + ret = relay_end_data_pending(recv_hdr, cmd, streams_ht); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); diff --git a/src/common/consumer.c b/src/common/consumer.c index b5e212f27..3ecb72e21 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -65,7 +65,8 @@ static struct lttng_ht *data_ht; /* * This hash table contains the mapping between the session id of the sessiond - * and the relayd session id. Element of the ht are indexed by sessiond_id. + * and the relayd session id. Element of the ht are indexed by sessiond session + * id. * * Node can be added when a relayd communication is opened in the sessiond * thread. @@ -238,19 +239,21 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) DBG("Consumer destroy and close relayd socket pair"); + /* Loockup for a relayd node in the session id map hash table. */ lttng_ht_lookup(relayd_session_id_ht, - (void *)((unsigned long) relayd->session_id), &iter); + (void *)((unsigned long) relayd->sessiond_session_id), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { /* We assume the relayd is being or is destroyed */ return; } - ret = lttng_ht_del(relayd_session_id_ht, &iter); - if (ret != 0) { - /* We assume the relayd is being or is destroyed */ - return; - } + /* + * Try to delete it from the relayd session id ht. The return value is of + * no importance since either way we are going to try to delete the relayd + * from the global relayd_ht. + */ + lttng_ht_del(relayd_session_id_ht, &iter); iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); @@ -280,6 +283,8 @@ static void cleanup_relayd_ht(void) } lttng_ht_destroy(consumer_data.relayd_ht); + /* The destroy_relayd call makes sure that this ht is empty here. */ + lttng_ht_destroy(relayd_session_id_ht); rcu_read_unlock(); } @@ -2732,6 +2737,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto error; } + relayd->sessiond_session_id = (uint64_t) sessiond_id; } /* Poll on consumer socket. */ @@ -2781,7 +2787,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, * grab the socket lock since the relayd object is not yet visible. */ ret = relayd_create_session(&relayd->control_sock, - &relayd->session_id); + &relayd->relayd_session_id); if (ret < 0) { goto error; } @@ -2793,10 +2799,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } - relayd_id_node->relayd_id = relayd->session_id; + relayd_id_node->relayd_id = relayd->relayd_session_id; relayd_id_node->sessiond_id = (uint64_t) sessiond_id; - /* Indexed by session id of the session daemon. */ + /* Indexed by session id of the sessiond. */ lttng_ht_node_init_ulong(&relayd_id_node->node, relayd_id_node->sessiond_id); rcu_read_lock(); @@ -2878,6 +2884,42 @@ end: return ret; } +/* + * Search for a relayd associated to the session id and return the reference. + * + * A rcu read side lock MUST be acquire before calling this function and locked + * until the relayd object is no longer necessary. + */ +static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_session_id *session_id_map; + + /* Get the session id map. */ + lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + goto end; + } + + session_id_map = caa_container_of(node, struct consumer_relayd_session_id, + node); + + /* Iterate over all relayd since they are indexed by net_seq_idx. */ + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + if (relayd->relayd_session_id == session_id_map->relayd_id) { + /* Found the relayd. There can be only one per id. */ + break; + } + } + +end: + return relayd; +} + /* * Check if for a given session id there is still data needed to be extract * from the buffers. @@ -2890,7 +2932,7 @@ int consumer_data_pending(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; - struct consumer_relayd_sock_pair *relayd; + struct consumer_relayd_sock_pair *relayd = NULL; int (*data_pending)(struct lttng_consumer_stream *); DBG("Consumer data pending command on session id %" PRIu64, id); @@ -2914,6 +2956,19 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; + relayd = find_relayd_by_session_id(id); + if (relayd) { + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + } + cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed), ht->match_fct, (void *)((unsigned long) id), @@ -2921,7 +2976,7 @@ int consumer_data_pending(uint64_t id) /* If this call fails, the stream is being used hence data pending. */ ret = stream_try_lock(stream); if (!ret) { - goto data_not_pending; + goto data_pending; } /* @@ -2936,24 +2991,12 @@ int consumer_data_pending(uint64_t id) ret = data_pending(stream); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } /* Relayd check */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (!relayd) { - /* - * At this point, if the relayd object is not available for the - * given stream, it is because the relayd is being cleaned up - * so every stream associated with it (for a session id value) - * are or will be marked for deletion hence no data pending. - */ - pthread_mutex_unlock(&stream->lock); - goto data_not_pending; - } - + if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock); @@ -2964,25 +3007,39 @@ int consumer_data_pending(uint64_t id) pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } pthread_mutex_unlock(&stream->lock); } + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_end_data_pending(&relayd->control_sock, + relayd->relayd_session_id, &is_data_inflight); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0 || !is_data_inflight) { + /* On error or if NO data inflight, no data is pending. */ + goto data_not_pending; + } + } + /* - * Finding _no_ node in the hash table means that the stream(s) have been - * removed thus data is guaranteed to be available for analysis from the - * trace files. This is *only* true for local consumer and not network - * streaming. + * Finding _no_ node in the hash table and no inflight data means that the + * stream(s) have been removed thus data is guaranteed to be available for + * analysis from the trace files. */ +data_not_pending: /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); return 0; -data_not_pending: +data_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); diff --git a/src/common/consumer.h b/src/common/consumer.h index e9927c7d4..7f0d0cc21 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -187,8 +187,9 @@ struct consumer_relayd_sock_pair { struct lttcomm_sock data_sock; struct lttng_ht_node_ulong node; - /* Session id on the relayd side for the sockets. */ - uint64_t session_id; + /* Session id on both sides for the sockets. */ + uint64_t relayd_session_id; + uint64_t sessiond_session_id; }; /* diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 56ca98223..43f910823 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -503,3 +503,94 @@ int relayd_quiescent_control(struct lttcomm_sock *sock) error: return ret; } + +/* + * Begin a data pending command for a specific session id. + */ +int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id) +{ + int ret; + struct lttcomm_relayd_begin_data_pending msg; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(sock); + + DBG("Relayd begin data pending"); + + msg.session_id = htobe64(id); + + /* Send command */ + ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Recevie response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -reply.ret_code; + ERR("Relayd begin data pending replied error %d", ret); + goto error; + } + + return 0; + +error: + return ret; +} + +/* + * End a data pending command for a specific session id. + * + * Return 0 on success and set is_data_inflight to 0 if no data is being + * streamed or 1 if it is the case. + */ +int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id, + unsigned int *is_data_inflight) +{ + int ret; + struct lttcomm_relayd_end_data_pending msg; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(sock); + + DBG("Relayd end data pending"); + + msg.session_id = htobe64(id); + + /* Send command */ + ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Recevie response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + if (reply.ret_code < 0) { + ret = reply.ret_code; + goto error; + } + + *is_data_inflight = reply.ret_code; + + DBG("Relayd end data pending is data inflight: %d", reply.ret_code); + + return 0; + +error: + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 9d65c3b8a..2a46cd3eb 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -39,5 +39,8 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock, int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id, uint64_t last_net_seq_num); int relayd_quiescent_control(struct lttcomm_sock *sock); +int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id); +int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id, + unsigned int *is_data_inflight); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 6cd9a21f4..525ec1a5c 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -125,4 +125,12 @@ struct lttcomm_relayd_data_pending { uint64_t last_net_seq_num; /* Sequence number of the last packet */ } __attribute__ ((__packed__)); +struct lttcomm_relayd_begin_data_pending { + uint64_t session_id; +} __attribute__ ((__packed__)); + +struct lttcomm_relayd_end_data_pending { + uint64_t session_id; +} __attribute__ ((__packed__)); + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 2cd1d5407..3842558e5 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -91,6 +91,8 @@ enum lttcomm_sessiond_command { LTTNG_ENABLE_EVENT_WITH_FILTER = 32, LTTNG_HEALTH_CHECK = 33, LTTNG_DATA_PENDING = 34, + RELAYD_BEGIN_DATA_PENDING = 35, + RELAYD_END_DATA_PENDING = 36, }; /*