X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.cpp;fp=src%2Fcommon%2Fconsumer%2Fconsumer-stream.cpp;h=2816d1a37cb28a9d544e61fb874dc4e3a2468a9a;hp=b30e9aac0680eb6795a763f836a5da8ff18349fa;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d diff --git a/src/common/consumer/consumer-stream.cpp b/src/common/consumer/consumer-stream.cpp index b30e9aac0..2816d1a37 100644 --- a/src/common/consumer/consumer-stream.cpp +++ b/src/common/consumer/consumer-stream.cpp @@ -8,17 +8,12 @@ */ #define _LGPL_SOURCE -#include -#include -#include +#include "consumer-stream.hpp" #include #include -#include -#include #include #include -#include #include #include #include @@ -27,15 +22,16 @@ #include #include -#include "consumer-stream.hpp" +#include +#include +#include /* * RCU call to free stream. MUST only be used with call_rcu(). */ static void free_stream_rcu(struct rcu_head *head) { - struct lttng_ht_node_u64 *node = - lttng::utils::container_of(head, <tng_ht_node_u64::head); + struct lttng_ht_node_u64 *node = lttng::utils::container_of(head, <tng_ht_node_u64::head); struct lttng_consumer_stream *stream = lttng::utils::container_of(node, <tng_consumer_stream::node); @@ -81,7 +77,7 @@ static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_str /* Only used for data streams. */ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuf) + const struct stream_subbuffer *subbuf) { int ret = 0; uint64_t sequence_number; @@ -103,13 +99,12 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, if (stream->last_sequence_number == -1ULL) { stream->last_sequence_number = sequence_number; } else if (sequence_number > stream->last_sequence_number) { - stream->chan->lost_packets += sequence_number - - stream->last_sequence_number - 1; + stream->chan->lost_packets += sequence_number - stream->last_sequence_number - 1; } else { /* seq <= last_sequence_number */ - ERR("Sequence number inconsistent : prev = %" PRIu64 - ", current = %" PRIu64, - stream->last_sequence_number, sequence_number); + ERR("Sequence number inconsistent : prev = %" PRIu64 ", current = %" PRIu64, + stream->last_sequence_number, + sequence_number); ret = -1; goto end; } @@ -120,13 +115,10 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, * Overflow has occurred. We assume only one wrap-around * has occurred. */ - stream->chan->discarded_events += - (1ULL << (CAA_BITS_PER_LONG - 1)) - - stream->last_discarded_events + - discarded_events; + stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) - + stream->last_discarded_events + discarded_events; } else { - stream->chan->discarded_events += discarded_events - - stream->last_discarded_events; + stream->chan->discarded_events += discarded_events - stream->last_discarded_events; } stream->last_discarded_events = discarded_events; ret = 0; @@ -135,40 +127,37 @@ end: return ret; } -static -void ctf_packet_index_populate(struct ctf_packet_index *index, - off_t offset, const struct stream_subbuffer *subbuffer) +static void ctf_packet_index_populate(struct ctf_packet_index *index, + off_t offset, + const struct stream_subbuffer *subbuffer) { *index = (typeof(*index)){ .offset = htobe64(offset), .packet_size = htobe64(subbuffer->info.data.packet_size), .content_size = htobe64(subbuffer->info.data.content_size), - .timestamp_begin = htobe64( - subbuffer->info.data.timestamp_begin), - .timestamp_end = htobe64( - subbuffer->info.data.timestamp_end), - .events_discarded = htobe64( - subbuffer->info.data.events_discarded), + .timestamp_begin = htobe64(subbuffer->info.data.timestamp_begin), + .timestamp_end = htobe64(subbuffer->info.data.timestamp_end), + .events_discarded = htobe64(subbuffer->info.data.events_discarded), .stream_id = htobe64(subbuffer->info.data.stream_id), - .stream_instance_id = htobe64( - subbuffer->info.data.stream_instance_id.is_set ? - subbuffer->info.data.stream_instance_id.value : -1ULL), - .packet_seq_num = htobe64( - subbuffer->info.data.sequence_number.is_set ? - subbuffer->info.data.sequence_number.value : -1ULL), + .stream_instance_id = + htobe64(subbuffer->info.data.stream_instance_id.is_set ? + subbuffer->info.data.stream_instance_id.value : + -1ULL), + .packet_seq_num = htobe64(subbuffer->info.data.sequence_number.is_set ? + subbuffer->info.data.sequence_number.value : + -1ULL), }; } -static ssize_t consumer_stream_consume_mmap( - struct lttng_consumer_local_data *ctx __attribute__((unused)), - struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer) +static ssize_t consumer_stream_consume_mmap(struct lttng_consumer_local_data *ctx + __attribute__((unused)), + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) { const unsigned long padding_size = - subbuffer->info.data.padded_subbuf_size - - subbuffer->info.data.subbuf_size; + subbuffer->info.data.padded_subbuf_size - subbuffer->info.data.subbuf_size; const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap( - stream, &subbuffer->buffer.buffer, padding_size); + stream, &subbuffer->buffer.buffer, padding_size); if (stream->net_seq_idx == -1ULL) { /* @@ -177,8 +166,8 @@ static ssize_t consumer_stream_consume_mmap( */ if (written_bytes != subbuffer->info.data.padded_subbuf_size) { DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)", - written_bytes, - subbuffer->info.data.padded_subbuf_size); + written_bytes, + subbuffer->info.data.padded_subbuf_size); } } else { /* @@ -187,8 +176,8 @@ static ssize_t consumer_stream_consume_mmap( */ if (written_bytes != subbuffer->info.data.subbuf_size) { DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)", - written_bytes, - subbuffer->info.data.subbuf_size); + written_bytes, + subbuffer->info.data.subbuf_size); } } @@ -203,18 +192,17 @@ static ssize_t consumer_stream_consume_mmap( return written_bytes; } -static ssize_t consumer_stream_consume_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer) +static ssize_t consumer_stream_consume_splice(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) { const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice( - ctx, stream, subbuffer->info.data.padded_subbuf_size, 0); + ctx, stream, subbuffer->info.data.padded_subbuf_size, 0); if (written_bytes != subbuffer->info.data.padded_subbuf_size) { DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)", - written_bytes, - subbuffer->info.data.padded_subbuf_size); + written_bytes, + subbuffer->info.data.padded_subbuf_size); } /* @@ -228,10 +216,9 @@ static ssize_t consumer_stream_consume_splice( return written_bytes; } -static int consumer_stream_send_index( - struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer, - struct lttng_consumer_local_data *ctx __attribute__((unused))) +static int consumer_stream_send_index(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx __attribute__((unused))) { off_t packet_offset = 0; struct ctf_packet_index index = {}; @@ -241,8 +228,7 @@ static int consumer_stream_send_index( * effect this sub-buffer from the offset. */ if (stream->net_seq_idx == (uint64_t) -1ULL) { - packet_offset = stream->out_fd_offset - - subbuffer->info.data.padded_subbuf_size; + packet_offset = stream->out_fd_offset - subbuffer->info.data.padded_subbuf_size; } ctf_packet_index_populate(&index, packet_offset, subbuffer); @@ -256,7 +242,7 @@ static int consumer_stream_send_index( * indicating that there is no metadata available for that stream. */ static int do_sync_metadata(struct lttng_consumer_stream *metadata, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret; enum sync_metadata_status status; @@ -373,8 +359,7 @@ end_unlock_mutex: * * Return 0 on success or else a negative value. */ -int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, - uint64_t session_id) +int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, uint64_t session_id) { int ret; struct lttng_consumer_stream *stream = NULL; @@ -391,8 +376,13 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, /* Search the metadata associated with the session id of the given stream. */ cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, - &session_id, &iter.iter, stream, node_session_id.node) { + ht->hash_fct(&session_id, lttng_ht_seed), + ht->match_fct, + &session_id, + &iter.iter, + stream, + node_session_id.node) + { if (!stream->metadata_flag) { continue; } @@ -414,10 +404,9 @@ end: return ret; } -static int consumer_stream_sync_metadata_index( - struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer, - struct lttng_consumer_local_data *ctx) +static int consumer_stream_sync_metadata_index(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) { bool missed_metadata_flush; int ret; @@ -469,17 +458,15 @@ end: * of the metadata stream in the kernel. If it was updated, set the reset flag * on the stream. */ -static -int metadata_stream_check_version(struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer) +static int metadata_stream_check_version(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) { if (stream->metadata_version == subbuffer->info.metadata.version) { goto end; } DBG("New metadata version detected"); - consumer_stream_metadata_set_version(stream, - subbuffer->info.metadata.version); + consumer_stream_metadata_set_version(stream, subbuffer->info.metadata.version); if (stream->read_subbuffer_ops.reset_metadata) { stream->read_subbuffer_ops.reset_metadata(stream); @@ -489,9 +476,7 @@ end: return 0; } -static -bool stream_is_rotating_to_null_chunk( - const struct lttng_consumer_stream *stream) +static bool stream_is_rotating_to_null_chunk(const struct lttng_consumer_stream *stream) { bool rotating_to_null_chunk = false; @@ -500,16 +485,15 @@ bool stream_is_rotating_to_null_chunk( goto end; } - if (stream->trace_chunk == stream->chan->trace_chunk || - !stream->chan->trace_chunk) { + if (stream->trace_chunk == stream->chan->trace_chunk || !stream->chan->trace_chunk) { rotating_to_null_chunk = true; } end: return rotating_to_null_chunk; } -enum consumer_stream_open_packet_status consumer_stream_open_packet( - struct lttng_consumer_stream *stream) +enum consumer_stream_open_packet_status +consumer_stream_open_packet(struct lttng_consumer_stream *stream) { int ret; enum consumer_stream_open_packet_status status; @@ -518,20 +502,21 @@ enum consumer_stream_open_packet_status consumer_stream_open_packet( ret = lttng_consumer_sample_snapshot_positions(stream); if (ret < 0) { ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; goto end; } - ret = lttng_consumer_get_produced_snapshot( - stream, &produced_pos_before); + ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_before); if (ret < 0) { ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; goto end; } @@ -539,9 +524,10 @@ enum consumer_stream_open_packet_status consumer_stream_open_packet( ret = consumer_stream_flush_buffer(stream, 0); if (ret) { ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; goto end; } @@ -549,9 +535,10 @@ enum consumer_stream_open_packet_status consumer_stream_open_packet( ret = lttng_consumer_sample_snapshot_positions(stream); if (ret < 0) { ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; goto end; } @@ -559,9 +546,10 @@ enum consumer_stream_open_packet_status consumer_stream_open_packet( ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after); if (ret < 0) { ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; goto end; } @@ -571,8 +559,8 @@ enum consumer_stream_open_packet_status consumer_stream_open_packet( * positons before and after the flush. */ status = produced_pos_before != produced_pos_after ? - CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED : - CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE; + CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED : + CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE; if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) { stream->opened_packet_in_current_trace_chunk = true; } @@ -589,25 +577,26 @@ end: * ring-buffer. In that case, a second attempt is performed after consuming * a packet since that will have freed enough space in the ring-buffer. */ -static -int post_consume_open_new_packet(struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer __attribute__((unused)), - struct lttng_consumer_local_data *ctx __attribute__((unused))) +static int post_consume_open_new_packet(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer + __attribute__((unused)), + struct lttng_consumer_local_data *ctx + __attribute__((unused))) { int ret = 0; - if (!stream->opened_packet_in_current_trace_chunk && - stream->trace_chunk && - !stream_is_rotating_to_null_chunk(stream)) { + if (!stream->opened_packet_in_current_trace_chunk && stream->trace_chunk && + !stream_is_rotating_to_null_chunk(stream)) { const enum consumer_stream_open_packet_status status = - consumer_stream_open_packet(stream); + consumer_stream_open_packet(stream); switch (status) { case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); stream->opened_packet_in_current_trace_chunk = true; break; case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: @@ -618,9 +607,10 @@ int post_consume_open_new_packet(struct lttng_consumer_stream *stream, * anyhow. */ DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); stream->opened_packet_in_current_trace_chunk = true; break; case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: @@ -638,18 +628,17 @@ end: return ret; } -struct lttng_consumer_stream *consumer_stream_create( - struct lttng_consumer_channel *channel, - uint64_t channel_key, - uint64_t stream_key, - const char *channel_name, - uint64_t relayd_id, - uint64_t session_id, - struct lttng_trace_chunk *trace_chunk, - int cpu, - int *alloc_ret, - enum consumer_channel_type type, - unsigned int monitor) +struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + const char *channel_name, + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor) { int ret; struct lttng_consumer_stream *stream; @@ -698,8 +687,7 @@ struct lttng_consumer_stream *consumer_stream_create( pthread_mutex_init(&stream->metadata_rdv_lock, NULL); } else { /* Format stream name to _ */ - ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", - channel_name, cpu); + ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", channel_name, cpu); if (ret < 0) { PERROR("snprintf stream name"); goto error; @@ -730,63 +718,55 @@ struct lttng_consumer_stream *consumer_stream_create( /* Init session id node with the stream session id */ lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id); - DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 - " relayd_id %" PRIu64 ", session_id %" PRIu64, - stream->name, stream->key, channel_key, - stream->net_seq_idx, stream->session_id); + DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 + ", session_id %" PRIu64, + stream->name, + stream->key, + channel_key, + stream->net_seq_idx, + stream->session_id); rcu_read_unlock(); - lttng_dynamic_array_init(&stream->read_subbuffer_ops.post_consume_cbs, - sizeof(post_consume_cb), NULL); + lttng_dynamic_array_init( + &stream->read_subbuffer_ops.post_consume_cbs, sizeof(post_consume_cb), NULL); if (type == CONSUMER_CHANNEL_TYPE_METADATA) { - stream->read_subbuffer_ops.lock = - consumer_stream_metadata_lock_all; - stream->read_subbuffer_ops.unlock = - consumer_stream_metadata_unlock_all; + stream->read_subbuffer_ops.lock = consumer_stream_metadata_lock_all; + stream->read_subbuffer_ops.unlock = consumer_stream_metadata_unlock_all; stream->read_subbuffer_ops.assert_locked = - consumer_stream_metadata_assert_locked_all; - stream->read_subbuffer_ops.pre_consume_subbuffer = - metadata_stream_check_version; + consumer_stream_metadata_assert_locked_all; + stream->read_subbuffer_ops.pre_consume_subbuffer = metadata_stream_check_version; } else { const post_consume_cb post_consume_index_op = channel->is_live ? - consumer_stream_sync_metadata_index : - consumer_stream_send_index; - const post_consume_cb post_consume_open_new_packet_ = - post_consume_open_new_packet; - - ret = lttng_dynamic_array_add_element( - &stream->read_subbuffer_ops.post_consume_cbs, - &post_consume_index_op); + consumer_stream_sync_metadata_index : + consumer_stream_send_index; + const post_consume_cb post_consume_open_new_packet_ = post_consume_open_new_packet; + + ret = lttng_dynamic_array_add_element(&stream->read_subbuffer_ops.post_consume_cbs, + &post_consume_index_op); if (ret) { PERROR("Failed to add `send index` callback to stream's post consumption callbacks"); goto error; } - ret = lttng_dynamic_array_add_element( - &stream->read_subbuffer_ops.post_consume_cbs, - &post_consume_open_new_packet_); + ret = lttng_dynamic_array_add_element(&stream->read_subbuffer_ops.post_consume_cbs, + &post_consume_open_new_packet_); if (ret) { PERROR("Failed to add `open new packet` callback to stream's post consumption callbacks"); goto error; } stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; - stream->read_subbuffer_ops.unlock = - consumer_stream_data_unlock_all; - stream->read_subbuffer_ops.assert_locked = - consumer_stream_data_assert_locked_all; - stream->read_subbuffer_ops.pre_consume_subbuffer = - consumer_stream_update_stats; + stream->read_subbuffer_ops.unlock = consumer_stream_data_unlock_all; + stream->read_subbuffer_ops.assert_locked = consumer_stream_data_assert_locked_all; + stream->read_subbuffer_ops.pre_consume_subbuffer = consumer_stream_update_stats; } if (channel->output == CONSUMER_CHANNEL_MMAP) { - stream->read_subbuffer_ops.consume_subbuffer = - consumer_stream_consume_mmap; + stream->read_subbuffer_ops.consume_subbuffer = consumer_stream_consume_mmap; } else { - stream->read_subbuffer_ops.consume_subbuffer = - consumer_stream_consume_splice; + stream->read_subbuffer_ops.consume_subbuffer = consumer_stream_consume_splice; } return stream; @@ -811,7 +791,7 @@ end: * a hash table before calling this. */ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, - struct consumer_relayd_sock_pair *relayd) + struct consumer_relayd_sock_pair *relayd) { int ret; @@ -825,18 +805,17 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, /* Closing streams requires to lock the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_send_close_stream(&relayd->control_sock, - stream->relayd_stream_id, - stream->next_net_seq_num - 1); + ret = relayd_send_close_stream( + &relayd->control_sock, stream->relayd_stream_id, stream->next_net_seq_num - 1); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", + relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); } /* Both conditions are met, we destroy the relayd. */ - if (uatomic_read(&relayd->refcount) == 0 && - uatomic_read(&relayd->destroy_flag)) { + if (uatomic_read(&relayd->refcount) == 0 && uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } stream->net_seq_idx = (uint64_t) -1ULL; @@ -891,8 +870,7 @@ void consumer_stream_close_output(struct lttng_consumer_stream *stream) * The consumer data lock MUST be acquired. * The stream lock MUST be acquired. */ -void consumer_stream_delete(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +void consumer_stream_delete(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { int ret; struct lttng_ht_iter iter; @@ -1028,8 +1006,7 @@ static void destroy_close_stream(struct lttng_consumer_stream *stream) * Decrement the stream's channel refcount and if down to 0, return the channel * pointer so it can be destroyed by the caller or NULL if not. */ -static struct lttng_consumer_channel *unref_channel( - struct lttng_consumer_stream *stream) +static struct lttng_consumer_channel *unref_channel(struct lttng_consumer_stream *stream) { struct lttng_consumer_channel *free_chan = NULL; @@ -1037,8 +1014,8 @@ static struct lttng_consumer_channel *unref_channel( LTTNG_ASSERT(stream->chan); /* Update refcount of channel and see if we need to destroy it. */ - if (!uatomic_sub_return(&stream->chan->refcount, 1) - && !uatomic_read(&stream->chan->nb_init_stream_left)) { + if (!uatomic_sub_return(&stream->chan->refcount, 1) && + !uatomic_read(&stream->chan->nb_init_stream_left)) { free_chan = stream->chan; } @@ -1053,8 +1030,7 @@ static struct lttng_consumer_channel *unref_channel( * This MUST be called WITHOUT the consumer data and stream lock acquired if * the stream is in _monitor_ mode else it does not matter. */ -void consumer_stream_destroy(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +void consumer_stream_destroy(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { LTTNG_ASSERT(stream); @@ -1077,7 +1053,6 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, /* Remove every reference of the stream in the consumer. */ consumer_stream_delete(stream, ht); - destroy_close_stream(stream); /* Update channel's refcount of the stream. */ @@ -1118,7 +1093,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, * Return 0 on success or else a negative value. */ int consumer_stream_write_index(struct lttng_consumer_stream *stream, - struct ctf_packet_index *element) + struct ctf_packet_index *element) { int ret; @@ -1131,21 +1106,25 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_send_index(&relayd->control_sock, element, - stream->relayd_stream_id, stream->next_net_seq_num - 1); + ret = relayd_send_index(&relayd->control_sock, + element, + stream->relayd_stream_id, + stream->next_net_seq_num - 1); if (ret < 0) { /* * Communication error with lttng-relayd, * perform cleanup now */ - ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", + relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); ret = -1; } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", - stream->key, stream->net_seq_idx); + stream->key, + stream->net_seq_idx); ret = -1; } } else { @@ -1164,8 +1143,7 @@ error: return ret; } -int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, - bool create_index) +int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, bool create_index) { int ret; enum lttng_trace_chunk_status chunk_status; @@ -1176,10 +1154,13 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, ASSERT_LOCKED(stream->lock); LTTNG_ASSERT(stream->trace_chunk); - ret = utils_stream_file_path(stream->chan->pathname, stream->name, - stream->chan->tracefile_size, - stream->tracefile_count_current, NULL, - stream_path, sizeof(stream_path)); + ret = utils_stream_file_path(stream->chan->pathname, + stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + NULL, + stream_path, + sizeof(stream_path)); if (ret < 0) { goto end; } @@ -1187,16 +1168,15 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret < 0) { - PERROR("Failed to close stream file \"%s\"", - stream->name); + PERROR("Failed to close stream file \"%s\"", stream->name); goto end; } stream->out_fd = -1; } DBG("Opening stream output file \"%s\"", stream_path); - chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, - flags, mode, &stream->out_fd, false); + chunk_status = lttng_trace_chunk_open_file( + stream->trace_chunk, stream_path, flags, mode, &stream->out_fd, false); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to open stream file \"%s\"", stream->name); ret = -1; @@ -1207,14 +1187,16 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, if (stream->index_file) { lttng_index_file_put(stream->index_file); } - chunk_status = lttng_index_file_create_from_trace_chunk( - stream->trace_chunk, - stream->chan->pathname, - stream->name, - stream->chan->tracefile_size, - stream->tracefile_count_current, - CTF_INDEX_MAJOR, CTF_INDEX_MINOR, - false, &stream->index_file); + chunk_status = + lttng_index_file_create_from_trace_chunk(stream->trace_chunk, + stream->chan->pathname, + stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, + CTF_INDEX_MINOR, + false, + &stream->index_file); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -1; goto end; @@ -1234,8 +1216,7 @@ int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream) stream->tracefile_count_current++; if (stream->chan->tracefile_count > 0) { - stream->tracefile_count_current %= - stream->chan->tracefile_count; + stream->tracefile_count_current %= stream->chan->tracefile_count; } DBG("Rotating output files of stream \"%s\"", stream->name); @@ -1258,8 +1239,7 @@ bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream) return cds_lfht_is_node_deleted(&stream->node.node); } -static ssize_t metadata_bucket_flush( - const struct stream_subbuffer *buffer, void *data) +static ssize_t metadata_bucket_flush(const struct stream_subbuffer *buffer, void *data) { ssize_t ret; struct lttng_consumer_stream *stream = (lttng_consumer_stream *) data; @@ -1272,10 +1252,10 @@ end: return ret; } -static ssize_t metadata_bucket_consume( - struct lttng_consumer_local_data *unused __attribute__((unused)), - struct lttng_consumer_stream *stream, - const struct stream_subbuffer *subbuffer) +static ssize_t metadata_bucket_consume(struct lttng_consumer_local_data *unused + __attribute__((unused)), + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) { ssize_t ret; enum metadata_bucket_status status; @@ -1293,8 +1273,7 @@ static ssize_t metadata_bucket_consume( return ret; } -int consumer_stream_enable_metadata_bucketization( - struct lttng_consumer_stream *stream) +int consumer_stream_enable_metadata_bucketization(struct lttng_consumer_stream *stream) { int ret = 0; @@ -1302,8 +1281,7 @@ int consumer_stream_enable_metadata_bucketization( LTTNG_ASSERT(!stream->metadata_bucket); LTTNG_ASSERT(stream->chan->output == CONSUMER_CHANNEL_MMAP); - stream->metadata_bucket = metadata_bucket_create( - metadata_bucket_flush, stream); + stream->metadata_bucket = metadata_bucket_create(metadata_bucket_flush, stream); if (!stream->metadata_bucket) { ret = -1; goto end; @@ -1314,8 +1292,8 @@ end: return ret; } -void consumer_stream_metadata_set_version( - struct lttng_consumer_stream *stream, uint64_t new_version) +void consumer_stream_metadata_set_version(struct lttng_consumer_stream *stream, + uint64_t new_version) { LTTNG_ASSERT(new_version > stream->metadata_version); stream->metadata_version = new_version; @@ -1326,8 +1304,7 @@ void consumer_stream_metadata_set_version( } } -int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, - bool producer_active) +int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, bool producer_active) { int ret = 0;