X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=1ea14f8ca5991af15f4d8b7d094ab9c85168c163;hb=02d02e31d47c091a38154c9c188c08387902d97b;hp=6b920b6b1ef63ee5d6e50cc4092b89bc498b7a9d;hpb=261de6373c70dcd52421642db5486747e9c10bae;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 6b920b6b1..1ea14f8ca 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -541,6 +541,16 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) consumer_stream_destroy(stream, metadata_ht); } +void consumer_stream_update_channel_attributes( + struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel) +{ + stream->channel_read_only_attributes.tracefile_size = + channel->tracefile_size; + memcpy(stream->channel_read_only_attributes.path, channel->pathname, + sizeof(stream->channel_read_only_attributes.path)); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -1961,6 +1971,25 @@ end: return written; } +/* + * Sample the snapshot positions for a specific fd + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_sample_snapshot_positions(stream); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_sample_snapshot_positions(stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} /* * Take a snapshot for a specific fd * @@ -2002,6 +2031,27 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, } } +/* + * Get the consumed position (free-running counter position in bytes). + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_get_consumed_snapshot(stream, pos); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_get_consumed_snapshot(stream, pos); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -2226,6 +2276,73 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } +static +int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, + uint64_t key) +{ + ssize_t ret; + + do { + ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + PERROR("Failed to write to the channel rotation pipe"); + } else { + DBG("Sent channel rotation notification for channel key %" + PRIu64, key); + ret = 0; + } + + return (int) ret; +} + +/* + * Perform operations that need to be done after a stream has + * rotated and released the stream lock. + * + * Multiple rotations cannot occur simultaneously, so we know the state of the + * "rotated" stream flag cannot change. + * + * This MUST be called WITHOUT the stream lock held. + */ +static +int consumer_post_rotation(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + pthread_mutex_lock(&stream->chan->lock); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * The ust_metadata_pushed counter has been reset to 0, so now + * we can wakeup the metadata thread so it dumps the metadata + * cache to the new file. + */ + if (stream->metadata_flag) { + consumer_metadata_wakeup_pipe(stream->chan); + } + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + + if (--stream->chan->nr_stream_rotate_pending == 0) { + DBG("Rotation of channel \"%s\" completed, notifying the session daemon", + stream->chan->name); + ret = rotate_notify_sessiond(ctx, stream->chan->key); + } + assert(stream->chan->nr_stream_rotate_pending >= 0); + pthread_mutex_unlock(&stream->chan->lock); + + return ret; +} + /* * Thread polls on metadata file descriptor and write them on disk or on the * network. @@ -3275,6 +3392,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotate_ret; + bool rotated = false; pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { @@ -3283,11 +3402,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx); + ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated); break; default: ERR("Unknown consumer_data type"); @@ -3301,6 +3420,14 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); + if (rotated) { + rotate_ret = consumer_post_rotation(stream, ctx); + if (rotate_ret < 0) { + ERR("Failed after a rotation"); + ret = -1; + } + } + return ret; } @@ -3786,6 +3913,219 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } +/* + * Check if a stream is ready to be rotated after extracting it. + * + * Return 1 if it is ready for rotation, 0 if it is not, a negative value on + * error. Stream lock must be held. + */ +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) +{ + int ret; + unsigned long consumed_pos; + + if (!stream->rotate_position && !stream->rotate_ready) { + ret = 0; + goto end; + } + + if (stream->rotate_ready) { + ret = 1; + goto end; + } + + /* + * If we don't have the rotate_ready flag, check the consumed position + * to determine if we need to rotate. + */ + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Taking snapshot positions"); + goto end; + } + + ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumed snapshot position"); + goto end; + } + + /* Rotate position not reached yet (with check for overflow). */ + if ((long) (consumed_pos - stream->rotate_position) < 0) { + ret = 0; + goto end; + } + ret = 1; + +end: + return ret; +} + +/* + * Reset the state for a stream after a rotation occurred. + */ +void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream) +{ + stream->rotate_position = 0; + stream->rotate_ready = false; +} + +/* + * Perform the rotation a local stream file. + */ +int rotate_local_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + + DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s", + stream->key, + stream->chan->key, + stream->channel_read_only_attributes.path); + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing trace file (fd %d), stream %" PRIu64, + stream->out_fd, stream->key); + assert(0); + goto error; + } + + ret = utils_create_stream_file( + stream->channel_read_only_attributes.path, + stream->name, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + stream->uid, stream->gid, NULL); + if (ret < 0) { + ERR("Rotate create stream file"); + goto error; + } + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + if (!stream->metadata_flag) { + struct lttng_index_file *index_file; + + lttng_index_file_put(stream->index_file); + + index_file = lttng_index_file_create( + stream->channel_read_only_attributes.path, + stream->name, stream->uid, stream->gid, + stream->channel_read_only_attributes.tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { + ERR("Create index file during rotation"); + goto error; + } + stream->index_file = index_file; + stream->out_fd_offset = 0; + } + + ret = 0; + goto end; + +error: + ret = -1; +end: + return ret; + +} + +/* + * Perform the rotation a stream file on the relay. + */ +int rotate_relay_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + DBG("Rotate relay stream"); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_stream(&relayd->control_sock, + stream->relayd_stream_id, + stream->channel_read_only_attributes.path, + stream->chan->current_chunk_id, + stream->last_sequence_number); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret) { + ERR("Rotate relay stream"); + } + +end: + return ret; +} + +/* + * Performs the stream rotation for the rotate session feature if needed. + * It must be called with the stream lock held. + * + * Return 0 on success, a negative number of error. + */ +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, bool *rotated) +{ + int ret; + + DBG("Consumer rotate stream %" PRIu64, stream->key); + + if (stream->net_seq_idx != (uint64_t) -1ULL) { + ret = rotate_relay_stream(ctx, stream); + } else { + ret = rotate_local_stream(ctx, stream); + } + if (ret < 0) { + ERR("Rotate stream"); + goto error; + } + + if (stream->metadata_flag) { + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Reset the position of what has been read from the metadata + * cache to 0 so we can dump it again. + */ + ret = kernctl_metadata_cache_dump(stream->wait_fd); + if (ret < 0) { + ERR("Failed to dump the kernel metadata cache after rotation"); + goto error; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Reset the position pushed from the metadata cache so it + * will write from the beginning on the next push. + */ + stream->ust_metadata_pushed = 0; + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + } + lttng_consumer_reset_stream_rotate_state(stream); + + if (rotated) { + *rotated = true; + } + + ret = 0; + +error: + return ret; +} + static int rotate_rename_local(const char *old_path, const char *new_path, uid_t uid, gid_t gid)