#include <common/dynamic-array.hpp>
#include <common/index/ctf-index.hpp>
#include <common/index/index.hpp>
+#include <common/io-hint.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
#include <common/relayd/relayd.hpp>
#include <common/utils.hpp>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
}
channel->monitor = monitor;
channel->live_timer_interval = live_timer_interval;
channel->is_live = is_in_live_session;
- pthread_mutex_init(&channel->lock, nullptr);
- pthread_mutex_init(&channel->timer_lock, nullptr);
+ pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
+ lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
switch (output) {
case LTTNG_EVENT_SPLICE:
*/
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int ret;
int outfd = stream->out_fd;
/*
if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd,
- orig_offset - stream->max_sb_size,
- stream->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
- SYNC_FILE_RANGE_WAIT_AFTER);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- ret = posix_fadvise(
- outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
- if (ret && ret != -ENOSYS) {
- errno = ret;
- PERROR("posix_fadvise on fd %i", outfd);
- }
+ lttng::io::hint_flush_range_dont_need_sync(
+ outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
}
/*
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
stream->out_fd_offset += ret_splice;
}
stream->output_written += ret_splice;
* pointer value.
*/
channel->metadata_stream = nullptr;
+ lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i, err = -1;
+ int num_rdy, high_prio, ret, i, err = -1;
struct pollfd *pollfd = nullptr;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
health_code_update();
high_prio = 0;
- num_hup = 0;
/*
* the fds set has been updated, we need to update our
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = nullptr;
- num_hup++;
}
}
if (local_stream[i] != nullptr) {