X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=3ecb72e21e52af045b3fdb956e0ec614a4c570dd;hp=b5e212f27d61c5f08f53ef32fb19ac91c1e55917;hb=f7079f6790ccfb78ec7115ccb5b1706f5c18ebfe;hpb=46e6455f9dbe3bbe9b39f9e7b55dde228f6e3dbd diff --git a/src/common/consumer.c b/src/common/consumer.c index b5e212f27..3ecb72e21 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -65,7 +65,8 @@ static struct lttng_ht *data_ht; /* * This hash table contains the mapping between the session id of the sessiond - * and the relayd session id. Element of the ht are indexed by sessiond_id. + * and the relayd session id. Element of the ht are indexed by sessiond session + * id. * * Node can be added when a relayd communication is opened in the sessiond * thread. @@ -238,19 +239,21 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) DBG("Consumer destroy and close relayd socket pair"); + /* Loockup for a relayd node in the session id map hash table. */ lttng_ht_lookup(relayd_session_id_ht, - (void *)((unsigned long) relayd->session_id), &iter); + (void *)((unsigned long) relayd->sessiond_session_id), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { /* We assume the relayd is being or is destroyed */ return; } - ret = lttng_ht_del(relayd_session_id_ht, &iter); - if (ret != 0) { - /* We assume the relayd is being or is destroyed */ - return; - } + /* + * Try to delete it from the relayd session id ht. The return value is of + * no importance since either way we are going to try to delete the relayd + * from the global relayd_ht. + */ + lttng_ht_del(relayd_session_id_ht, &iter); iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); @@ -280,6 +283,8 @@ static void cleanup_relayd_ht(void) } lttng_ht_destroy(consumer_data.relayd_ht); + /* The destroy_relayd call makes sure that this ht is empty here. */ + lttng_ht_destroy(relayd_session_id_ht); rcu_read_unlock(); } @@ -2732,6 +2737,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto error; } + relayd->sessiond_session_id = (uint64_t) sessiond_id; } /* Poll on consumer socket. */ @@ -2781,7 +2787,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, * grab the socket lock since the relayd object is not yet visible. */ ret = relayd_create_session(&relayd->control_sock, - &relayd->session_id); + &relayd->relayd_session_id); if (ret < 0) { goto error; } @@ -2793,10 +2799,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } - relayd_id_node->relayd_id = relayd->session_id; + relayd_id_node->relayd_id = relayd->relayd_session_id; relayd_id_node->sessiond_id = (uint64_t) sessiond_id; - /* Indexed by session id of the session daemon. */ + /* Indexed by session id of the sessiond. */ lttng_ht_node_init_ulong(&relayd_id_node->node, relayd_id_node->sessiond_id); rcu_read_lock(); @@ -2878,6 +2884,42 @@ end: return ret; } +/* + * Search for a relayd associated to the session id and return the reference. + * + * A rcu read side lock MUST be acquire before calling this function and locked + * until the relayd object is no longer necessary. + */ +static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_session_id *session_id_map; + + /* Get the session id map. */ + lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + goto end; + } + + session_id_map = caa_container_of(node, struct consumer_relayd_session_id, + node); + + /* Iterate over all relayd since they are indexed by net_seq_idx. */ + cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, + node.node) { + if (relayd->relayd_session_id == session_id_map->relayd_id) { + /* Found the relayd. There can be only one per id. */ + break; + } + } + +end: + return relayd; +} + /* * Check if for a given session id there is still data needed to be extract * from the buffers. @@ -2890,7 +2932,7 @@ int consumer_data_pending(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; - struct consumer_relayd_sock_pair *relayd; + struct consumer_relayd_sock_pair *relayd = NULL; int (*data_pending)(struct lttng_consumer_stream *); DBG("Consumer data pending command on session id %" PRIu64, id); @@ -2914,6 +2956,19 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; + relayd = find_relayd_by_session_id(id); + if (relayd) { + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + } + cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed), ht->match_fct, (void *)((unsigned long) id), @@ -2921,7 +2976,7 @@ int consumer_data_pending(uint64_t id) /* If this call fails, the stream is being used hence data pending. */ ret = stream_try_lock(stream); if (!ret) { - goto data_not_pending; + goto data_pending; } /* @@ -2936,24 +2991,12 @@ int consumer_data_pending(uint64_t id) ret = data_pending(stream); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } /* Relayd check */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (!relayd) { - /* - * At this point, if the relayd object is not available for the - * given stream, it is because the relayd is being cleaned up - * so every stream associated with it (for a session id value) - * are or will be marked for deletion hence no data pending. - */ - pthread_mutex_unlock(&stream->lock); - goto data_not_pending; - } - + if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock); @@ -2964,25 +3007,39 @@ int consumer_data_pending(uint64_t id) pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { pthread_mutex_unlock(&stream->lock); - goto data_not_pending; + goto data_pending; } } pthread_mutex_unlock(&stream->lock); } + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_end_data_pending(&relayd->control_sock, + relayd->relayd_session_id, &is_data_inflight); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0 || !is_data_inflight) { + /* On error or if NO data inflight, no data is pending. */ + goto data_not_pending; + } + } + /* - * Finding _no_ node in the hash table means that the stream(s) have been - * removed thus data is guaranteed to be available for analysis from the - * trace files. This is *only* true for local consumer and not network - * streaming. + * Finding _no_ node in the hash table and no inflight data means that the + * stream(s) have been removed thus data is guaranteed to be available for + * analysis from the trace files. */ +data_not_pending: /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock(); return 0; -data_not_pending: +data_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&consumer_data.lock); rcu_read_unlock();