X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=be19c1bfd17a3bcf3526096f51ed389c387bd849;hb=48a4000561343808724f7cb5fa8c131877489ccd;hp=1e4b9c92a8813746a3e988a18b7c49d6c22beb8c;hpb=514775d9bca89b3bd072c58e779201682304c57d;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 1e4b9c92a..be19c1bfd 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -8,7 +8,6 @@ */ #define _LGPL_SOURCE -#include #include #include #include @@ -246,9 +245,9 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata, int ret; enum sync_metadata_status status; - assert(metadata); - assert(metadata->metadata_flag); - assert(ctx); + LTTNG_ASSERT(metadata); + LTTNG_ASSERT(metadata->metadata_flag); + LTTNG_ASSERT(ctx); /* * In UST, since we have to write the metadata from the cache packet @@ -366,10 +365,10 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, struct lttng_ht_iter iter; struct lttng_ht *ht; - assert(ctx); + LTTNG_ASSERT(ctx); /* Ease our life a bit. */ - ht = consumer_data.stream_list_ht; + ht = the_consumer_data.stream_list_ht; rcu_read_lock(); @@ -408,7 +407,7 @@ static int consumer_stream_sync_metadata_index( /* Block until all the metadata is sent. */ pthread_mutex_lock(&stream->metadata_timer_lock); - assert(!stream->missed_metadata_flush); + LTTNG_ASSERT(!stream->missed_metadata_flush); stream->waiting_on_metadata = true; pthread_mutex_unlock(&stream->metadata_timer_lock); @@ -629,13 +628,14 @@ struct lttng_consumer_stream *consumer_stream_create( goto end; } + rcu_read_lock(); + if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { ERR("Failed to acquire trace chunk reference during the creation of a stream"); ret = -1; goto error; } - rcu_read_lock(); stream->chan = channel; stream->key = stream_key; stream->trace_chunk = trace_chunk; @@ -775,12 +775,12 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, { int ret; - assert(stream); - assert(relayd); + LTTNG_ASSERT(stream); + LTTNG_ASSERT(relayd); if (stream->sent_to_relayd) { uatomic_dec(&relayd->refcount); - assert(uatomic_read(&relayd->refcount) >= 0); + LTTNG_ASSERT(uatomic_read(&relayd->refcount) >= 0); } /* Closing streams requires to lock the control socket. */ @@ -815,9 +815,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) int ret; struct consumer_relayd_sock_pair *relayd; - assert(stream); + LTTNG_ASSERT(stream); - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: if (stream->mmap_base != NULL) { ret = munmap(stream->mmap_base, stream->mmap_len); @@ -864,7 +864,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) } default: ERR("Unknown consumer_data type"); - assert(0); + abort(); } /* Close output fd. Could be a socket or local file at this point. */ @@ -905,16 +905,16 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, int ret; struct lttng_ht_iter iter; - assert(stream); + LTTNG_ASSERT(stream); /* Should NEVER be called not in monitor mode. */ - assert(stream->chan->monitor); + LTTNG_ASSERT(stream->chan->monitor); rcu_read_lock(); if (ht) { iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); - assert(!ret); + LTTNG_ASSERT(!ret); } /* Delete from stream per channel ID hash table. */ @@ -925,19 +925,19 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, * that did not add the stream to a (all) hash table. Same goes for the * next call ht del call. */ - (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter); + (void) lttng_ht_del(the_consumer_data.stream_per_chan_id_ht, &iter); /* Delete from the global stream list. */ iter.iter.node = &stream->node_session_id.node; /* See the previous ht del on why we ignore the returned value. */ - (void) lttng_ht_del(consumer_data.stream_list_ht, &iter); + (void) lttng_ht_del(the_consumer_data.stream_list_ht, &iter); rcu_read_unlock(); if (!stream->metadata_flag) { /* Decrement the stream count of the global consumer data. */ - assert(consumer_data.stream_count > 0); - consumer_data.stream_count--; + LTTNG_ASSERT(the_consumer_data.stream_count > 0); + the_consumer_data.stream_count--; } } @@ -946,7 +946,7 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, */ void consumer_stream_free(struct lttng_consumer_stream *stream) { - assert(stream); + LTTNG_ASSERT(stream); metadata_bucket_destroy(stream->metadata_bucket); call_rcu(&stream->node.head, free_stream_rcu); @@ -957,9 +957,9 @@ void consumer_stream_free(struct lttng_consumer_stream *stream) */ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) { - assert(stream); + LTTNG_ASSERT(stream); - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER32_UST: @@ -968,7 +968,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) break; default: ERR("Unknown consumer_data type"); - assert(0); + abort(); } } @@ -977,7 +977,7 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) */ static void destroy_close_stream(struct lttng_consumer_stream *stream) { - assert(stream); + LTTNG_ASSERT(stream); DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key); @@ -996,8 +996,8 @@ static struct lttng_consumer_channel *unref_channel( { struct lttng_consumer_channel *free_chan = NULL; - assert(stream); - assert(stream->chan); + LTTNG_ASSERT(stream); + LTTNG_ASSERT(stream->chan); /* Update refcount of channel and see if we need to destroy it. */ if (!uatomic_sub_return(&stream->chan->refcount, 1) @@ -1019,7 +1019,7 @@ static struct lttng_consumer_channel *unref_channel( void consumer_stream_destroy(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { - assert(stream); + LTTNG_ASSERT(stream); /* Stream is in monitor mode. */ if (stream->monitor) { @@ -1031,7 +1031,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, * stream thus being globally visible. */ if (stream->globally_visible) { - pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&the_consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); /* Remove every reference of the stream in the consumer. */ @@ -1043,11 +1043,11 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, free_chan = unref_channel(stream); /* Indicates that the consumer data state MUST be updated after this. */ - consumer_data.need_update = 1; + the_consumer_data.need_update = 1; pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); - pthread_mutex_unlock(&consumer_data.lock); + pthread_mutex_unlock(&the_consumer_data.lock); } else { /* * If the stream is not visible globally, this needs to be done @@ -1080,8 +1080,8 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, { int ret; - assert(stream); - assert(element); + LTTNG_ASSERT(stream); + LTTNG_ASSERT(element); rcu_read_lock(); if (stream->net_seq_idx != (uint64_t) -1ULL) { @@ -1132,7 +1132,7 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, char stream_path[LTTNG_PATH_MAX]; ASSERT_LOCKED(stream->lock); - assert(stream->trace_chunk); + LTTNG_ASSERT(stream->trace_chunk); ret = utils_stream_file_path(stream->chan->pathname, stream->name, stream->chan->tracefile_size, @@ -1150,16 +1150,16 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, goto end; } stream->out_fd = -1; - } + } DBG("Opening stream output file \"%s\"", stream_path); chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, flags, mode, &stream->out_fd, false); - if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to open stream file \"%s\"", stream->name); ret = -1; goto end; - } + } if (!stream->metadata_flag && (create_index || stream->index_file)) { if (stream->index_file) { @@ -1212,7 +1212,7 @@ bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream) * This function does not take a const stream since * cds_lfht_is_node_deleted was not const before liburcu 0.12. */ - assert(stream); + LTTNG_ASSERT(stream); return cds_lfht_is_node_deleted(&stream->node.node); } @@ -1256,9 +1256,9 @@ int consumer_stream_enable_metadata_bucketization( { int ret = 0; - assert(stream->metadata_flag); - assert(!stream->metadata_bucket); - assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); + LTTNG_ASSERT(stream->metadata_flag); + LTTNG_ASSERT(!stream->metadata_bucket); + LTTNG_ASSERT(stream->chan->output == CONSUMER_CHANNEL_MMAP); stream->metadata_bucket = metadata_bucket_create( metadata_bucket_flush, stream); @@ -1275,7 +1275,7 @@ end: void consumer_stream_metadata_set_version( struct lttng_consumer_stream *stream, uint64_t new_version) { - assert(new_version > stream->metadata_version); + LTTNG_ASSERT(new_version > stream->metadata_version); stream->metadata_version = new_version; stream->reset_metadata_flag = 1; @@ -1289,7 +1289,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, { int ret = 0; - switch (consumer_data.type) { + switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: if (producer_active) { ret = kernctl_buffer_flush(stream->wait_fd); @@ -1317,7 +1317,7 @@ int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_flush_buffer(stream, (int) producer_active); + ret = lttng_ustconsumer_flush_buffer(stream, (int) producer_active); break; default: ERR("Unknown consumer_data type");