X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=c70d95a58bfc91a3a053c1d160be6ec1f8a0bba6;hp=6de72e2758b5086f35c36e96f69276eaa8dfbdf0;hb=ea6bf30f3684cf71633cb9f2c70ea0bc22b664c4;hpb=92816cc33a1add3c8276839bd6335e17423577dd diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 6de72e275..c70d95a58 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -2396,11 +2396,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & LPOLLIN) { ssize_t pipe_len; @@ -2990,11 +2985,6 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - if (pollfd == ctx->consumer_channel_pipe[0]) { if (revents & LPOLLIN) { enum consumer_channel_action action; @@ -3568,7 +3558,6 @@ error: /* Assign new file descriptor */ relayd->control_sock.sock.fd = fd; - fd = -1; /* For error path */ /* Assign version values. */ relayd->control_sock.major = relayd_sock->major; relayd->control_sock.minor = relayd_sock->minor; @@ -3596,7 +3585,6 @@ error: /* Assign new file descriptor */ relayd->data_sock.sock.fd = fd; - fd = -1; /* for eventual error paths */ /* Assign version values. */ relayd->data_sock.major = relayd_sock->major; relayd->data_sock.minor = relayd_sock->minor; @@ -3610,6 +3598,11 @@ error: DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); + /* + * We gave the ownership of the fd to the relayd structure. Set the + * fd to -1 so we don't call close() on it in the error path below. + */ + fd = -1; /* We successfully added the socket. Send status back. */ ret = consumer_send_status_msg(sock, ret_code); @@ -3896,15 +3889,16 @@ end: * is already at the rotate position (produced == consumed), we flag it as * ready for rotation. The rotation of ready streams occurs after we have * replied to the session daemon that we have finished sampling the positions. + * Must be called with RCU read-side lock held to ensure existence of channel. * * Returns 0 on success, < 0 on error */ -int lttng_consumer_rotate_channel(uint64_t key, const char *path, - uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id, +int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, + uint64_t key, const char *path, uint64_t relayd_id, + uint32_t metadata, uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx) { int ret; - struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; struct lttng_ht_iter iter; struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; @@ -3913,13 +3907,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path, rcu_read_lock(); - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - pthread_mutex_lock(&channel->lock); channel->current_chunk_id = new_chunk_id; @@ -3984,7 +3971,6 @@ int lttng_consumer_rotate_channel(uint64_t key, const char *path, if (consumed_pos == stream->rotate_position) { stream->rotate_ready = true; } - channel->nr_stream_rotate_pending++; ret = consumer_flush_buffer(stream, 1); if (ret < 0) { @@ -4233,14 +4219,15 @@ error: * This is especially important for low throughput streams that have already * been consumed, we cannot wait for their next packet to perform the * rotation. + * Need to be called with RCU read-side lock held to ensure existence of + * channel. * * Returns 0 on success, < 0 on error */ -int lttng_consumer_rotate_ready_streams(uint64_t key, - struct lttng_consumer_local_data *ctx) +int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, + uint64_t key, struct lttng_consumer_local_data *ctx) { int ret; - struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; struct lttng_ht_iter iter; struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; @@ -4249,13 +4236,6 @@ int lttng_consumer_rotate_ready_streams(uint64_t key, DBG("Consumer rotate ready streams in channel %" PRIu64, key); - channel = consumer_find_channel(key); - if (!channel) { - ERR("No channel found for key %" PRIu64, key); - ret = -1; - goto end; - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter,