X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.cpp;h=ab9b39111a02580bd176baac55b7c3b95379d123;hb=56047f5a23df5c2c583a102b8015bbec5a7da9f1;hp=2816d1a37cb28a9d544e61fb874dc4e3a2468a9a;hpb=28ab034a2c3582d07d3423d2d746731f87d3969f;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.cpp b/src/common/consumer/consumer-stream.cpp index 2816d1a37..ab9b39111 100644 --- a/src/common/consumer/consumer-stream.cpp +++ b/src/common/consumer/consumer-stream.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -362,7 +363,7 @@ end_unlock_mutex: int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, uint64_t session_id) { int ret; - struct lttng_consumer_stream *stream = NULL; + struct lttng_consumer_stream *stream = nullptr; struct lttng_ht_iter iter; struct lttng_ht *ht; @@ -371,7 +372,7 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, uint64_ /* Ease our life a bit. */ ht = the_consumer_data.stream_list_ht; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Search the metadata associated with the session id of the given stream. */ @@ -400,7 +401,6 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, uint64_ ret = 0; end: - rcu_read_unlock(); return ret; } @@ -521,7 +521,7 @@ consumer_stream_open_packet(struct lttng_consumer_stream *stream) goto end; } - ret = consumer_stream_flush_buffer(stream, 0); + ret = consumer_stream_flush_buffer(stream, false); if (ret) { ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64 ", channel name = %s, session id = %" PRIu64, @@ -642,16 +642,15 @@ struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_chann { int ret; struct lttng_consumer_stream *stream; + lttng::urcu::read_lock_guard read_lock; stream = zmalloc(); - if (stream == NULL) { + if (stream == nullptr) { PERROR("malloc struct lttng_consumer_stream"); ret = -ENOMEM; goto end; } - rcu_read_lock(); - if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { ERR("Failed to acquire trace chunk reference during the creation of a stream"); ret = -1; @@ -669,13 +668,13 @@ struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_chann stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; - stream->index_file = NULL; + stream->index_file = nullptr; stream->last_sequence_number = -1ULL; stream->rotate_position = -1ULL; /* Buffer is created with an open packet. */ stream->opened_packet_in_current_trace_chunk = true; - pthread_mutex_init(&stream->lock, NULL); - pthread_mutex_init(&stream->metadata_timer_lock, NULL); + pthread_mutex_init(&stream->lock, nullptr); + pthread_mutex_init(&stream->metadata_timer_lock, nullptr); /* If channel is the metadata, flag this stream as metadata. */ if (type == CONSUMER_CHANNEL_TYPE_METADATA) { @@ -683,8 +682,8 @@ struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_chann /* Metadata is flat out. */ strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name)); /* Live rendez-vous point. */ - pthread_cond_init(&stream->metadata_rdv, NULL); - pthread_mutex_init(&stream->metadata_rdv_lock, NULL); + pthread_cond_init(&stream->metadata_rdv, nullptr); + pthread_mutex_init(&stream->metadata_rdv_lock, nullptr); } else { /* Format stream name to _ */ ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", channel_name, cpu); @@ -726,10 +725,8 @@ struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_chann stream->net_seq_idx, stream->session_id); - rcu_read_unlock(); - lttng_dynamic_array_init( - &stream->read_subbuffer_ops.post_consume_cbs, sizeof(post_consume_cb), NULL); + &stream->read_subbuffer_ops.post_consume_cbs, sizeof(post_consume_cb), nullptr); if (type == CONSUMER_CHANNEL_TYPE_METADATA) { stream->read_subbuffer_ops.lock = consumer_stream_metadata_lock_all; @@ -772,7 +769,6 @@ struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_chann return stream; error: - rcu_read_unlock(); lttng_trace_chunk_put(stream->trace_chunk); lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs); free(stream); @@ -780,7 +776,7 @@ end: if (alloc_ret) { *alloc_ret = ret; } - return NULL; + return nullptr; } /* @@ -847,21 +843,19 @@ void consumer_stream_close_output(struct lttng_consumer_stream *stream) if (stream->index_file) { lttng_index_file_put(stream->index_file); - stream->index_file = NULL; + stream->index_file = nullptr; } lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; + stream->trace_chunk = nullptr; /* Check and cleanup relayd if needed. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { + if (relayd != nullptr) { consumer_stream_relayd_close(stream, relayd); stream->net_seq_idx = -1ULL; } - - rcu_read_unlock(); } /* @@ -879,7 +873,7 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, struct lttng_h /* Should NEVER be called not in monitor mode. */ LTTNG_ASSERT(stream->chan->monitor); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; if (ht) { iter.iter.node = &stream->node.node; @@ -902,8 +896,6 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, struct lttng_h /* See the previous ht del on why we ignore the returned value. */ (void) lttng_ht_del(the_consumer_data.stream_list_ht, &iter); - rcu_read_unlock(); - if (!stream->metadata_flag) { /* Decrement the stream count of the global consumer data. */ LTTNG_ASSERT(the_consumer_data.stream_count > 0); @@ -931,7 +923,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - if (stream->mmap_base != NULL) { + if (stream->mmap_base != nullptr) { const auto ret = munmap(stream->mmap_base, stream->mmap_len); if (ret != 0) { @@ -1008,7 +1000,7 @@ static void destroy_close_stream(struct lttng_consumer_stream *stream) */ static struct lttng_consumer_channel *unref_channel(struct lttng_consumer_stream *stream) { - struct lttng_consumer_channel *free_chan = NULL; + struct lttng_consumer_channel *free_chan = nullptr; LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->chan); @@ -1038,7 +1030,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, struct lttng_ /* Stream is in monitor mode. */ if (stream->monitor) { - struct lttng_consumer_channel *free_chan = NULL; + struct lttng_consumer_channel *free_chan = nullptr; /* * This means that the stream was successfully removed from the streams @@ -1082,7 +1074,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, struct lttng_ /* Free stream within a RCU call. */ lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; + stream->trace_chunk = nullptr; lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs); consumer_stream_free(stream); } @@ -1100,7 +1092,7 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, LTTNG_ASSERT(stream); LTTNG_ASSERT(element); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; if (stream->net_seq_idx != (uint64_t) -1ULL) { struct consumer_relayd_sock_pair *relayd; relayd = consumer_find_relayd(stream->net_seq_idx); @@ -1139,7 +1131,6 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, } error: - rcu_read_unlock(); return ret; } @@ -1158,7 +1149,7 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, bo stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, - NULL, + nullptr, stream_path, sizeof(stream_path)); if (ret < 0) { @@ -1244,7 +1235,7 @@ static ssize_t metadata_bucket_flush(const struct stream_subbuffer *buffer, void ssize_t ret; struct lttng_consumer_stream *stream = (lttng_consumer_stream *) data; - ret = consumer_stream_consume_mmap(NULL, stream, buffer); + ret = consumer_stream_consume_mmap(nullptr, stream, buffer); if (ret < 0) { goto end; }