X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=7a7f7954be3c843df01bfea1af6d3c815e345773;hp=3c20c57bda9d68ac1967de0cf433901ab8cce7e6;hb=d295668767ac8234e83984e1812d342d03293d88;hpb=fb9a95c4d6242bd8336b638c90a7d8f846125659 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 3c20c57bd..7a7f7954b 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,9 @@ #include #include #include +#include +#include +#include struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -358,7 +362,6 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) */ void consumer_del_channel(struct lttng_consumer_channel *channel) { - int ret; struct lttng_ht_iter iter; DBG("Consumer delete channel key %" PRIu64, channel->key); @@ -389,17 +392,25 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } - rcu_read_lock(); - iter.iter.node = &channel->node.node; - ret = lttng_ht_del(consumer_data.channel_ht, &iter); - assert(!ret); + lttng_trace_chunk_put(channel->trace_chunk); + channel->trace_chunk = NULL; - iter.iter.node = &channel->channels_by_session_id_ht_node.node; - ret = lttng_ht_del(consumer_data.channels_by_session_id_ht, &iter); - assert(!ret); - rcu_read_unlock(); + if (channel->is_published) { + int ret; + + rcu_read_lock(); + iter.iter.node = &channel->node.node; + ret = lttng_ht_del(consumer_data.channel_ht, &iter); + assert(!ret); - call_rcu(&channel->node.head, free_channel_rcu); + iter.iter.node = &channel->channels_by_session_id_ht_node.node; + ret = lttng_ht_del(consumer_data.channels_by_session_id_ht, + &iter); + assert(!ret); + rcu_read_unlock(); + } + + call_rcu(&channel->node.head, free_channel_rcu); end: pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); @@ -549,23 +560,18 
@@ void consumer_stream_update_channel_attributes( { stream->channel_read_only_attributes.tracefile_size = channel->tracefile_size; - memcpy(stream->channel_read_only_attributes.path, channel->pathname, - sizeof(stream->channel_read_only_attributes.path)); } struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, - enum lttng_consumer_stream_state state, const char *channel_name, - uid_t uid, - gid_t gid, uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor, - uint64_t trace_archive_id) + unsigned int monitor) { int ret; struct lttng_consumer_stream *stream; @@ -577,22 +583,24 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, goto end; } - rcu_read_lock(); + if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { + ERR("Failed to acquire trace chunk reference during the creation of a stream"); + ret = -1; + goto error; + } + rcu_read_lock(); stream->key = stream_key; + stream->trace_chunk = trace_chunk; stream->out_fd = -1; stream->out_fd_offset = 0; stream->output_written = 0; - stream->state = state; - stream->uid = uid; - stream->gid = gid; stream->net_seq_idx = relayd_id; stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; - stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -633,6 +641,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, error: rcu_read_unlock(); + lttng_trace_chunk_put(stream->trace_chunk); free(stream); end: if (alloc_ret) { @@ -810,8 +819,9 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, stream->name, 
path, &stream->relayd_stream_id, - stream->chan->tracefile_size, stream->chan->tracefile_count, - stream->trace_archive_id); + stream->chan->tracefile_size, + stream->chan->tracefile_count, + stream->trace_chunk); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); @@ -952,6 +962,151 @@ error: return outfd; } +/* + * Trigger a dump of the metadata content. Following/during the successful + * completion of this call, the metadata poll thread will start receiving + * metadata packets to consume. + * + * The caller must hold the channel and stream locks. + */ +static +int consumer_metadata_stream_dump(struct lttng_consumer_stream *stream) +{ + int ret; + + ASSERT_LOCKED(stream->chan->lock); + ASSERT_LOCKED(stream->lock); + assert(stream->metadata_flag); + assert(stream->chan->trace_chunk); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Reset the position of what has been read from the + * metadata cache to 0 so we can dump it again. + */ + ret = kernctl_metadata_cache_dump(stream->wait_fd); + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Reset the position pushed from the metadata cache so it + * will write from the beginning on the next push. 
+ */ + stream->ust_metadata_pushed = 0; + ret = consumer_metadata_wakeup_pipe(stream->chan); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + if (ret < 0) { + ERR("Failed to dump the metadata cache"); + } + return ret; +} + +static +int lttng_consumer_channel_set_trace_chunk( + struct lttng_consumer_channel *channel, + struct lttng_trace_chunk *new_trace_chunk) +{ + int ret = 0; + const bool is_local_trace = channel->relayd_id == -1ULL; + bool update_stream_trace_chunk; + struct cds_lfht_iter iter; + struct lttng_consumer_stream *stream; + unsigned long channel_hash; + + pthread_mutex_lock(&channel->lock); + /* + * A stream can transition to a state where it and its channel + * no longer belong to a trace chunk. For instance, this happens when + * a session is rotated while it is inactive. After the rotation + * of an inactive session completes, the channel and its streams no + * longer belong to a trace chunk. + * + * However, if a session is stopped, rotated, and started again, + * the session daemon will create a new chunk and send it to its peers. + * In that case, the streams' transition to a new chunk can be performed + * immediately. + * + * This trace chunk transition could also be performed lazily when + * a buffer is consumed. However, creating the files here allows the + * consumer daemon to report any creation error to the session daemon + * and cause the start of the tracing session to fail. + */ + update_stream_trace_chunk = !channel->trace_chunk && new_trace_chunk; + + /* + * The acquisition of the reference cannot fail (barring + * a severe internal error) since a reference to the published + * chunk is already held by the caller. + */ + if (new_trace_chunk) { + const bool acquired_reference = lttng_trace_chunk_get( + new_trace_chunk); + + assert(acquired_reference); + } + + lttng_trace_chunk_put(channel->trace_chunk); + channel->trace_chunk = new_trace_chunk; + if (!is_local_trace || !new_trace_chunk) { + /* Not an error. 
+ */ + goto end; + } + + if (!update_stream_trace_chunk) { + goto end; + } + + channel_hash = consumer_data.stream_per_chan_id_ht->hash_fct( + &channel->key, lttng_ht_seed); + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(consumer_data.stream_per_chan_id_ht->ht, + channel_hash, + consumer_data.stream_per_chan_id_ht->match_fct, + &channel->key, &iter, stream, node_channel_id.node) { + bool acquired_reference, should_regenerate_metadata = false; + + acquired_reference = lttng_trace_chunk_get(channel->trace_chunk); + assert(acquired_reference); + + pthread_mutex_lock(&stream->lock); + + /* + * On a transition from "no-chunk" to a new chunk, a metadata + * stream's content must be entirely dumped. This must occur + * _after_ the creation of the metadata stream's output files + * as the consumption thread (not necessarily the one executing + * this) may start to consume during the call to + * consumer_metadata_stream_dump(). + */ + should_regenerate_metadata = + stream->metadata_flag && + !stream->trace_chunk && channel->trace_chunk; + stream->trace_chunk = channel->trace_chunk; + ret = consumer_stream_create_output_files(stream, true); + if (ret) { + pthread_mutex_unlock(&stream->lock); + goto end_rcu_unlock; + } + if (should_regenerate_metadata) { + ret = consumer_metadata_stream_dump(stream); + } + pthread_mutex_unlock(&stream->lock); + if (ret) { + goto end_rcu_unlock; + } + } +end_rcu_unlock: + rcu_read_unlock(); +end: + pthread_mutex_unlock(&channel->lock); + return ret; +} + /* * Allocate and return a new lttng_consumer_channel object using the given key * to initialize the hash table node. 
@@ -960,10 +1115,9 @@ error: */ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, + const uint64_t *chunk_id, const char *pathname, const char *name, - uid_t uid, - gid_t gid, uint64_t relayd_id, enum lttng_event_output output, uint64_t tracefile_size, @@ -974,7 +1128,18 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, const char *root_shm_path, const char *shm_path) { - struct lttng_consumer_channel *channel; + struct lttng_consumer_channel *channel = NULL; + struct lttng_trace_chunk *trace_chunk = NULL; + + if (chunk_id) { + trace_chunk = lttng_trace_chunk_registry_find_chunk( + consumer_data.chunk_registry, session_id, + *chunk_id); + if (!trace_chunk) { + ERR("Failed to find trace chunk reference during creation of channel"); + goto end; + } + } channel = zmalloc(sizeof(*channel)); if (channel == NULL) { @@ -986,8 +1151,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->refcount = 0; channel->session_id = session_id; channel->session_id_per_pid = session_id_per_pid; - channel->uid = uid; - channel->gid = gid; channel->relayd_id = relayd_id; channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; @@ -1043,13 +1206,25 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->session_id); channel->wait_fd = -1; - CDS_INIT_LIST_HEAD(&channel->streams.head); + if (trace_chunk) { + int ret = lttng_consumer_channel_set_trace_chunk(channel, + trace_chunk); + if (ret) { + goto error; + } + } + DBG("Allocated channel (key %" PRIu64 ")", channel->key); end: + lttng_trace_chunk_put(trace_chunk); return channel; +error: + consumer_del_channel(channel); + channel = NULL; + goto end; } /* @@ -1076,6 +1251,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, lttng_ht_add_u64(consumer_data.channels_by_session_id_ht, &channel->channels_by_session_id_ht_node); rcu_read_unlock(); + channel->is_published = true; 
pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); @@ -1125,16 +1301,13 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * closed by the polling thread after a wakeup on the data_pipe or * metadata_pipe. */ - if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || - stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { (*nb_inactive_fd)++; continue; } /* * This clobbers way too much the debug output. Uncomment that if you * need it for debugging purposes. - * - * DBG("Active FD %d", stream->wait_fd); */ (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; @@ -1529,7 +1702,7 @@ end: * core function for writing trace buffers to either the local filesystem or * the network. * - * It must be called with the stream lock held. + * It must be called with the stream and the channel lock held. * * Careful review MUST be put if any changes occur! * @@ -1553,6 +1726,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* RCU lock for the relayd pointer */ rcu_read_lock(); + assert(stream->chan->trace_chunk); + /* Flag that the current stream if set for network streaming. 
*/ if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1651,32 +1826,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->chan->tracefile_size > 0 && (stream->tracefile_size_current + len) > stream->chan->tracefile_size) { - ret = utils_rotate_stream_file(stream->chan->pathname, - stream->name, stream->chan->tracefile_size, - stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current), - &stream->out_fd); - if (ret < 0) { - ERR("Rotating output file"); + ret = consumer_stream_rotate_output_files(stream); + if (ret) { goto end; } outfd = stream->out_fd; - - if (stream->index_file) { - lttng_index_file_put(stream->index_file); - stream->index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!stream->index_file) { - goto end; - } - } - - /* Reset current size because we just perform a rotation. 
*/ - stream->tracefile_size_current = 0; - stream->out_fd_offset = 0; orig_offset = 0; } stream->tracefile_size_current += len; @@ -1853,33 +2007,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( if (stream->chan->tracefile_size > 0 && (stream->tracefile_size_current + len) > stream->chan->tracefile_size) { - ret = utils_rotate_stream_file(stream->chan->pathname, - stream->name, stream->chan->tracefile_size, - stream->chan->tracefile_count, stream->uid, stream->gid, - stream->out_fd, &(stream->tracefile_count_current), - &stream->out_fd); + ret = consumer_stream_rotate_output_files(stream); if (ret < 0) { written = ret; - ERR("Rotating output file"); goto end; } outfd = stream->out_fd; - - if (stream->index_file) { - lttng_index_file_put(stream->index_file); - stream->index_file = lttng_index_file_create(stream->chan->pathname, - stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!stream->index_file) { - goto end; - } - } - - /* Reset current size because we just perform a rotation. */ - stream->tracefile_size_current = 0; - stream->out_fd_offset = 0; orig_offset = 0; } stream->tracefile_size_current += len; @@ -2170,6 +2303,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, consumer_del_channel(free_chan); } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; consumer_stream_free(stream); } @@ -2294,46 +2429,6 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } -/* - * Perform operations that need to be done after a stream has - * rotated and released the stream lock. - * - * Multiple rotations cannot occur simultaneously, so we know the state of the - * "rotated" stream flag cannot change. - * - * This MUST be called WITHOUT the stream lock held. 
- */ -static -int consumer_post_rotation(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) -{ - int ret = 0; - - pthread_mutex_lock(&stream->chan->lock); - - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - /* - * The ust_metadata_pushed counter has been reset to 0, so now - * we can wakeup the metadata thread so it dumps the metadata - * cache to the new file. - */ - if (stream->metadata_flag) { - consumer_metadata_wakeup_pipe(stream->chan); - } - break; - default: - ERR("Unknown consumer_data type"); - abort(); - } - - pthread_mutex_unlock(&stream->chan->lock); - return ret; -} - /* * Thread polls on metadata file descriptor and write them on disk or on the * network. @@ -3370,9 +3465,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; - int rotate_ret; - bool rotated = false; + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { pthread_mutex_lock(&stream->metadata_rdv_lock); @@ -3380,11 +3474,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated); + ret = lttng_kconsumer_read_subbuffer(stream, ctx); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx); break; default: ERR("Unknown consumer_data type"); @@ -3398,13 +3492,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); - if (rotated) { - rotate_ret = consumer_post_rotation(stream, ctx); - if (rotate_ret < 0) { - ERR("Failed after a rotation"); - ret = -1; - } - } + 
pthread_mutex_unlock(&stream->chan->lock); return ret; } @@ -3916,8 +4004,7 @@ end: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, const char *path, uint64_t relayd_id, - uint32_t metadata, uint64_t new_chunk_id, + uint64_t key, uint64_t relayd_id, uint32_t metadata, struct lttng_consumer_local_data *ctx) { int ret; @@ -3930,30 +4017,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, rcu_read_lock(); pthread_mutex_lock(&channel->lock); - channel->current_chunk_id = new_chunk_id; - - ret = lttng_strncpy(channel->pathname, path, sizeof(channel->pathname)); - if (ret) { - ERR("Failed to copy new path to channel during channel rotation"); - ret = -1; - goto end_unlock_channel; - } - - if (relayd_id == -1ULL) { - /* - * The domain path (/ust or /kernel) has been created before, we - * now need to create the last part of the path: the application/user - * specific section (uid/1000/64-bit). 
- */ - ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG, - channel->uid, channel->gid); - if (ret < 0) { - ERR("Failed to create trace directory at %s during rotation", - channel->pathname); - ret = -1; - goto end_unlock_channel; - } - } cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), @@ -3968,13 +4031,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, */ pthread_mutex_lock(&stream->lock); - ret = lttng_strncpy(stream->channel_read_only_attributes.path, - channel->pathname, - sizeof(stream->channel_read_only_attributes.path)); - if (ret) { - ERR("Failed to sample channel path name during channel rotation"); - goto end_unlock_stream; - } ret = lttng_consumer_sample_snapshot_positions(stream); if (ret < 0) { ERR("Failed to sample snapshot position during channel rotation"); @@ -4010,7 +4066,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, end_unlock_stream: pthread_mutex_unlock(&stream->lock); -end_unlock_channel: pthread_mutex_unlock(&channel->lock); end: rcu_read_unlock(); @@ -4077,64 +4132,39 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre /* * Perform the rotation a local stream file. 
*/ +static int rotate_local_stream(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream) { - int ret; + int ret = 0; - DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64 " at path %s", + DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64, stream->key, - stream->chan->key, - stream->channel_read_only_attributes.path); - - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Closing trace file (fd %d), stream %" PRIu64, - stream->out_fd, stream->key); - assert(0); - goto error; - } - - ret = utils_create_stream_file( - stream->channel_read_only_attributes.path, - stream->name, - stream->channel_read_only_attributes.tracefile_size, - stream->tracefile_count_current, - stream->uid, stream->gid, NULL); - if (ret < 0) { - ERR("Rotate create stream file"); - goto error; - } - stream->out_fd = ret; + stream->chan->key); stream->tracefile_size_current = 0; + stream->tracefile_count_current = 0; - if (!stream->metadata_flag) { - struct lttng_index_file *index_file; + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret) { + PERROR("Failed to close stream out_fd of channel \"%s\"", + stream->chan->name); + } + stream->out_fd = -1; + } + if (stream->index_file) { lttng_index_file_put(stream->index_file); - - index_file = lttng_index_file_create( - stream->channel_read_only_attributes.path, - stream->name, stream->uid, stream->gid, - stream->channel_read_only_attributes.tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR); - if (!index_file) { - ERR("Create index file during rotation"); - goto error; - } - stream->index_file = index_file; - stream->out_fd_offset = 0; + stream->index_file = NULL; } - ret = 0; - goto end; + if (!stream->trace_chunk) { + goto end; + } -error: - ret = -1; + ret = consumer_stream_create_output_files(stream, true); end: return ret; - } /* @@ -4145,6 +4175,8 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, { int ret; 
struct consumer_relayd_sock_pair *relayd; + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; DBG("Rotate relay stream"); relayd = consumer_find_relayd(stream->net_seq_idx); @@ -4154,11 +4186,19 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, goto end; } + chunk_status = lttng_trace_chunk_get_id(stream->chan->trace_chunk, + &chunk_id); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to retrieve the id of the current trace chunk of channel \"%s\"", + stream->chan->name); + ret = -1; + goto end; + } + pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_stream(&relayd->control_sock, stream->relayd_stream_id, - stream->channel_read_only_attributes.path, - stream->chan->current_chunk_id, + chunk_id, stream->last_sequence_number); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { @@ -4175,60 +4215,74 @@ end: /* * Performs the stream rotation for the rotate session feature if needed. - * It must be called with the stream lock held. + * It must be called with the channel and stream locks held. * * Return 0 on success, a negative number of error. */ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, bool *rotated) + struct lttng_consumer_stream *stream) { int ret; DBG("Consumer rotate stream %" PRIu64, stream->key); + /* + * Update the stream's 'current' chunk to the session's (channel) + * now-current chunk. + */ + lttng_trace_chunk_put(stream->trace_chunk); + if (stream->chan->trace_chunk == stream->trace_chunk) { + /* + * A channel can be rotated and not have a "next" chunk + * to transition to. In that case, the channel's "current chunk" + * has not been closed yet, but it has not been updated to + * a "next" trace chunk either. Hence, the stream, like its + * parent channel, becomes part of no chunk and can't output + * anything until a new trace chunk is created. 
+ */ + stream->trace_chunk = NULL; + } else if (stream->chan->trace_chunk && + !lttng_trace_chunk_get(stream->chan->trace_chunk)) { + ERR("Failed to acquire a reference to channel's trace chunk during stream rotation"); + ret = -1; + goto error; + } else { + /* + * Update the stream's trace chunk to its parent channel's + * current trace chunk. + */ + stream->trace_chunk = stream->chan->trace_chunk; + } + if (stream->net_seq_idx != (uint64_t) -1ULL) { ret = rotate_relay_stream(ctx, stream); } else { ret = rotate_local_stream(ctx, stream); } - stream->trace_archive_id++; if (ret < 0) { ERR("Failed to rotate stream, ret = %i", ret); goto error; } - if (stream->metadata_flag) { - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - /* - * Reset the position of what has been read from the metadata - * cache to 0 so we can dump it again. - */ - ret = kernctl_metadata_cache_dump(stream->wait_fd); - if (ret < 0) { - ERR("Failed to dump the kernel metadata cache after rotation"); - goto error; - } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - /* - * Reset the position pushed from the metadata cache so it - * will write from the beginning on the next push. - */ - stream->ust_metadata_pushed = 0; - break; - default: - ERR("Unknown consumer_data type"); - abort(); + if (stream->metadata_flag && stream->trace_chunk) { + /* + * If the stream has transitioned to a new trace + * chunk, the metadata should be re-dumped to the + * newest chunk. + * + * However, it is possible for a stream to transition to + * a "no-chunk" state. This can happen if a rotation + * occurs on an inactive session. In such cases, the metadata + * regeneration will happen when the next trace chunk is + * created. 
+ */ + ret = consumer_metadata_stream_dump(stream); + if (ret) { + goto error; } } lttng_consumer_reset_stream_rotate_state(stream); - if (rotated) { - *rotated = true; - } - ret = 0; error: @@ -4264,21 +4318,19 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, stream, node_channel_id.node) { health_code_update(); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); if (!stream->rotate_ready) { pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); continue; } DBG("Consumer rotate ready stream %" PRIu64, stream->key); - ret = lttng_consumer_rotate_stream(ctx, stream, NULL); + ret = lttng_consumer_rotate_stream(ctx, stream); pthread_mutex_unlock(&stream->lock); - if (ret) { - goto end; - } - - ret = consumer_post_rotation(stream, ctx); + pthread_mutex_unlock(&stream->chan->lock); if (ret) { goto end; } @@ -4291,218 +4343,298 @@ end: return ret; } -static -int rotate_rename_local(const char *old_path, const char *new_path, - uid_t uid, gid_t gid) +enum lttcomm_return_code lttng_consumer_init_command( + struct lttng_consumer_local_data *ctx, + const lttng_uuid sessiond_uuid) { - int ret; - - assert(old_path); - assert(new_path); - - ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG, uid, gid); - if (ret < 0) { - ERR("Create directory on rotate"); - goto end; - } + enum lttcomm_return_code ret; + char uuid_str[UUID_STR_LEN]; - ret = rename(old_path, new_path); - if (ret < 0 && errno != ENOENT) { - PERROR("Rename completed rotation chunk"); + if (ctx->sessiond_uuid.is_set) { + ret = LTTCOMM_CONSUMERD_ALREADY_SET; goto end; } - ret = 0; + ctx->sessiond_uuid.is_set = true; + memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid)); + ret = LTTCOMM_CONSUMERD_SUCCESS; + lttng_uuid_to_str(sessiond_uuid, uuid_str); + DBG("Received session daemon UUID: %s", uuid_str); end: return ret; } -static -int rotate_rename_relay(const char *old_path, const char *new_path, - uint64_t 
relayd_id) +enum lttcomm_return_code lttng_consumer_create_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, + time_t chunk_creation_timestamp, + const char *chunk_override_name, + const struct lttng_credentials *credentials, + struct lttng_directory_handle *chunk_directory_handle) { int ret; - struct consumer_relayd_sock_pair *relayd; - - relayd = consumer_find_relayd(relayd_id); - if (!relayd) { - ERR("Failed to find relayd while running rotate_rename_relay command"); - ret = -1; - goto end; - } - - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path); - if (ret < 0) { - ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); -end: - return ret; -} - -int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, - uid_t uid, gid_t gid, uint64_t relayd_id) -{ - if (relayd_id != -1ULL) { - return rotate_rename_relay(old_path, new_path, relayd_id); - } else { - return rotate_rename_local(old_path, new_path, uid, gid); - } -} - -/* Stream lock must be acquired by the caller. */ -static -bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream, - uint64_t session_id, uint64_t chunk_id) -{ - bool pending = false; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; + struct lttng_trace_chunk *created_chunk, *published_chunk; + enum lttng_trace_chunk_status chunk_status; + char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; + char creation_timestamp_buffer[ISO8601_STR_LEN]; + const char *relayd_id_str = "(none)"; + const char *creation_timestamp_str; + struct lttng_ht_iter iter; + struct lttng_consumer_channel *channel; - if (stream->session_id != session_id) { - /* Skip. */ - goto end; - } + if (relayd_id) { + /* Only used for logging purposes. 
*/ + ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), + "%" PRIu64, *relayd_id); + if (ret > 0 && ret < sizeof(relayd_id_buffer)) { + relayd_id_str = relayd_id_buffer; + } else { + relayd_id_str = "(formatting error)"; + } + } + + /* Local protocol error. */ + assert(chunk_creation_timestamp); + ret = time_to_iso8601_str(chunk_creation_timestamp, + creation_timestamp_buffer, + sizeof(creation_timestamp_buffer)); + creation_timestamp_str = !ret ? creation_timestamp_buffer : + "(formatting error)"; + + DBG("Consumer create trace chunk command: relay_id = %s" + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", chunk_override_name = %s" + ", chunk_creation_timestamp = %s", + relayd_id_str, session_id, chunk_id, + chunk_override_name ? : "(none)", + creation_timestamp_str); /* - * If the stream's archive_id belongs to the chunk being rotated (or an - * even older one), it means that the consumer has not consumed all the - * buffers that belong to the chunk being rotated. Therefore, the - * rotation is considered as ongoing/pending. + * The trace chunk registry, as used by the consumer daemon, implicitly + * owns the trace chunks. This is only needed in the consumer since + * the consumer has no notion of a session beyond session IDs being + * used to identify other objects. + * + * The lttng_trace_chunk_registry_publish() call below provides a + * reference which is not released; it implicitly becomes the session + * daemon's reference to the chunk in the consumer daemon. + * + * The lifetime of trace chunks in the consumer daemon is managed by + * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK + * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands. */ - pending = stream->trace_archive_id <= chunk_id; -end: - return pending; -} - -/* RCU read lock must be acquired by the caller. 
*/ -int lttng_consumer_check_rotation_pending_local(uint64_t session_id, - uint64_t chunk_id) -{ - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - bool rotation_pending = false; + created_chunk = lttng_trace_chunk_create(chunk_id, + chunk_creation_timestamp); + if (!created_chunk) { + ERR("Failed to create trace chunk"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + goto end; + } - /* Start with the metadata streams... */ - cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { - pthread_mutex_lock(&stream->lock); - rotation_pending = check_stream_rotation_pending(stream, - session_id, chunk_id); - pthread_mutex_unlock(&stream->lock); - if (rotation_pending) { + if (chunk_override_name) { + chunk_status = lttng_trace_chunk_override_name(created_chunk, + chunk_override_name); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; goto end; } } - /* ... followed by the data streams. */ - cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { - pthread_mutex_lock(&stream->lock); - rotation_pending = check_stream_rotation_pending(stream, - session_id, chunk_id); - pthread_mutex_unlock(&stream->lock); - if (rotation_pending) { + if (chunk_directory_handle) { + chunk_status = lttng_trace_chunk_set_credentials(created_chunk, + credentials); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set trace chunk credentials"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + goto end; + } + /* + * The consumer daemon has no ownership of the chunk output + * directory. 
+ */ + chunk_status = lttng_trace_chunk_set_as_user(created_chunk, + chunk_directory_handle); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to set trace chunk's directory handle"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; goto end; } } -end: - return !!rotation_pending; -} - -int lttng_consumer_check_rotation_pending_relay(uint64_t session_id, - uint64_t relayd_id, uint64_t chunk_id) -{ - int ret; - struct consumer_relayd_sock_pair *relayd; - - relayd = consumer_find_relayd(relayd_id); - if (!relayd) { - ERR("Failed to find relayd id %" PRIu64, relayd_id); - ret = -1; + published_chunk = lttng_trace_chunk_registry_publish_chunk( + consumer_data.chunk_registry, session_id, + created_chunk); + lttng_trace_chunk_put(created_chunk); + created_chunk = NULL; + if (!published_chunk) { + ERR("Failed to publish trace chunk"); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; goto end; } - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_rotate_pending(&relayd->control_sock, chunk_id); - if (ret < 0) { - ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - -end: - return ret; -} - -static -int mkdir_local(const char *path, uid_t uid, gid_t gid) -{ - int ret; + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht, + consumer_data.channels_by_session_id_ht->hash_fct( + &session_id, lttng_ht_seed), + consumer_data.channels_by_session_id_ht->match_fct, + &session_id, &iter.iter, channel, + channels_by_session_id_ht_node.node) { + ret = lttng_consumer_channel_set_trace_chunk(channel, + published_chunk); + if (ret) { + /* + * Roll-back the creation of this chunk. 
+ * + * This is important since the session daemon will + * assume that the creation of this chunk failed and + * will never ask for it to be closed, resulting + * in a leak and an inconsistent state for some + * channels. + */ + enum lttcomm_return_code close_ret; + + DBG("Failed to set new trace chunk on existing channels, rolling back"); + close_ret = lttng_consumer_close_trace_chunk(relayd_id, + session_id, chunk_id, + chunk_creation_timestamp); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, + session_id, chunk_id); + } - ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid); - if (ret < 0) { - /* utils_mkdir_recursive logs an error. */ - goto end; + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + break; + } } + rcu_read_unlock(); - ret = 0; + /* Release the reference returned by the "publish" operation. */ + lttng_trace_chunk_put(published_chunk); end: - return ret; + return ret_code; } -static -int mkdir_relay(const char *path, uint64_t relayd_id) +enum lttcomm_return_code lttng_consumer_close_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, time_t chunk_close_timestamp) { - int ret; - struct consumer_relayd_sock_pair *relayd; + enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; + struct lttng_trace_chunk *chunk; + char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; + const char *relayd_id_str = "(none)"; + struct lttng_ht_iter iter; + struct lttng_consumer_channel *channel; + enum lttng_trace_chunk_status chunk_status; - relayd = consumer_find_relayd(relayd_id); - if (!relayd) { - ERR("Failed to find relayd"); - ret = -1; + if (relayd_id) { + int ret; + + /* Only used for logging purposes. 
*/ + ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), + "%" PRIu64, *relayd_id); + if (ret > 0 && ret < sizeof(relayd_id_buffer)) { + relayd_id_str = relayd_id_buffer; + } else { + relayd_id_str = "(formatting error)"; + } + } + + DBG("Consumer close trace chunk command: relayd_id = %s" + ", session_id = %" PRIu64 + ", chunk_id = %" PRIu64, relayd_id_str, + session_id, chunk_id); + chunk = lttng_trace_chunk_registry_find_chunk( + consumer_data.chunk_registry, session_id, + chunk_id); + if (!chunk) { + ERR("Failed to find chunk: session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + session_id, chunk_id); + ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK; goto end; } - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_mkdir(&relayd->control_sock, path); - if (ret < 0) { - ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); + chunk_status = lttng_trace_chunk_set_close_timestamp(chunk, + chunk_close_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + goto end; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - -end: - return ret; + /* + * Release the reference returned by the "find" operation and + * the session daemon's implicit reference to the chunk. + */ + lttng_trace_chunk_put(chunk); + lttng_trace_chunk_put(chunk); -} + /* + * chunk is now invalid to access as we no longer hold a reference to + * it; it is only kept around to compare it (by address) to the + * current chunk found in the session's channels. 
+ */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, + channel, node.node) { + int ret; -int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid, - uint64_t relayd_id) -{ - if (relayd_id != -1ULL) { - return mkdir_relay(path, relayd_id); - } else { - return mkdir_local(path, uid, gid); + /* + * Only change the channel's chunk to NULL if it still + * references the chunk being closed. The channel may + * reference a newer channel in the case of a session + * rotation. When a session rotation occurs, the "next" + * chunk is created before the "current" chunk is closed. + */ + if (channel->trace_chunk != chunk) { + continue; + } + ret = lttng_consumer_channel_set_trace_chunk(channel, NULL); + if (ret) { + /* + * Attempt to close the chunk on as many channels as + * possible. + */ + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + } } + rcu_read_unlock(); +end: + return ret_code; } -enum lttcomm_return_code lttng_consumer_init_command( - struct lttng_consumer_local_data *ctx, - const lttng_uuid sessiond_uuid) +enum lttcomm_return_code lttng_consumer_trace_chunk_exists( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id) { - enum lttcomm_return_code ret; - char uuid_str[UUID_STR_LEN]; - - if (ctx->sessiond_uuid.is_set) { - ret = LTTCOMM_CONSUMERD_ALREADY_SET; - goto end; - } - - ctx->sessiond_uuid.is_set = true; - memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid)); - ret = LTTCOMM_CONSUMERD_SUCCESS; - lttng_uuid_to_str(sessiond_uuid, uuid_str); - DBG("Received session daemon UUID: %s", uuid_str); -end: - return ret; + enum lttcomm_return_code ret_code; + struct lttng_trace_chunk *chunk; + char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; + const char *relayd_id_str = "(none)"; + + if (relayd_id) { + int ret; + + /* Only used for logging purposes. 
*/ + ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer), + "%" PRIu64, *relayd_id); + if (ret > 0 && ret < sizeof(relayd_id_buffer)) { + relayd_id_str = relayd_id_buffer; + } else { + relayd_id_str = "(formatting error)"; + } + } + + DBG("Consumer trace chunk exists command: relayd_id = %s" + ", session_id = %" PRIu64 + ", chunk_id = %" PRIu64, relayd_id_str, + session_id, chunk_id); + chunk = lttng_trace_chunk_registry_find_chunk( + consumer_data.chunk_registry, session_id, + chunk_id); + DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist"); + ret_code = chunk ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL : + LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK; + + lttng_trace_chunk_put(chunk); + return ret_code; }