X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=8bcfa45ebd746fd6be782c6259790ba9391168f8;hp=1045bfb46101d8a6bb63a8c91157f3ffbf5199a2;hb=0e428499a49b2335f4859058739fa2b20f4c410f;hpb=534d2592c0566f7c07cd9e9f9385781a65566e16 diff --git a/src/common/consumer.c b/src/common/consumer.c index 1045bfb46..8bcfa45eb 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -63,21 +63,6 @@ volatile int consumer_quit; static struct lttng_ht *metadata_ht; static struct lttng_ht *data_ht; -/* - * This hash table contains the mapping between the session id of the sessiond - * and the relayd session id. Element of the ht are indexed by sessiond session - * id. - * - * Node can be added when a relayd communication is opened in the sessiond - * thread. - * - * Note that a session id of the session daemon is unique to a tracing session - * and not to a domain session. However, a domain session has one consumer - * which forces the 1-1 mapping between a consumer and a domain session (ex: - * UST). This means that we can't have duplicate in this ht. - */ -static struct lttng_ht *relayd_session_id_ht; - /* * Notify a thread pipe to poll back again. This usually means that some global * state has changed so we just send back the thread in a poll wait call. @@ -231,7 +216,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) { int ret; struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; if (relayd == NULL) { return; @@ -239,22 +223,6 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd) DBG("Consumer destroy and close relayd socket pair"); - /* Loockup for a relayd node in the session id map hash table. */ - lttng_ht_lookup(relayd_session_id_ht, - (void *)((unsigned long) relayd->sessiond_session_id), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - /* We assume the relayd is being or is destroyed */ - return; - } - - /* - * Try to delete it from the relayd session id ht. The return value is of - * no importance since either way we are going to try to delete the relayd - * from the global relayd_ht. - */ - lttng_ht_del(relayd_session_id_ht, &iter); - iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); if (ret != 0) { @@ -283,8 +251,6 @@ static void cleanup_relayd_ht(void) } lttng_ht_destroy(consumer_data.relayd_ht); - /* The destroy_relayd call makes sure that this ht is empty here. */ - lttng_ht_destroy(relayd_session_id_ht); rcu_read_unlock(); } @@ -789,7 +755,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, * this next value, 1 should always be substracted in order to compare * the last seen sequence number on the relayd side to the last sent. */ - data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); + data_hdr.net_seq_num = htobe64(stream->next_net_seq_num); /* Other fields are zeroed previously */ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, @@ -798,6 +764,8 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, goto error; } + ++stream->next_net_seq_num; + /* Set to go on data socket */ outfd = relayd->data_sock.fd; } @@ -2189,7 +2157,7 @@ restart: if (stream == NULL) { /* Check for deleted streams. */ validate_endpoint_status_metadata_stream(&events); - continue; + goto restart; } DBG("Adding metadata stream %d to poll set", @@ -2315,14 +2283,11 @@ void *consumer_thread_data_poll(void *data) */ pthread_mutex_lock(&consumer_data.lock); if (consumer_data.need_update) { - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_stream != NULL) { - free(local_stream); - local_stream = NULL; - } + free(pollfd); + pollfd = NULL; + + free(local_stream); + local_stream = NULL; /* allocate for all fds + 1 for the consumer_data_pipe */ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); @@ -2519,14 +2484,8 @@ void *consumer_thread_data_poll(void *data) } end: DBG("polling thread exiting"); - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_stream != NULL) { - free(local_stream); - local_stream = NULL; - } + free(pollfd); + free(local_stream); /* * Close the write side of the pipe so epoll_wait() in @@ -2730,7 +2689,6 @@ void lttng_consumer_init(void) consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } /* @@ -2747,7 +2705,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; - struct consumer_relayd_session_id *relayd_id_node; DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); @@ -2830,24 +2787,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, goto error; } - /* Set up a relayd session id node. */ - relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id)); - if (!relayd_id_node) { - PERROR("zmalloc relayd id node"); - ret = -1; - goto error; - } - - relayd_id_node->relayd_id = relayd->relayd_session_id; - relayd_id_node->sessiond_id = (uint64_t) sessiond_id; - - /* Indexed by session id of the sessiond. */ - lttng_ht_node_init_ulong(&relayd_id_node->node, - relayd_id_node->sessiond_id); - rcu_read_lock(); - lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node); - rcu_read_unlock(); - break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ @@ -2941,30 +2880,24 @@ end: static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) { struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; struct consumer_relayd_sock_pair *relayd = NULL; - struct consumer_relayd_session_id *session_id_map; - - /* Get the session id map. */ - lttng_ht_lookup(relayd_session_id_ht, (void *)((unsigned long) id), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - goto end; - } - - session_id_map = caa_container_of(node, struct consumer_relayd_session_id, - node); /* Iterate over all relayd since they are indexed by net_seq_idx. */ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { - if (relayd->relayd_session_id == session_id_map->relayd_id) { + /* + * Check by sessiond id which is unique here where the relayd session + * id might not be when having multiple relayd. + */ + if (relayd->sessiond_session_id == id) { /* Found the relayd. There can be only one per id. */ - break; + goto found; } } -end: + return NULL; + +found: return relayd; }