X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=8c2bee33363614cbf58a4e5082aad84d9b8140ad;hp=22bf1002097a1e33a846f2a5c04d6305f6308d69;hb=f02e1e8a5820da2eda835add020f92ca8d32c973;hpb=173af62f4804133d4a7f45e34b6f72126f3eca5f diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 22bf10020..8c2bee333 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -41,285 +41,6 @@ extern struct lttng_consumer_global_data consumer_data; extern int consumer_poll_timeout; extern volatile int consumer_quit; -/* - * Mmap the ring buffer, read it and write the data to the tracefile. - * - * Returns the number of bytes written - */ -ssize_t lttng_kconsumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) -{ - unsigned long mmap_offset; - ssize_t ret = 0, written = 0; - off_t orig_offset = stream->out_fd_offset; - int fd = stream->wait_fd; - /* Default is on the disk */ - int outfd = stream->out_fd; - uint64_t metadata_id; - struct consumer_relayd_sock_pair *relayd = NULL; - - /* RCU lock for the relayd pointer */ - rcu_read_lock(); - - /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { - goto end; - } - } - - /* get the offset inside the fd to mmap */ - ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); - if (ret != 0) { - errno = -ret; - perror("kernctl_get_mmap_read_offset"); - written = ret; - goto end; - } - - /* Handle stream on the relayd if the output is on the network */ - if (relayd) { - unsigned long netlen = len; - - /* - * Lock the control socket for the complete duration of the function - * since from this point on we will use the socket. - */ - if (stream->metadata_flag) { - /* Metadata requires the control socket. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - netlen += sizeof(stream->relayd_stream_id); - } - - ret = consumer_handle_stream_before_relayd(stream, netlen); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; - - /* Write metadata stream id before payload */ - if (stream->metadata_flag) { - metadata_id = htobe64(stream->relayd_stream_id); - do { - ret = write(outfd, (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata stream id"); - written = ret; - goto end; - } - DBG("Metadata stream id %zu written before data", - stream->relayd_stream_id); - /* - * We do this so the return value can match the len passed as - * argument to this function. - */ - written -= sizeof(stream->relayd_stream_id); - } - } - /* Else, use the default set before which is the filesystem. */ - } - - while (len > 0) { - do { - ret = write(outfd, stream->mmap_base + mmap_offset, len); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - perror("Error in file write"); - if (written == 0) { - written = ret; - } - goto end; - } else if (ret > len) { - perror("Error in file write"); - written += ret; - goto end; - } else { - len -= ret; - mmap_offset += ret; - } - - /* This call is useless on a socket so better save a syscall. */ - if (!relayd) { - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - } - written += ret; - } - lttng_consumer_sync_trace_file(stream, orig_offset); - -end: - /* Unlock only if ctrl socket used */ - if (relayd && stream->metadata_flag) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - - rcu_read_unlock(); - - return written; -} - -/* - * Splice the data from the ring buffer to the tracefile. - * - * Returns the number of bytes spliced. - */ -ssize_t lttng_kconsumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) -{ - ssize_t ret = 0, written = 0, ret_splice = 0; - loff_t offset = 0; - off_t orig_offset = stream->out_fd_offset; - int fd = stream->wait_fd; - /* Default is on the disk */ - int outfd = stream->out_fd; - uint64_t metadata_id; - struct consumer_relayd_sock_pair *relayd = NULL; - - /* RCU lock for the relayd pointer */ - rcu_read_lock(); - - /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { - goto end; - } - } - - /* Write metadata stream id before payload */ - if (stream->metadata_flag && relayd) { - /* - * Lock the control socket for the complete duration of the function - * since from this point on we will use the socket. - */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - - metadata_id = htobe64(stream->relayd_stream_id); - do { - ret = write(ctx->consumer_thread_pipe[1], - (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata stream id"); - written = ret; - goto end; - } - DBG("Metadata stream id %zu written before data", - stream->relayd_stream_id); - } - - while (len > 0) { - DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", - (unsigned long)offset, len, fd); - ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe, ret %zd", ret_splice); - if (ret_splice < 0) { - perror("Error in relay splice"); - if (written == 0) { - written = ret_splice; - } - ret = errno; - goto splice_error; - } - - /* Handle stream on the relayd if the output is on the network */ - if (relayd) { - if (stream->metadata_flag) { - /* Update counter to fit the spliced data */ - ret_splice += sizeof(stream->relayd_stream_id); - len += sizeof(stream->relayd_stream_id); - /* - * We do this so the return value can match the len passed as - * argument to this function. - */ - written -= sizeof(stream->relayd_stream_id); - } - - ret = consumer_handle_stream_before_relayd(stream, ret_splice); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; - } else { - if (outfd == -1) { - ERR("Remote relayd disconnected. Stopping"); - goto end; - } - } - } - - DBG3("Kernel consumer splice data in %d to out %d", - ctx->consumer_thread_pipe[0], outfd); - ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, - ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file, ret %zd", ret_splice); - if (ret_splice < 0) { - perror("Error in file splice"); - if (written == 0) { - written = ret_splice; - } - ret = errno; - goto splice_error; - } - if (ret_splice > len) { - errno = EINVAL; - PERROR("Wrote more data than requested %zd (len: %lu)", - ret_splice, len); - written += ret_splice; - ret = errno; - goto splice_error; - } - len -= ret_splice; - - /* This call is useless on a socket so better save a syscall. */ - if (!relayd) { - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret_splice; - } - written += ret_splice; - } - lttng_consumer_sync_trace_file(stream, orig_offset); - - ret = ret_splice; - - goto end; - -splice_error: - /* send the appropriate error description to sessiond */ - switch (ret) { - case EBADF: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF); - break; - case EINVAL: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL); - break; - case ENOMEM: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM); - break; - case ESPIPE: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE); - break; - } - -end: - if (relayd && stream->metadata_flag) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - - rcu_read_unlock(); - - return written; -} - /* * Take a snapshot for a specific fd *