X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=ede214c8025b264fcdf5a538a565b2ab5b710eed;hb=6347a8b5c62a93c52cb11a68598068abb85ad930;hp=e56afa78c78f020e6fc8d71a1d5b8a3c1a6ec5f6;hpb=ad0b0d2321fc4e798a2aaf7c17568e3afdf41834;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index e56afa78c..ede214c80 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -250,6 +250,32 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) return channel; } +/* + * There is a possibility that the consumer does not have enough time between + * the close of the channel on the session daemon and the cleanup in here thus + * once we have a channel add with an existing key, we know for sure that this + * channel will eventually get cleaned up by all streams being closed. + * + * This function just nullifies the already existing channel key. + */ +static void steal_channel_key(uint64_t key) +{ + struct lttng_consumer_channel *channel; + + rcu_read_lock(); + channel = consumer_find_channel(key); + if (channel) { + channel->key = (uint64_t) -1ULL; + /* + * We don't want the lookup to match, but we still need to iterate on + * this channel when iterating over the hash table. Just change the + * node key. + */ + channel->node.key = (uint64_t) -1ULL; + } + rcu_read_unlock(); +} + static void free_channel_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = @@ -979,43 +1005,35 @@ end: /* * Add a channel to the global list protected by a mutex. * - * On success 0 is returned else a negative value. + * Always return 0 indicating success. */ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx) { - int ret = 0; - struct lttng_ht_node_u64 *node; - struct lttng_ht_iter iter; - pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->timer_lock); - rcu_read_lock(); - lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); - if (node != NULL) { - /* Channel already exist. Ignore the insertion */ - ERR("Consumer add channel key %" PRIu64 " already exists!", - channel->key); - ret = -EEXIST; - goto end; - } + /* + * This gives us a guarantee that the channel we are about to add to the + * channel hash table will be unique. See this function comment on the why + * we need to steel the channel key at this stage. + */ + steal_channel_key(channel->key); + rcu_read_lock(); lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node); - -end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); - if (!ret && channel->wait_fd != -1 && - channel->type == CONSUMER_CHANNEL_TYPE_DATA) { + if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) { notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD); } - return ret; + + return 0; } /* @@ -1077,8 +1095,8 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, } /* - * Poll on the should_quit pipe and the command socket return -1 on error and - * should exit, 0 if data is available on the command socket + * Poll on the should_quit pipe and the command socket return -1 on + * error, 1 if should exit, 0 if data is available on the command socket */ int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll) { @@ -1094,16 +1112,13 @@ restart: goto restart; } PERROR("Poll error"); - goto exit; + return -1; } if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) { DBG("consumer_should_quit wake up"); - goto exit; + return 1; } return 0; - -exit: - return -1; } /* @@ -2926,8 +2941,8 @@ static int set_metadata_socket(struct lttng_consumer_local_data *ctx, assert(ctx); assert(sockpoll); - if (lttng_consumer_poll_socket(sockpoll) < 0) { - ret = -1; + ret = lttng_consumer_poll_socket(sockpoll); + if (ret) { goto error; } DBG("Metadata connection on client_socket"); @@ -2996,7 +3011,12 @@ void *consumer_thread_sessiond_poll(void *data) consumer_sockpoll[1].fd = client_socket; consumer_sockpoll[1].events = POLLIN | POLLPRI; - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = lttng_consumer_poll_socket(consumer_sockpoll); + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } DBG("Connection on client_socket"); @@ -3013,7 +3033,11 @@ void *consumer_thread_sessiond_poll(void *data) * command unix socket. */ ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket); - if (ret < 0) { + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } @@ -3034,15 +3058,15 @@ void *consumer_thread_sessiond_poll(void *data) health_poll_entry(); ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); - if (ret < 0) { + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } DBG("Incoming command on sock"); ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll); - if (ret == -ENOENT) { - DBG("Received STOP command"); - goto end; - } if (ret <= 0) { /* * This could simply be a session daemon quitting. Don't output @@ -3260,7 +3284,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, } /* Poll on consumer socket. */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = lttng_consumer_poll_socket(consumer_sockpoll); + if (ret) { + /* Needing to exit in the middle of a command: error. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); ret = -EINTR; goto error_nosignal; @@ -3595,6 +3621,7 @@ int consumer_send_status_msg(int sock, int ret_code) { struct lttcomm_consumer_status_msg msg; + memset(&msg, 0, sizeof(msg)); msg.ret_code = ret_code; return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); @@ -3612,6 +3639,7 @@ int consumer_send_status_channel(int sock, assert(sock >= 0); + memset(&msg, 0, sizeof(msg)); if (!channel) { msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } else {