X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=a9de1696e4ce862d52bf58b5c6ab44f73ad76b28;hp=eb7e6375eed4577736fb303a098ab37a6d7c3ece;hb=HEAD;hpb=cd9adb8b829564212158943a0d279bb35322ab30 diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index eb7e6375e..1da243601 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -29,10 +30,12 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -42,6 +45,7 @@ #include #include #include +#include #include lttng_consumer_global_data the_consumer_data; @@ -104,7 +108,8 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) LTTNG_ASSERT(pipe); - (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); + (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a + pointer. */ } static void notify_health_quit_pipe(int *pipe) @@ -173,13 +178,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { - /* - * Once a stream is added to this list, the buffers were created so we - * have a guarantee that this call will succeed. Setting the monitor - * mode to 0 so we don't lock nor try to delete the stream from the - * global hash table. - */ - stream->monitor = 0; consumer_stream_destroy(stream, nullptr); } } @@ -201,7 +199,7 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht * return nullptr; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(ht, &key, &iter); node = lttng_ht_iter_get_node_u64(&iter); @@ -209,8 +207,6 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht * stream = lttng::utils::container_of(node, <tng_consumer_stream::node); } - rcu_read_unlock(); - return stream; } @@ -218,7 +214,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; stream = find_stream(key, ht); if (stream) { stream->key = (uint64_t) -1ULL; @@ -229,7 +225,6 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) */ stream->node.key = (uint64_t) -1ULL; } - rcu_read_unlock(); } /* @@ -272,7 +267,7 @@ static void steal_channel_key(uint64_t key) { struct lttng_consumer_channel *channel; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(key); if (channel) { channel->key = (uint64_t) -1ULL; @@ -283,7 +278,6 @@ static void steal_channel_key(uint64_t key) */ channel->node.key = (uint64_t) -1ULL; } - rcu_read_unlock(); } static void free_channel_rcu(struct rcu_head *head) @@ -303,7 +297,8 @@ static void free_channel_rcu(struct rcu_head *head) ERR("Unknown consumer_data type"); abort(); } - free(channel); + + delete channel; } /* @@ -403,7 +398,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->is_published) { int ret; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; iter.iter.node = &channel->node.node; ret = lttng_ht_del(the_consumer_data.channel_ht, &iter); LTTNG_ASSERT(!ret); @@ -411,7 +406,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) iter.iter.node = &channel->channels_by_session_id_ht_node.node; ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter); LTTNG_ASSERT(!ret); - rcu_read_unlock(); } channel->is_deleted = true; @@ -430,14 +424,15 @@ static void cleanup_relayd_ht() struct lttng_ht_iter iter; struct consumer_relayd_sock_pair *relayd; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { - consumer_destroy_relayd(relayd); + cds_lfht_for_each_entry ( + the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { + consumer_destroy_relayd(relayd); + } } - rcu_read_unlock(); - lttng_ht_destroy(the_consumer_data.relayd_ht); } @@ -456,12 +451,14 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Let's begin with metadata */ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); + stream->chan->metadata_pushed_wait_queue.wake_all(); + DBG("Delete flag set to metadata stream %d", stream->wait_fd); } } @@ -473,7 +470,6 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, DBG("Delete flag set to data stream %d", stream->wait_fd); } } - rcu_read_unlock(); } /* @@ -580,7 +576,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Steal stream identifier to avoid having streams with the same key */ steal_stream_key(stream->key, ht); @@ -613,7 +609,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) the_consumer_data.stream_count++; the_consumer_data.need_update = 1; - rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->timer_lock); pthread_mutex_unlock(&stream->chan->lock); @@ -719,7 +714,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path LTTNG_ASSERT(path); /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -756,7 +751,6 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path stream->net_seq_idx); end: - rcu_read_unlock(); return ret; } @@ -773,7 +767,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) LTTNG_ASSERT(net_seq_idx != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -796,7 +790,6 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) DBG("All streams sent relayd id %" PRIu64, net_seq_idx); end: - rcu_read_unlock(); return ret; } @@ -808,12 +801,11 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) struct consumer_relayd_sock_pair *relayd; /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd) { consumer_stream_relayd_close(stream, relayd); } - rcu_read_unlock(); } /* @@ -1022,9 +1014,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = zmalloc(); - if (channel == nullptr) { - PERROR("malloc struct lttng_consumer_channel"); + try { + channel = new lttng_consumer_channel; + } catch (const std::bad_alloc& e) { + ERR("Failed to allocate lttng_consumer_channel: %s", e.what()); + channel = nullptr; goto end; } @@ -1038,8 +1032,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->monitor = monitor; channel->live_timer_interval = live_timer_interval; channel->is_live = is_in_live_session; - pthread_mutex_init(&channel->lock, nullptr); - pthread_mutex_init(&channel->timer_lock, nullptr); + pthread_mutex_init(&channel->lock, NULL); + pthread_mutex_init(&channel->timer_lock, NULL); switch (output) { case LTTNG_EVENT_SPLICE: @@ -1050,7 +1044,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, break; default: abort(); - free(channel); + delete channel; channel = nullptr; goto end; } @@ -1126,11 +1120,10 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ steal_channel_key(channel->key); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node); lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht, &channel->channels_by_session_id_ht_node); - rcu_read_unlock(); channel->is_published = true; pthread_mutex_unlock(&channel->timer_lock); @@ -1168,35 +1161,34 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, DBG("Updating poll fd array"); *nb_inactive_fd = 0; - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Only active streams with an active end point can be added to the - * poll set and local stream storage of the thread. - * - * There is a potential race here for endpoint_status to be updated - * just after the check. However, this is OK since the stream(s) will - * be deleted once the thread is notified that the end point state has - * changed where this function will be called back again. - * - * We track the number of inactive FDs because they still need to be - * closed by the polling thread after a wakeup on the data_pipe or - * metadata_pipe. - */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { - (*nb_inactive_fd)++; - continue; + + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Only active streams with an active end point can be added to the + * poll set and local stream storage of the thread. + * + * There is a potential race here for endpoint_status to be updated + * just after the check. However, this is OK since the stream(s) will + * be deleted once the thread is notified that the end point state has + * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. + */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; + continue; + } + + (*pollfd)[i].fd = stream->wait_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_stream[i] = stream; + i++; } - /* - * This clobbers way too much the debug output. Uncomment that if you - * need it for debugging purposes. - */ - (*pollfd)[i].fd = stream->wait_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_stream[i] = stream; - i++; } - rcu_read_unlock(); /* * Insert the consumer_data_pipe at the end of the array and don't @@ -1257,11 +1249,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, * Send return code to the session daemon. * If the socket is not defined, we return 0, it is not a fatal error */ -int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd) +int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, + enum lttcomm_return_code error_code) { if (ctx->consumer_error_socket > 0) { + const std::int32_t comm_code = std::int32_t(error_code); + + static_assert( + sizeof(comm_code) >= sizeof(std::underlying_type), + "Fixed-size communication type too small to accomodate lttcomm_return_code"); return lttcomm_send_unix_sock( - ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command)); + ctx->consumer_error_socket, &comm_code, sizeof(comm_code)); } return 0; @@ -1277,14 +1275,15 @@ void lttng_consumer_cleanup() struct lttng_consumer_channel *channel; unsigned int trace_chunks_left; - rcu_read_lock(); + { + lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - consumer_del_channel(channel); + cds_lfht_for_each_entry ( + the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { + consumer_del_channel(channel); + } } - rcu_read_unlock(); - lttng_ht_destroy(the_consumer_data.channel_ht); lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht); @@ -1344,7 +1343,6 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) */ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { - int ret; int outfd = stream->out_fd; /* @@ -1356,31 +1354,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, if (orig_offset < stream->max_sb_size) { return; } - lttng_sync_file_range(outfd, - orig_offset - stream->max_sb_size, - stream->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | - SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - ret = posix_fadvise( - outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED); - if (ret && ret != -ENOSYS) { - errno = ret; - PERROR("posix_fadvise on fd %i", outfd); - } + lttng::io::hint_flush_range_dont_need_sync( + outfd, orig_offset - stream->max_sb_size, stream->max_sb_size); } /* @@ -1485,15 +1460,16 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -1511,15 +1487,16 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -1621,7 +1598,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre size_t write_len; /* RCU lock for the relayd pointer */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk); /* Flag that the current stream if set for network streaming. */ @@ -1739,8 +1716,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len); stream->out_fd_offset += write_len; lttng_consumer_sync_trace_file(stream, orig_offset); } @@ -1761,7 +1737,6 @@ end: pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - rcu_read_unlock(); return ret; } @@ -1800,7 +1775,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data } /* RCU lock for the relayd pointer */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != (uint64_t) -1ULL) { @@ -1940,8 +1915,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice); stream->out_fd_offset += ret_splice; } stream->output_written += ret_splice; @@ -1983,7 +1957,6 @@ end: pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } - rcu_read_unlock(); return written; } @@ -2162,6 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l * pointer value. */ channel->metadata_stream = nullptr; + channel->metadata_pushed_wait_queue.wake_all(); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock); @@ -2204,7 +2178,7 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) * after this point. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; /* * Lookup the stream just to make sure it does not exist in our internal @@ -2238,8 +2212,6 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) */ lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id); - rcu_read_unlock(); - pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&stream->chan->timer_lock); @@ -2256,16 +2228,18 @@ static void validate_endpoint_status_data_stream() DBG("Consumer delete flagged data stream"); - rcu_read_lock(); - cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; + } + /* Delete it right now */ + consumer_del_stream(stream, data_ht); } - /* Delete it right now */ - consumer_del_stream(stream, data_ht); } - rcu_read_unlock(); } /* @@ -2280,22 +2254,23 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po LTTNG_ASSERT(pollset); - rcu_read_lock(); - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; - } - /* - * Remove from pollset so the metadata thread can continue without - * blocking on a deleted stream. - */ - lttng_poll_del(pollset, stream->wait_fd); + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; + } + /* + * Remove from pollset so the metadata thread can continue without + * blocking on a deleted stream. + */ + lttng_poll_del(pollset, stream->wait_fd); - /* Delete it right now */ - consumer_del_metadata_stream(stream, metadata_ht); + /* Delete it right now */ + consumer_del_metadata_stream(stream, metadata_ht); + } } - rcu_read_unlock(); } /* @@ -2375,8 +2350,11 @@ void *consumer_thread_metadata_poll(void *data) pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, - sizeof(stream)); - if (pipe_len < sizeof(stream)) { + sizeof(stream)); /* NOLINT sizeof + used on a + pointer. */ + if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a + pointer. */ if (pipe_len < 0) { PERROR("read metadata stream"); } @@ -2427,7 +2405,7 @@ void *consumer_thread_metadata_poll(void *data) continue; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; @@ -2491,11 +2469,9 @@ void *consumer_thread_metadata_poll(void *data) consumer_del_metadata_stream(stream, metadata_ht); } else { ERR("Unexpected poll events %u for sock %d", revents, pollfd); - rcu_read_unlock(); goto end; } /* Release RCU lock for the stream looked up */ - rcu_read_unlock(); } } @@ -2522,7 +2498,7 @@ error_testpoint: */ void *consumer_thread_data_poll(void *data) { - int num_rdy, num_hup, high_prio, ret, i, err = -1; + int num_rdy, high_prio, ret, i, err = -1; struct pollfd *pollfd = nullptr; /* local view of the streams */ struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr; @@ -2555,7 +2531,6 @@ void *consumer_thread_data_poll(void *data) health_code_update(); high_prio = 0; - num_hup = 0; /* * the fds set has been updated, we need to update our @@ -2643,9 +2618,12 @@ void *consumer_thread_data_poll(void *data) ssize_t pipe_readlen; DBG("consumer_data_pipe wake up"); - pipe_readlen = lttng_pipe_read( - ctx->consumer_data_pipe, &new_stream, sizeof(new_stream)); - if (pipe_readlen < sizeof(new_stream)) { + pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, + &new_stream, + sizeof(new_stream)); /* NOLINT sizeof used on + a pointer. */ + if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer. + */ PERROR("Consumer data pipe"); /* Continue so we can at least handle the current stream(s). */ continue; @@ -2768,21 +2746,18 @@ void *consumer_thread_data_poll(void *data) if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = nullptr; - num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = nullptr; - num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = nullptr; - num_hup++; } } if (local_stream[i] != nullptr) { @@ -2831,7 +2806,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe ht = the_consumer_data.stream_per_chan_id_ht; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -2871,7 +2846,6 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe next: pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); } static void destroy_channel_ht(struct lttng_ht *ht) @@ -2884,12 +2858,14 @@ static void destroy_channel_ht(struct lttng_ht *ht) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { - ret = lttng_ht_del(ht, &iter); - LTTNG_ASSERT(ret != 0); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { + ret = lttng_ht_del(ht, &iter); + LTTNG_ASSERT(ret != 0); + } } - rcu_read_unlock(); lttng_ht_destroy(ht); } @@ -2991,19 +2967,20 @@ void *consumer_thread_channel_poll(void *data) switch (action) { case CONSUMER_CHANNEL_ADD: + { DBG("Adding channel %d to poll set", chan->wait_fd); lttng_ht_node_init_u64(&chan->wait_fd_node, chan->wait_fd); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(channel_ht, &chan->wait_fd_node); - rcu_read_unlock(); /* Add channel to the global poll events list */ // FIXME: Empty flag on a pipe pollset, this might // hang on FreeBSD. lttng_poll_add(&events, chan->wait_fd, 0); break; + } case CONSUMER_CHANNEL_DEL: { /* @@ -3016,10 +2993,9 @@ void *consumer_thread_channel_poll(void *data) * GET_CHANNEL failed. */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; chan = consumer_find_channel(key); if (!chan) { - rcu_read_unlock(); ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key); @@ -3052,7 +3028,6 @@ void *consumer_thread_channel_poll(void *data) if (!uatomic_sub_return(&chan->refcount, 1)) { consumer_del_channel(chan); } - rcu_read_unlock(); goto restart; } case CONSUMER_CHANNEL_QUIT: @@ -3086,7 +3061,7 @@ void *consumer_thread_channel_poll(void *data) continue; } - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; @@ -3119,12 +3094,10 @@ void *consumer_thread_channel_poll(void *data) } } else { ERR("Unexpected poll events %u for sock %d", revents, pollfd); - rcu_read_unlock(); goto end; } /* Release RCU lock for the channel looked up */ - rcu_read_unlock(); } } @@ -3766,7 +3739,7 @@ int consumer_data_pending(uint64_t id) DBG("Consumer data pending command on session id %" PRIu64, id); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&the_consumer_data.lock); switch (the_consumer_data.type) { @@ -3880,13 +3853,11 @@ int consumer_data_pending(uint64_t id) data_not_pending: /* Data is available to be read by a viewer. */ pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); return 0; data_pending: /* Data is still being extracted from buffers. */ pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); return 1; } @@ -4013,7 +3984,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, nullptr); lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); LTTNG_ASSERT(channel->trace_chunk); @@ -4365,7 +4336,6 @@ end_unlock_stream: end_unlock_channel: pthread_mutex_unlock(&channel->lock); end: - rcu_read_unlock(); lttng_dynamic_array_reset(&stream_rotation_positions); lttng_dynamic_pointer_array_reset(&streams_packet_to_open); return ret; @@ -4452,7 +4422,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha int ret; struct lttng_consumer_stream *stream; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); @@ -4464,13 +4434,11 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha pthread_mutex_unlock(&stream->lock); } pthread_mutex_unlock(&channel->lock); - rcu_read_unlock(); return 0; error_unlock: pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&channel->lock); - rcu_read_unlock(); return ret; } @@ -4666,7 +4634,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, ASSERT_RCU_READ_LOCKED(); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; DBG("Consumer rotate ready streams in channel %" PRIu64, key); @@ -4701,7 +4669,6 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, ret = 0; end: - rcu_read_unlock(); return ret; } @@ -4830,46 +4797,50 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, goto error; } - rcu_read_lock(); - cds_lfht_for_each_entry_duplicate( - the_consumer_data.channels_by_session_id_ht->ht, - the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed), - the_consumer_data.channels_by_session_id_ht->match_fct, - &session_id, - &iter.iter, - channel, - channels_by_session_id_ht_node.node) { - ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); - if (ret) { - /* - * Roll-back the creation of this chunk. - * - * This is important since the session daemon will - * assume that the creation of this chunk failed and - * will never ask for it to be closed, resulting - * in a leak and an inconsistent state for some - * channels. - */ - enum lttcomm_return_code close_ret; - char path[LTTNG_PATH_MAX]; + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry_duplicate( + the_consumer_data.channels_by_session_id_ht->ht, + the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, + lttng_ht_seed), + the_consumer_data.channels_by_session_id_ht->match_fct, + &session_id, + &iter.iter, + channel, + channels_by_session_id_ht_node.node) + { + ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); + if (ret) { + /* + * Roll-back the creation of this chunk. + * + * This is important since the session daemon will + * assume that the creation of this chunk failed and + * will never ask for it to be closed, resulting + * in a leak and an inconsistent state for some + * channels. + */ + enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; + + DBG("Failed to set new trace chunk on existing channels, rolling back"); + close_ret = + lttng_consumer_close_trace_chunk(relayd_id, + session_id, + chunk_id, + chunk_creation_timestamp, + nullptr, + path); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + session_id, + chunk_id); + } - DBG("Failed to set new trace chunk on existing channels, rolling back"); - close_ret = lttng_consumer_close_trace_chunk(relayd_id, - session_id, - chunk_id, - chunk_creation_timestamp, - nullptr, - path); - if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { - ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 - ", chunk_id = %" PRIu64, - session_id, - chunk_id); + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + break; } - - ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - break; } } @@ -4907,7 +4878,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, } } error_unlock: - rcu_read_unlock(); error: /* Release the reference returned by the "publish" operation. */ lttng_trace_chunk_put(published_chunk); @@ -4983,30 +4953,32 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, * it; it is only kept around to compare it (by address) to the * current chunk found in the session's channels. */ - rcu_read_lock(); - cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - int ret; + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry ( + the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { + int ret; - /* - * Only change the channel's chunk to NULL if it still - * references the chunk being closed. The channel may - * reference a newer channel in the case of a session - * rotation. When a session rotation occurs, the "next" - * chunk is created before the "current" chunk is closed. - */ - if (channel->trace_chunk != chunk) { - continue; - } - ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); - if (ret) { /* - * Attempt to close the chunk on as many channels as - * possible. + * Only change the channel's chunk to NULL if it still + * references the chunk being closed. The channel may + * reference a newer channel in the case of a session + * rotation. When a session rotation occurs, the "next" + * chunk is created before the "current" chunk is closed. */ - ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + if (channel->trace_chunk != chunk) { + continue; + } + ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); + if (ret) { + /* + * Attempt to close the chunk on as many channels as + * possible. + */ + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; + } } } - if (relayd_id) { int ret; struct consumer_relayd_sock_pair *relayd; @@ -5026,7 +4998,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, } } error_unlock: - rcu_read_unlock(); end: /* * Release the reference returned by the "find" operation and @@ -5048,6 +5019,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id const bool is_local_trace = !relayd_id; struct consumer_relayd_sock_pair *relayd = nullptr; bool chunk_exists_local, chunk_exists_remote; + lttng::urcu::read_lock_guard read_lock; if (relayd_id) { /* Only used for logging purposes. */ @@ -5080,7 +5052,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id goto end; } - rcu_read_lock(); relayd = consumer_find_relayd(*relayd_id); if (!relayd) { ERR("Failed to find relayd %" PRIu64, *relayd_id); @@ -5102,7 +5073,6 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist"); end_rcu_unlock: - rcu_read_unlock(); end: return ret_code; } @@ -5116,7 +5086,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann ht = the_consumer_data.stream_per_chan_id_ht; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -5139,12 +5109,10 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann next: pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); return LTTCOMM_CONSUMERD_SUCCESS; error_unlock: pthread_mutex_unlock(&stream->lock); - rcu_read_unlock(); return ret; } @@ -5185,56 +5153,56 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum goto end; } - rcu_read_lock(); - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { - enum consumer_stream_open_packet_status status; + { + lttng::urcu::read_lock_guard read_lock; + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + enum consumer_stream_open_packet_status status; - pthread_mutex_lock(&stream->lock); - if (cds_lfht_is_node_deleted(&stream->node.node)) { - goto next; - } + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } - status = consumer_stream_open_packet(stream); - switch (status) { - case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: - DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = true; - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: - DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: - /* - * Only unexpected internal errors can lead to this - * failing. Report an unknown error. - */ - ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 - ", channel id = %" PRIu64 ", channel name = %s" - ", session id = %" PRIu64, - stream->key, - channel->key, - channel->name, - channel->session_id); - ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; - goto error_unlock; - default: - abort(); - } + status = consumer_stream_open_packet(stream); + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* + * Only unexpected internal errors can lead to this + * failing. Report an unknown error. + */ + ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 + ", channel id = %" PRIu64 ", channel name = %s" + ", session id = %" PRIu64, + stream->key, + channel->key, + channel->name, + channel->session_id); + ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; + goto error_unlock; + default: + abort(); + } - next: - pthread_mutex_unlock(&stream->lock); + next: + pthread_mutex_unlock(&stream->lock); + } } - end_rcu_unlock: - rcu_read_unlock(); end: return ret;