X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=01845871b91a2a2423e4375e983de1cac6243b7f;hb=671e39d79a1ad9c3f13c4784a26710a5b1f14237;hp=27b34a39f249071867e49e9df7fb438517bb6ee8;hpb=56047f5a23df5c2c583a102b8015bbec5a7da9f1;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 27b34a39f..01845871b 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include #include +#include #include #include #include @@ -1336,7 +1338,6 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) */ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { - int ret; int outfd = stream->out_fd; /* @@ -1348,31 +1349,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, if (orig_offset < stream->max_sb_size) { return; } - lttng_sync_file_range(outfd, - orig_offset - stream->max_sb_size, - stream->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | - SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - ret = posix_fadvise( - outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED); - if (ret && ret != -ENOSYS) { - errno = ret; - PERROR("posix_fadvise on fd %i", outfd); - } + lttng::io::hint_flush_range_dont_need_sync( + outfd, orig_offset - stream->max_sb_size, stream->max_sb_size); } /* @@ -1733,8 +1711,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len); stream->out_fd_offset += write_len; lttng_consumer_sync_trace_file(stream, orig_offset); } @@ -1933,8 +1910,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice); stream->out_fd_offset += ret_splice; } stream->output_written += ret_splice; @@ -2516,7 +2492,7 @@ error_testpoint: */ void *consumer_thread_data_poll(void *data) { - int num_rdy, num_hup, high_prio, ret, i, err = -1; + int num_rdy, high_prio, ret, i, err = -1; struct pollfd *pollfd = nullptr; /* local view of the streams */ struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr; @@ -2549,7 +2525,6 @@ void *consumer_thread_data_poll(void *data) health_code_update(); high_prio = 0; - num_hup = 0; /* * the fds set has been updated, we need to update our @@ -2765,21 +2740,18 @@ void *consumer_thread_data_poll(void *data) if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = nullptr; - num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = nullptr; - num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->has_data_left_to_be_read_before_teardown) { consumer_del_stream(local_stream[i], data_ht); local_stream[i] = nullptr; - num_hup++; } } if (local_stream[i] != nullptr) {