X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=a9de1696e4ce862d52bf58b5c6ab44f73ad76b28;hp=b059eaf52c58397166b564988136dbd59b1b0036;hb=HEAD;hpb=28ab034a2c3582d07d3423d2d746731f87d3969f diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index b059eaf52..1da243601 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -29,10 +30,12 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -42,6 +45,7 @@ #include #include #include +#include #include lttng_consumer_global_data the_consumer_data; @@ -79,7 +83,7 @@ int data_consumption_paused; */ int consumer_quit; -static const char *get_consumer_domain(void) +static const char *get_consumer_domain() { switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -100,11 +104,12 @@ static const char *get_consumer_domain(void) */ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) { - struct lttng_consumer_stream *null_stream = NULL; + struct lttng_consumer_stream *null_stream = nullptr; LTTNG_ASSERT(pipe); - (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); + (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a + pointer. */ } static void notify_health_quit_pipe(int *pipe) @@ -138,7 +143,7 @@ static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key) { - notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL); + notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL); } static int read_channel_pipe(struct lttng_consumer_local_data *ctx, @@ -173,14 +178,7 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { - /* - * Once a stream is added to this list, the buffers were created so we - * have a guarantee that this call will succeed. Setting the monitor - * mode to 0 so we don't lock nor try to delete the stream from the - * global hash table. - */ - stream->monitor = 0; - consumer_stream_destroy(stream, NULL); + consumer_stream_destroy(stream, nullptr); } } @@ -192,25 +190,23 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht * { struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; - struct lttng_consumer_stream *stream = NULL; + struct lttng_consumer_stream *stream = nullptr; LTTNG_ASSERT(ht); /* -1ULL keys are lookup failures */ if (key == (uint64_t) -1ULL) { - return NULL; + return nullptr; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); - if (node != NULL) { + if (node != nullptr) { stream = lttng::utils::container_of(node, <tng_consumer_stream::node); } - rcu_read_unlock(); - return stream; } @@ -218,7 +214,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; stream = find_stream(key, ht); if (stream) { stream->key = (uint64_t) -1ULL; @@ -229,7 +225,6 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) */ stream->node.key = (uint64_t) -1ULL; } - rcu_read_unlock(); } /* @@ -242,18 +237,18 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) { struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; - struct lttng_consumer_channel *channel = NULL; + struct lttng_consumer_channel *channel = nullptr; ASSERT_RCU_READ_LOCKED(); /* -1ULL keys are lookup failures */ if (key == (uint64_t) -1ULL) { - return NULL; + return nullptr; } lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); - if (node != NULL) { + if (node != nullptr) { channel = lttng::utils::container_of(node, <tng_consumer_channel::node); } @@ -272,7 +267,7 @@ static void steal_channel_key(uint64_t key) { struct lttng_consumer_channel *channel; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(key); if (channel) { channel->key = (uint64_t) -1ULL; @@ -283,7 +278,6 @@ static void steal_channel_key(uint64_t key) */ channel->node.key = (uint64_t) -1ULL; } - rcu_read_unlock(); } static void free_channel_rcu(struct rcu_head *head) @@ -303,7 +297,8 @@ static void free_channel_rcu(struct rcu_head *head) ERR("Unknown consumer_data type"); abort(); } - free(channel); + + delete channel; } /* @@ -338,7 +333,7 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) int ret; struct lttng_ht_iter iter; - if (relayd == NULL) { + if (relayd == nullptr) { return; } @@ -398,12 +393,12 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) } lttng_trace_chunk_put(channel->trace_chunk); - channel->trace_chunk = NULL; + channel->trace_chunk = nullptr; if (channel->is_published) { int ret; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; iter.iter.node = &channel->node.node; ret = lttng_ht_del(the_consumer_data.channel_ht, &iter); LTTNG_ASSERT(!ret); @@ -411,7 +406,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) iter.iter.node = &channel->channels_by_session_id_ht_node.node; ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter); LTTNG_ASSERT(!ret); - rcu_read_unlock(); } channel->is_deleted = true; @@ -425,19 +419,20 @@ end: * Iterate over the relayd hash table and destroy each element. Finally, * destroy the whole hash table. */ -static void cleanup_relayd_ht(void) +static void cleanup_relayd_ht() { struct lttng_ht_iter iter; struct consumer_relayd_sock_pair *relayd; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { - consumer_destroy_relayd(relayd); + cds_lfht_for_each_entry ( + the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { + consumer_destroy_relayd(relayd); + } } - rcu_read_unlock(); - lttng_ht_destroy(the_consumer_data.relayd_ht); } @@ -456,12 +451,14 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Let's begin with metadata */ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); + stream->chan->metadata_pushed_wait_queue.wake_all(); + DBG("Delete flag set to metadata stream %d", stream->wait_fd); } } @@ -473,7 +470,6 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, DBG("Delete flag set to data stream %d", stream->wait_fd); } } - rcu_read_unlock(); } /* @@ -580,7 +576,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Steal stream identifier to avoid having streams with the same key */ steal_stream_key(stream->key, ht); @@ -613,7 +609,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) the_consumer_data.stream_count++; the_consumer_data.need_update = 1; - rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); @@ -635,7 +630,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter); node = lttng_ht_iter_get_node_u64(&iter); - if (node != NULL) { + if (node != nullptr) { goto end; } lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node); @@ -649,7 +644,7 @@ end: */ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx) { - struct consumer_relayd_sock_pair *obj = NULL; + struct consumer_relayd_sock_pair *obj = nullptr; /* net sequence index of -1 is a failure */ if (net_seq_idx == (uint64_t) -1ULL) { @@ -657,7 +652,7 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint } obj = zmalloc(); - if (obj == NULL) { + if (obj == nullptr) { PERROR("zmalloc relayd sock"); goto error; } @@ -668,7 +663,7 @@ static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint obj->control_sock.sock.fd = -1; obj->data_sock.sock.fd = -1; lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx); - pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); + pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr); error: return obj; @@ -685,7 +680,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) { struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; ASSERT_RCU_READ_LOCKED(); @@ -696,7 +691,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); - if (node != NULL) { + if (node != nullptr) { relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node); } @@ -719,9 +714,9 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path LTTNG_ASSERT(path); /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { + if (relayd != nullptr) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, @@ -756,7 +751,6 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path stream->net_seq_idx); end: - rcu_read_unlock(); return ret; } @@ -773,9 +767,9 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) LTTNG_ASSERT(net_seq_idx != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(net_seq_idx); - if (relayd != NULL) { + if (relayd != nullptr) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_streams_sent(&relayd->control_sock); @@ -796,7 +790,6 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) DBG("All streams sent relayd id %" PRIu64, net_seq_idx); end: - rcu_read_unlock(); return ret; } @@ -808,12 +801,11 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) struct consumer_relayd_sock_pair *relayd; /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd) { consumer_stream_relayd_close(stream, relayd); } - rcu_read_unlock(); } /* @@ -1010,8 +1002,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, const char *root_shm_path, const char *shm_path) { - struct lttng_consumer_channel *channel = NULL; - struct lttng_trace_chunk *trace_chunk = NULL; + struct lttng_consumer_channel *channel = nullptr; + struct lttng_trace_chunk *trace_chunk = nullptr; if (chunk_id) { trace_chunk = lttng_trace_chunk_registry_find_chunk( @@ -1022,9 +1014,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = zmalloc(); - if (channel == NULL) { - PERROR("malloc struct lttng_consumer_channel"); + try { + channel = new lttng_consumer_channel; + } catch (const std::bad_alloc& e) { + ERR("Failed to allocate lttng_consumer_channel: %s", e.what()); + channel = nullptr; goto end; } @@ -1050,8 +1044,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, break; default: abort(); - free(channel); - channel = NULL; + delete channel; + channel = nullptr; goto end; } @@ -1103,7 +1097,7 @@ end: return channel; error: consumer_del_channel(channel); - channel = NULL; + channel = nullptr; goto end; } @@ -1126,11 +1120,10 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ steal_channel_key(channel->key); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node); lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht, &channel->channels_by_session_id_ht_node); - rcu_read_unlock(); channel->is_published = true; pthread_mutex_unlock(&channel->timer_lock); @@ -1168,35 +1161,34 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, DBG("Updating poll fd array"); *nb_inactive_fd = 0; - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Only active streams with an active end point can be added to the - * poll set and local stream storage of the thread. - * - * There is a potential race here for endpoint_status to be updated - * just after the check. However, this is OK since the stream(s) will - * be deleted once the thread is notified that the end point state has - * changed where this function will be called back again. - * - * We track the number of inactive FDs because they still need to be - * closed by the polling thread after a wakeup on the data_pipe or - * metadata_pipe. - */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { - (*nb_inactive_fd)++; - continue; + + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Only active streams with an active end point can be added to the + * poll set and local stream storage of the thread. + * + * There is a potential race here for endpoint_status to be updated + * just after the check. However, this is OK since the stream(s) will + * be deleted once the thread is notified that the end point state has + * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. + */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; + continue; + } + + (*pollfd)[i].fd = stream->wait_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_stream[i] = stream; + i++; } - /* - * This clobbers way too much the debug output. Uncomment that if you - * need it for debugging purposes. - */ - (*pollfd)[i].fd = stream->wait_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_stream[i] = stream; - i++; } - rcu_read_unlock(); /* * Insert the consumer_data_pipe at the end of the array and don't @@ -1257,11 +1249,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, * Send return code to the session daemon. * If the socket is not defined, we return 0, it is not a fatal error */ -int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd) +int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, + enum lttcomm_return_code error_code) { if (ctx->consumer_error_socket > 0) { + const std::int32_t comm_code = std::int32_t(error_code); + + static_assert( + sizeof(comm_code) >= sizeof(std::underlying_type), + "Fixed-size communication type too small to accomodate lttcomm_return_code"); return lttcomm_send_unix_sock( - ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command)); + ctx->consumer_error_socket, &comm_code, sizeof(comm_code)); } return 0; @@ -1271,20 +1269,21 @@ int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd) * Close all the tracefiles and stream fds and MUST be called when all * instances are destroyed i.e. when all threads were joined and are ended. */ -void lttng_consumer_cleanup(void) +void lttng_consumer_cleanup() { struct lttng_ht_iter iter; struct lttng_consumer_channel *channel; unsigned int trace_chunks_left; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - consumer_del_channel(channel); + cds_lfht_for_each_entry ( + the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { + consumer_del_channel(channel); + } } - rcu_read_unlock(); - lttng_ht_destroy(the_consumer_data.channel_ht); lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht); @@ -1344,7 +1343,6 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) */ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { - int ret; int outfd = stream->out_fd; /* @@ -1356,31 +1354,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, if (orig_offset < stream->max_sb_size) { return; } - lttng_sync_file_range(outfd, - orig_offset - stream->max_sb_size, - stream->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | - SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - ret = posix_fadvise( - outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED); - if (ret && ret != -ENOSYS) { - errno = ret; - PERROR("posix_fadvise on fd %i", outfd); - } + lttng::io::hint_flush_range_dont_need_sync( + outfd, orig_offset - stream->max_sb_size, stream->max_sb_size); } /* @@ -1414,14 +1389,14 @@ lttng_consumer_create(enum lttng_consumer_type type, the_consumer_data.type = type; ctx = zmalloc(); - if (ctx == NULL) { + if (ctx == nullptr) { PERROR("allocating context"); goto error; } ctx->consumer_error_socket = -1; ctx->consumer_metadata_socket = -1; - pthread_mutex_init(&ctx->metadata_socket_lock, NULL); + pthread_mutex_init(&ctx->metadata_socket_lock, nullptr); /* assign the callbacks */ ctx->on_buffer_ready = buffer_ready; ctx->on_recv_channel = recv_channel; @@ -1470,7 +1445,7 @@ error_wakeup_pipe: error_poll_pipe: free(ctx); error: - return NULL; + return nullptr; } /* @@ -1481,19 +1456,20 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; - if (ht == NULL) { + if (ht == nullptr) { return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -1507,19 +1483,20 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht) struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; - if (ht == NULL) { + if (ht == nullptr) { return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -1615,19 +1592,19 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre off_t orig_offset = stream->out_fd_offset; /* Default is on the disk */ int outfd = stream->out_fd; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; unsigned int relayd_hang_up = 0; const size_t subbuf_content_size = buffer->size - padding; size_t write_len; /* RCU lock for the relayd pointer */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk); /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { + if (relayd == nullptr) { ret = -EPIPE; goto end; } @@ -1739,8 +1716,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len); stream->out_fd_offset += write_len; lttng_consumer_sync_trace_file(stream, orig_offset); } @@ -1761,7 +1737,6 @@ end: pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - rcu_read_unlock(); return ret; } @@ -1783,7 +1758,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data int fd = stream->wait_fd; /* Default is on the disk */ int outfd = stream->out_fd; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; int *splice_pipe; unsigned int relayd_hang_up = 0; @@ -1800,12 +1775,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data } /* RCU lock for the relayd pointer */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { + if (relayd == nullptr) { written = -ret; goto end; } @@ -1886,7 +1861,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data fd, splice_pipe[1]); ret_splice = splice( - fd, &offset, splice_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE); + fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE); DBG("splice chan to pipe, ret %zd", ret_splice); if (ret_splice < 0) { ret = errno; @@ -1912,9 +1887,9 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data /* Splice data out */ ret_splice = splice(splice_pipe[0], - NULL, + nullptr, outfd, - NULL, + nullptr, ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice); @@ -1940,8 +1915,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice); stream->out_fd_offset += ret_splice; } stream->output_written += ret_splice; @@ -1983,7 +1957,6 @@ end: pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - rcu_read_unlock(); return written; } @@ -2083,7 +2056,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } -static void lttng_consumer_close_all_metadata(void) +static void lttng_consumer_close_all_metadata() { switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -2114,7 +2087,7 @@ static void lttng_consumer_close_all_metadata(void) */ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { - struct lttng_consumer_channel *channel = NULL; + struct lttng_consumer_channel *channel = nullptr; bool free_channel = false; LTTNG_ASSERT(stream); @@ -2154,14 +2127,15 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l /* Go for channel deletion! */ free_channel = true; } - stream->chan = NULL; + stream->chan = nullptr; /* * Nullify the stream reference so it is not used after deletion. The * channel lock MUST be acquired before being able to check for a NULL * pointer value. */ - channel->metadata_stream = NULL; + channel->metadata_stream = nullptr; + channel->metadata_pushed_wait_queue.wake_all(); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock); @@ -2175,7 +2149,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l } lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; + stream->trace_chunk = nullptr; consumer_stream_free(stream); } @@ -2204,7 +2178,7 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) * after this point. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* * Lookup the stream just to make sure it does not exist in our internal @@ -2238,8 +2212,6 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) */ lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id); - rcu_read_unlock(); - pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&stream->chan->timer_lock); @@ -2249,23 +2221,25 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) /* * Delete data stream that are flagged for deletion (endpoint_status). */ -static void validate_endpoint_status_data_stream(void) +static void validate_endpoint_status_data_stream() { struct lttng_ht_iter iter; struct lttng_consumer_stream *stream; DBG("Consumer delete flagged data stream"); - rcu_read_lock(); - cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; + } + /* Delete it right now */ + consumer_del_stream(stream, data_ht); } - /* Delete it right now */ - consumer_del_stream(stream, data_ht); } - rcu_read_unlock(); } /* @@ -2280,22 +2254,23 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po LTTNG_ASSERT(pollset); - rcu_read_lock(); - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; - } - /* - * Remove from pollset so the metadata thread can continue without - * blocking on a deleted stream. - */ - lttng_poll_del(pollset, stream->wait_fd); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; + } + /* + * Remove from pollset so the metadata thread can continue without + * blocking on a deleted stream. + */ + lttng_poll_del(pollset, stream->wait_fd); - /* Delete it right now */ - consumer_del_metadata_stream(stream, metadata_ht); + /* Delete it right now */ + consumer_del_metadata_stream(stream, metadata_ht); + } } - rcu_read_unlock(); } /* @@ -2306,7 +2281,7 @@ void *consumer_thread_metadata_poll(void *data) { int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; - struct lttng_consumer_stream *stream = NULL; + struct lttng_consumer_stream *stream = nullptr; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; struct lttng_poll_event events; @@ -2340,7 +2315,7 @@ void *consumer_thread_metadata_poll(void *data) /* Main loop */ DBG("Metadata main loop started"); - while (1) { + while (true) { restart: health_code_update(); health_poll_entry(); @@ -2375,8 +2350,11 @@ void *consumer_thread_metadata_poll(void *data) pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, - sizeof(stream)); - if (pipe_len < sizeof(stream)) { + sizeof(stream)); /* NOLINT sizeof + used on a + pointer. */ + if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a + pointer. */ if (pipe_len < 0) { PERROR("read metadata stream"); } @@ -2393,7 +2371,7 @@ void *consumer_thread_metadata_poll(void *data) } /* A NULL stream means that the state has changed. */ - if (stream == NULL) { + if (stream == nullptr) { /* Check for deleted streams. */ validate_endpoint_status_metadata_stream(&events); goto restart; @@ -2427,7 +2405,7 @@ void *consumer_thread_metadata_poll(void *data) continue; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; @@ -2491,11 +2469,9 @@ void *consumer_thread_metadata_poll(void *data) consumer_del_metadata_stream(stream, metadata_ht); } else { ERR("Unexpected poll events %u for sock %d", revents, pollfd); - rcu_read_unlock(); goto end; } /* Release RCU lock for the stream looked up */ - rcu_read_unlock(); } } @@ -2513,7 +2489,7 @@ error_testpoint: } health_unregister(health_consumerd); rcu_unregister_thread(); - return NULL; + return nullptr; } /* @@ -2522,10 +2498,10 @@ error_testpoint: */ void *consumer_thread_data_poll(void *data) { - int num_rdy, num_hup, high_prio, ret, i, err = -1; - struct pollfd *pollfd = NULL; + int num_rdy, high_prio, ret, i, err = -1; + struct pollfd *pollfd = nullptr; /* local view of the streams */ - struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; + struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr; /* local view of consumer_data.fds_count */ int nb_fd = 0; /* 2 for the consumer_data_pipe and wake up pipe */ @@ -2546,16 +2522,15 @@ void *consumer_thread_data_poll(void *data) health_code_update(); local_stream = zmalloc(); - if (local_stream == NULL) { + if (local_stream == nullptr) { PERROR("local_stream malloc"); goto end; } - while (1) { + while (true) { health_code_update(); high_prio = 0; - num_hup = 0; /* * the fds set has been updated, we need to update our @@ -2564,15 +2539,15 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_lock(&the_consumer_data.lock); if (the_consumer_data.need_update) { free(pollfd); - pollfd = NULL; + pollfd = nullptr; free(local_stream); - local_stream = NULL; + local_stream = nullptr; /* Allocate for all fds */ pollfd = calloc(the_consumer_data.stream_count + nb_pipes_fd); - if (pollfd == NULL) { + if (pollfd == nullptr) { PERROR("pollfd malloc"); pthread_mutex_unlock(&the_consumer_data.lock); goto end; @@ -2580,7 +2555,7 @@ void *consumer_thread_data_poll(void *data) local_stream = calloc( the_consumer_data.stream_count + nb_pipes_fd); - if (local_stream == NULL) { + if (local_stream == nullptr) { PERROR("local_stream malloc"); pthread_mutex_unlock(&the_consumer_data.lock); goto end; @@ -2643,9 +2618,12 @@ void *consumer_thread_data_poll(void *data) ssize_t pipe_readlen; DBG("consumer_data_pipe wake up"); - pipe_readlen = lttng_pipe_read( - ctx->consumer_data_pipe, &new_stream, sizeof(new_stream)); - if (pipe_readlen < sizeof(new_stream)) { + pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, + &new_stream, + sizeof(new_stream)); /* NOLINT sizeof used on + a pointer. */ + if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer. + */ PERROR("Consumer data pipe"); /* Continue so we can at least handle the current stream(s). */ continue; @@ -2656,7 +2634,7 @@ void *consumer_thread_data_poll(void *data) * the sessiond poll thread changed the consumer_quit state and is * waking us up to test it. */ - if (new_stream == NULL) { + if (new_stream == nullptr) { validate_endpoint_status_data_stream(); continue; } @@ -2683,7 +2661,7 @@ void *consumer_thread_data_poll(void *data) for (i = 0; i < nb_fd; i++) { health_code_update(); - if (local_stream[i] == NULL) { + if (local_stream[i] == nullptr) { continue; } if (pollfd[i].revents & POLLPRI) { @@ -2694,7 +2672,7 @@ void *consumer_thread_data_poll(void *data) if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. */ consumer_del_stream(local_stream[i], data_ht); - local_stream[i] = NULL; + local_stream[i] = nullptr; } else if (len > 0) { local_stream[i]->has_data_left_to_be_read_before_teardown = 1; @@ -2714,7 +2692,7 @@ void *consumer_thread_data_poll(void *data) for (i = 0; i < nb_fd; i++) { health_code_update(); - if (local_stream[i] == NULL) { + if (local_stream[i] == nullptr) { continue; } if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done || @@ -2725,7 +2703,7 @@ void *consumer_thread_data_poll(void *data) if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. */ consumer_del_stream(local_stream[i], data_ht); - local_stream[i] = NULL; + local_stream[i] = nullptr; } else if (len > 0) { local_stream[i]->has_data_left_to_be_read_before_teardown = 1; @@ -2737,7 +2715,7 @@ void *consumer_thread_data_poll(void *data) for (i = 0; i < nb_fd; i++) { health_code_update(); - if (local_stream[i] == NULL) { + if (local_stream[i] == nullptr) { continue; } if (!local_stream[i]->hangup_flush_done && @@ -2767,25 +2745,22 @@ void *consumer_thread_data_poll(void *data) DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); - local_stream[i] = NULL; - num_hup++; + local_stream[i] = nullptr; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); - local_stream[i] = NULL; - num_hup++; + local_stream[i] = nullptr; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); - local_stream[i] = NULL; - num_hup++; + local_stream[i] = nullptr; } } - if (local_stream[i] != NULL) { + if (local_stream[i] != nullptr) { local_stream[i]->has_data_left_to_be_read_before_teardown = 0; } } @@ -2815,7 +2790,7 @@ error_testpoint: health_unregister(health_consumerd); rcu_unregister_thread(); - return NULL; + return nullptr; } /* @@ -2831,7 +2806,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe ht = the_consumer_data.stream_per_chan_id_ht; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -2871,7 +2846,6 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe next: pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); } static void destroy_channel_ht(struct lttng_ht *ht) @@ -2880,16 +2854,18 @@ static void destroy_channel_ht(struct lttng_ht *ht) struct lttng_consumer_channel *channel; int ret; - if (ht == NULL) { + if (ht == nullptr) { return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { - ret = lttng_ht_del(ht, &iter); - LTTNG_ASSERT(ret != 0); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { + ret = lttng_ht_del(ht, &iter); + LTTNG_ASSERT(ret != 0); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -2905,7 +2881,7 @@ void *consumer_thread_channel_poll(void *data) { int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; - struct lttng_consumer_channel *chan = NULL; + struct lttng_consumer_channel *chan = nullptr; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; struct lttng_poll_event events; @@ -2945,7 +2921,7 @@ void *consumer_thread_channel_poll(void *data) /* Main loop */ DBG("Channel main loop started"); - while (1) { + while (true) { restart: health_code_update(); DBG("Channel poll wait"); @@ -2991,19 +2967,20 @@ void *consumer_thread_channel_poll(void *data) switch (action) { case CONSUMER_CHANNEL_ADD: + { DBG("Adding channel %d to poll set", chan->wait_fd); lttng_ht_node_init_u64(&chan->wait_fd_node, chan->wait_fd); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(channel_ht, &chan->wait_fd_node); - rcu_read_unlock(); /* Add channel to the global poll events list */ // FIXME: Empty flag on a pipe pollset, this might // hang on FreeBSD. lttng_poll_add(&events, chan->wait_fd, 0); break; + } case CONSUMER_CHANNEL_DEL: { /* @@ -3016,10 +2993,9 @@ void *consumer_thread_channel_poll(void *data) * GET_CHANNEL failed. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; chan = consumer_find_channel(key); if (!chan) { - rcu_read_unlock(); ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key); @@ -3052,7 +3028,6 @@ void *consumer_thread_channel_poll(void *data) if (!uatomic_sub_return(&chan->refcount, 1)) { consumer_del_channel(chan); } - rcu_read_unlock(); goto restart; } case CONSUMER_CHANNEL_QUIT: @@ -3086,7 +3061,7 @@ void *consumer_thread_channel_poll(void *data) continue; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; @@ -3119,12 +3094,10 @@ void *consumer_thread_channel_poll(void *data) } } else { ERR("Unexpected poll events %u for sock %d", revents, pollfd); - rcu_read_unlock(); goto end; } /* Release RCU lock for the channel looked up */ - rcu_read_unlock(); } } @@ -3143,7 +3116,7 @@ error_testpoint: } health_unregister(health_consumerd); rcu_unregister_thread(); - return NULL; + return nullptr; } static int set_metadata_socket(struct lttng_consumer_local_data *ctx, @@ -3266,7 +3239,7 @@ void *consumer_thread_sessiond_poll(void *data) consumer_sockpoll[1].fd = sock; consumer_sockpoll[1].events = POLLIN | POLLPRI; - while (1) { + while (true) { health_code_update(); health_poll_entry(); @@ -3323,7 +3296,7 @@ end: */ notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT); + notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT); notify_health_quit_pipe(health_quit_pipe); @@ -3349,7 +3322,7 @@ error_testpoint: health_unregister(health_consumerd); rcu_unregister_thread(); - return NULL; + return nullptr; } static int post_consume(struct lttng_consumer_stream *stream, @@ -3503,7 +3476,7 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) /* * Allocate and set consumer data hash tables. */ -int lttng_consumer_init(void) +int lttng_consumer_init() { the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!the_consumer_data.channel_ht) { @@ -3570,7 +3543,7 @@ void consumer_add_relayd_socket(uint64_t net_seq_idx, { int fd = -1, ret = -1, relayd_created = 0; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; LTTNG_ASSERT(ctx); LTTNG_ASSERT(sock >= 0); @@ -3580,11 +3553,11 @@ void consumer_add_relayd_socket(uint64_t net_seq_idx, /* Get relayd reference if exists. */ relayd = consumer_find_relayd(net_seq_idx); - if (relayd == NULL) { + if (relayd == nullptr) { LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL); /* Not found. Allocate one. */ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); - if (relayd == NULL) { + if (relayd == nullptr) { ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } else { @@ -3727,7 +3700,7 @@ error_nosignal: static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) { struct lttng_ht_iter iter; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; ASSERT_RCU_READ_LOCKED(); @@ -3743,7 +3716,7 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) } } - return NULL; + return nullptr; found: return relayd; @@ -3761,12 +3734,12 @@ int consumer_data_pending(uint64_t id) struct lttng_ht_iter iter; struct lttng_ht *ht; struct lttng_consumer_stream *stream; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; int (*data_pending)(struct lttng_consumer_stream *); DBG("Consumer data pending command on session id %" PRIu64, id); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&the_consumer_data.lock); switch (the_consumer_data.type) { @@ -3880,13 +3853,11 @@ int consumer_data_pending(uint64_t id) data_not_pending: /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); return 0; data_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); return 1; } @@ -3998,7 +3969,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, uint64_t next_chunk_id, stream_count = 0; enum lttng_trace_chunk_status chunk_status; const bool is_local_trace = relayd_id == -1ULL; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; bool rotating_to_new_chunk = true; /* Array of `struct lttng_consumer_stream *` */ struct lttng_dynamic_pointer_array streams_packet_to_open; @@ -4008,11 +3979,12 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, DBG("Consumer sample rotate position for channel %" PRIu64, key); - lttng_dynamic_array_init( - &stream_rotation_positions, sizeof(struct relayd_stream_rotation_position), NULL); - lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL); + lttng_dynamic_array_init(&stream_rotation_positions, + sizeof(struct relayd_stream_rotation_position), + nullptr); + lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); LTTNG_ASSERT(channel->trace_chunk); @@ -4107,7 +4079,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, chunk_status = lttng_trace_chunk_get_name( stream->trace_chunk, &trace_chunk_name, - NULL); + nullptr); if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) { trace_chunk_name = "none"; } @@ -4290,7 +4262,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, pthread_mutex_unlock(&stream->lock); } - stream = NULL; + stream = nullptr; if (!is_local_trace) { relayd = consumer_find_relayd(relayd_id); @@ -4303,7 +4275,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_rotate_streams(&relayd->control_sock, stream_count, - rotating_to_new_chunk ? &next_chunk_id : NULL, + rotating_to_new_chunk ? &next_chunk_id : nullptr, (const struct relayd_stream_rotation_position *) stream_rotation_positions.buffer.data); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); @@ -4364,7 +4336,6 @@ end_unlock_stream: end_unlock_channel: pthread_mutex_unlock(&channel->lock); end: - rcu_read_unlock(); lttng_dynamic_array_reset(&stream_rotation_positions); lttng_dynamic_pointer_array_reset(&streams_packet_to_open); return ret; @@ -4427,7 +4398,7 @@ static int consumer_clear_stream(struct lttng_consumer_stream *stream) { int ret; - ret = consumer_stream_flush_buffer(stream, 1); + ret = consumer_stream_flush_buffer(stream, true); if (ret < 0) { ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key); ret = LTTCOMM_CONSUMERD_FATAL; @@ -4451,7 +4422,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha int ret; struct lttng_consumer_stream *stream; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); @@ -4463,13 +4434,11 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha pthread_mutex_unlock(&stream->lock); } pthread_mutex_unlock(&channel->lock); - rcu_read_unlock(); return 0; error_unlock: pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&channel->lock); - rcu_read_unlock(); return ret; } @@ -4561,7 +4530,7 @@ static int rotate_local_stream(struct lttng_consumer_stream *stream) if (stream->index_file) { lttng_index_file_put(stream->index_file); - stream->index_file = NULL; + stream->index_file = nullptr; } if (!stream->trace_chunk) { @@ -4599,7 +4568,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream) * parent channel, becomes part of no chunk and can't output * anything until a new trace chunk is created. */ - stream->trace_chunk = NULL; + stream->trace_chunk = nullptr; } else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) { ERR("Failed to acquire a reference to channel's trace chunk during stream rotation"); ret = -1; @@ -4665,7 +4634,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, ASSERT_RCU_READ_LOCKED(); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; DBG("Consumer rotate ready streams in channel %" PRIu64, key); @@ -4700,7 +4669,6 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, ret = 0; end: - rcu_read_unlock(); return ret; } @@ -4735,7 +4703,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, { int ret; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; - struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL; + struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr; enum lttng_trace_chunk_status chunk_status; char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; char creation_timestamp_buffer[ISO8601_STR_LEN]; @@ -4784,7 +4752,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands. */ - created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, NULL); + created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr); if (!created_chunk) { ERR("Failed to create trace chunk"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; @@ -4811,7 +4779,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, * directory. */ chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle); - chunk_directory_handle = NULL; + chunk_directory_handle = nullptr; if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to set trace chunk's directory handle"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; @@ -4822,53 +4790,57 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, published_chunk = lttng_trace_chunk_registry_publish_chunk( the_consumer_data.chunk_registry, session_id, created_chunk); lttng_trace_chunk_put(created_chunk); - created_chunk = NULL; + created_chunk = nullptr; if (!published_chunk) { ERR("Failed to publish trace chunk"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; goto error; } - rcu_read_lock(); - cds_lfht_for_each_entry_duplicate( - the_consumer_data.channels_by_session_id_ht->ht, - the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed), - the_consumer_data.channels_by_session_id_ht->match_fct, - &session_id, - &iter.iter, - channel, - channels_by_session_id_ht_node.node) { - ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); - if (ret) { - /* - * Roll-back the creation of this chunk. - * - * This is important since the session daemon will - * assume that the creation of this chunk failed and - * will never ask for it to be closed, resulting - * in a leak and an inconsistent state for some - * channels. - */ - enum lttcomm_return_code close_ret; - char path[LTTNG_PATH_MAX]; + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry_duplicate( + the_consumer_data.channels_by_session_id_ht->ht, + the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, + lttng_ht_seed), + the_consumer_data.channels_by_session_id_ht->match_fct, + &session_id, + &iter.iter, + channel, + channels_by_session_id_ht_node.node) + { + ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); + if (ret) { + /* + * Roll-back the creation of this chunk. + * + * This is important since the session daemon will + * assume that the creation of this chunk failed and + * will never ask for it to be closed, resulting + * in a leak and an inconsistent state for some + * channels. + */ + enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; + + DBG("Failed to set new trace chunk on existing channels, rolling back"); + close_ret = + lttng_consumer_close_trace_chunk(relayd_id, + session_id, + chunk_id, + chunk_creation_timestamp, + nullptr, + path); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + session_id, + chunk_id); + } - DBG("Failed to set new trace chunk on existing channels, rolling back"); - close_ret = lttng_consumer_close_trace_chunk(relayd_id, - session_id, - chunk_id, - chunk_creation_timestamp, - NULL, - path); - if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { - ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 - ", chunk_id = %" PRIu64, - session_id, - chunk_id); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + break; } - - ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - break; } } @@ -4892,7 +4864,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, session_id, chunk_id, chunk_creation_timestamp, - NULL, + nullptr, path); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 @@ -4906,7 +4878,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, } } error_unlock: - rcu_read_unlock(); error: /* Release the reference returned by the "publish" operation. */ lttng_trace_chunk_put(published_chunk); @@ -4982,30 +4953,32 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, * it; it is only kept around to compare it (by address) to the * current chunk found in the session's channels. */ - rcu_read_lock(); - cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - int ret; + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry ( + the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { + int ret; - /* - * Only change the channel's chunk to NULL if it still - * references the chunk being closed. The channel may - * reference a newer channel in the case of a session - * rotation. When a session rotation occurs, the "next" - * chunk is created before the "current" chunk is closed. - */ - if (channel->trace_chunk != chunk) { - continue; - } - ret = lttng_consumer_channel_set_trace_chunk(channel, NULL); - if (ret) { /* - * Attempt to close the chunk on as many channels as - * possible. + * Only change the channel's chunk to NULL if it still + * references the chunk being closed. The channel may + * reference a newer channel in the case of a session + * rotation. When a session rotation occurs, the "next" + * chunk is created before the "current" chunk is closed. */ - ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + if (channel->trace_chunk != chunk) { + continue; + } + ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); + if (ret) { + /* + * Attempt to close the chunk on as many channels as + * possible. + */ + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + } } } - if (relayd_id) { int ret; struct consumer_relayd_sock_pair *relayd; @@ -5025,7 +4998,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, } } error_unlock: - rcu_read_unlock(); end: /* * Release the reference returned by the "find" operation and @@ -5045,8 +5017,9 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; const char *relayd_id_str = "(none)"; const bool is_local_trace = !relayd_id; - struct consumer_relayd_sock_pair *relayd = NULL; + struct consumer_relayd_sock_pair *relayd = nullptr; bool chunk_exists_local, chunk_exists_remote; + lttng::urcu::read_lock_guard read_lock; if (relayd_id) { /* Only used for logging purposes. */ @@ -5079,7 +5052,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id goto end; } - rcu_read_lock(); relayd = consumer_find_relayd(*relayd_id); if (!relayd) { ERR("Failed to find relayd %" PRIu64, *relayd_id); @@ -5101,7 +5073,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist"); end_rcu_unlock: - rcu_read_unlock(); end: return ret_code; } @@ -5115,7 +5086,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann ht = the_consumer_data.stream_per_chan_id_ht; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -5138,12 +5109,10 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann next: pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); return LTTCOMM_CONSUMERD_SUCCESS; error_unlock: pthread_mutex_unlock(&stream->lock); - rcu_read_unlock(); return ret; } @@ -5184,56 +5153,56 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum goto end; } - rcu_read_lock(); - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { - enum consumer_stream_open_packet_status status; + { + lttng::urcu::read_lock_guard read_lock; + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + enum consumer_stream_open_packet_status status; - pthread_mutex_lock(&stream->lock); - if (cds_lfht_is_node_deleted(&stream->node.node)) { - goto next; - } + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } - status = consumer_stream_open_packet(stream); - switch (status) { - case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: - DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = true; - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: - DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: - /* - * Only unexpected internal errors can lead to this - * failing. Report an unknown error. - */ - ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 - ", channel id = %" PRIu64 ", channel name = %s" - ", session id = %" PRIu64, - stream->key, - channel->key, - channel->name, - channel->session_id); - ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; - goto error_unlock; - default: - abort(); - } + status = consumer_stream_open_packet(stream); + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* + * Only unexpected internal errors can lead to this + * failing. Report an unknown error. + */ + ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 + ", channel id = %" PRIu64 ", channel name = %s" + ", session id = %" PRIu64, + stream->key, + channel->key, + channel->name, + channel->session_id); + ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; + goto error_unlock; + default: + abort(); + } - next: - pthread_mutex_unlock(&stream->lock); + next: + pthread_mutex_unlock(&stream->lock); + } } - end_rcu_unlock: - rcu_read_unlock(); end: return ret;