X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=afc346333c5dc780a57c2bb86a548a9b0c60d82f;hp=8e6b63b011436a06a80a455aad802d23276a4470;hb=0b50e4b3fb9859af7072adcca784684834e5f8d1;hpb=d73bf3d793ee0b0c5b56cb47cb50c27d1789d3bd diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 8e6b63b01..afc346333 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -562,7 +562,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor) + unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttng_consumer_stream *stream; @@ -589,6 +590,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; + stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -806,7 +808,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, stream->name, path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count); + stream->chan->tracefile_size, stream->chan->tracefile_count, + stream->trace_archive_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { goto end; @@ -1475,7 +1478,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) */ static int write_relayd_metadata_id(int fd, struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd, unsigned long padding) + unsigned long padding) { ssize_t ret; struct lttcomm_relayd_metadata_payload hdr; @@ -1610,7 +1613,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* Write metadata stream id before payload */ if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + ret = write_relayd_metadata_id(outfd, stream, padding); if (ret < 0) { relayd_hang_up = 1; goto write_error; @@ -1799,7 +1802,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } stream->reset_metadata_flag = 0; } - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, + ret = write_relayd_metadata_id(splice_pipe[1], stream, padding); if (ret < 0) { written = ret; @@ -3392,6 +3395,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotate_ret; + bool rotated = false; pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { @@ -3400,11 +3405,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx); + ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated); break; default: ERR("Unknown consumer_data type"); @@ -3418,6 +3423,14 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); + if (rotated) { + rotate_ret = consumer_post_rotation(stream, ctx); + if (rotate_ret < 0) { + ERR("Failed after a rotation"); + ret = -1; + } + } + return ret; } @@ -3903,6 +3916,198 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } +static +int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustctl_flush_buffer(stream, producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +/* + * Sample the rotate position for all the streams of a channel. If a stream + * is already at the rotate position (produced == consumed), we flag it as + * ready for rotation. The rotation of ready streams occurs after we have + * replied to the session daemon that we have finished sampling the positions. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_channel(uint64_t key, const char *path, + uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + DBG("Consumer sample rotate position for channel %" PRIu64, key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + pthread_mutex_lock(&channel->lock); + channel->current_chunk_id = new_chunk_id; + + ret = lttng_strncpy(channel->pathname, path, sizeof(channel->pathname)); + if (ret) { + ERR("Failed to copy new path to channel during channel rotation"); + ret = -1; + goto end_unlock_channel; + } + + if (relayd_id == -1ULL) { + /* + * The domain path (/ust or /kernel) has been created before, we + * now need to create the last part of the path: the application/user + * specific section (uid/1000/64-bit). + */ + ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG, + channel->uid, channel->gid); + if (ret < 0) { + ERR("Failed to create trace directory at %s during rotation", + channel->pathname); + ret = -1; + goto end_unlock_channel; + } + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + unsigned long consumed_pos; + + health_code_update(); + + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + ret = lttng_strncpy(stream->channel_read_only_attributes.path, + channel->pathname, + sizeof(stream->channel_read_only_attributes.path)); + if (ret) { + ERR("Failed to sample channel path name during channel rotation"); + goto end_unlock_stream; + } + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to sample snapshot position during channel rotation"); + goto end_unlock_stream; + } + + ret = lttng_consumer_get_produced_snapshot(stream, + &stream->rotate_position); + if (ret < 0) { + ERR("Failed to sample produced position during channel rotation"); + goto end_unlock_stream; + } + + lttng_consumer_get_consumed_snapshot(stream, + &consumed_pos); + if (consumed_pos == stream->rotate_position) { + stream->rotate_ready = true; + } + channel->nr_stream_rotate_pending++; + + ret = consumer_flush_buffer(stream, 1); + if (ret < 0) { + ERR("Failed to flush stream %" PRIu64 " during channel rotation", + stream->key); + goto end_unlock_stream; + } + + pthread_mutex_unlock(&stream->lock); + } + pthread_mutex_unlock(&channel->lock); + + ret = 0; + goto end; + +end_unlock_stream: + pthread_mutex_unlock(&stream->lock); +end_unlock_channel: + pthread_mutex_unlock(&channel->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Check if a stream is ready to be rotated after extracting it. + * + * Return 1 if it is ready for rotation, 0 if it is not, a negative value on + * error. Stream lock must be held. + */ +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) +{ + int ret; + unsigned long consumed_pos; + + if (!stream->rotate_position && !stream->rotate_ready) { + ret = 0; + goto end; + } + + if (stream->rotate_ready) { + ret = 1; + goto end; + } + + /* + * If we don't have the rotate_ready flag, check the consumed position + * to determine if we need to rotate. + */ + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Taking snapshot positions"); + goto end; + } + + ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumed snapshot position"); + goto end; + } + + /* Rotate position not reached yet (with check for overflow). */ + if ((long) (consumed_pos - stream->rotate_position) < 0) { + ret = 0; + goto end; + } + ret = 1; + +end: + return ret; +} + /* * Reset the state for a stream after a rotation occurred. */ @@ -4014,7 +4219,7 @@ end: * Return 0 on success, a negative number of error. */ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) + struct lttng_consumer_stream *stream, bool *rotated) { int ret; @@ -4058,12 +4263,78 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } lttng_consumer_reset_stream_rotate_state(stream); + if (rotated) { + *rotated = true; + } + ret = 0; error: return ret; } +/* + * Rotate all the ready streams now. + * + * This is especially important for low throughput streams that have already + * been consumed, we cannot wait for their next packet to perform the + * rotation. + * + * Returns 0 on success, < 0 on error + */ +int lttng_consumer_rotate_ready_streams(uint64_t key, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + + DBG("Consumer rotate ready streams in channel %" PRIu64, key); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + health_code_update(); + + pthread_mutex_lock(&stream->lock); + + if (!stream->rotate_ready) { + pthread_mutex_unlock(&stream->lock); + continue; + } + DBG("Consumer rotate ready stream %" PRIu64, stream->key); + + ret = lttng_consumer_rotate_stream(ctx, stream, NULL); + pthread_mutex_unlock(&stream->lock); + if (ret) { + goto end; + } + + ret = consumer_post_rotation(stream, ctx); + if (ret) { + goto end; + } + } + + ret = 0; + +end: + rcu_read_unlock(); + return ret; +} + static int rotate_rename_local(const char *old_path, const char *new_path, uid_t uid, gid_t gid) @@ -4121,6 +4392,27 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, } } +int lttng_consumer_rotate_pending_relay(uint64_t session_id, + uint64_t relayd_id, uint64_t chunk_id) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd"); + ret = -1; + goto end; + } + + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + +end: + return ret; +} + static int mkdir_local(const char *path, uid_t uid, gid_t gid) {