-/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written
- */
-ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
-{
- unsigned long mmap_offset;
- ssize_t ret = 0, written = 0;
- off_t orig_offset = stream->out_fd_offset;
- int fd = stream->wait_fd;
- /* Default is on the disk */
- int outfd = stream->out_fd;
- uint64_t metadata_id;
- struct consumer_relayd_sock_pair *relayd = NULL;
-
- /* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
- goto end;
- }
- }
-
- /* get the offset inside the fd to mmap */
- ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
- if (ret != 0) {
- errno = -ret;
- perror("kernctl_get_mmap_read_offset");
- written = ret;
- goto end;
- }
-
- /* RCU lock for the relayd pointer */
- rcu_read_lock();
-
- /* Handle stream on the relayd if the output is on the network */
- if (relayd) {
- /*
- * Lock the control socket for the complete duration of the function
- * since from this point on we will use the socket.
- */
- if (stream->metadata_flag) {
- /* Metadata requires the control socket. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- }
-
- ret = consumer_handle_stream_before_relayd(stream, len);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
-
- /* Write metadata stream id before payload */
- if (stream->metadata_flag) {
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(outfd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- if (ret < 0) {
- PERROR("write metadata stream id");
- written = ret;
- goto end;
- }
- } while (errno == EINTR);
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
- /*
- * We do this so the return value can match the len passed as
- * argument to this function.
- */
- written -= sizeof(stream->relayd_stream_id);
- }
- }
- /* Else, use the default set before which is the filesystem. */
- }
-
- while (len > 0) {
- ret = write(outfd, stream->mmap_base + mmap_offset, len);
- if (ret < 0) {
- if (errno == EINTR) {
- /* restart the interrupted system call */
- continue;
- } else {
- perror("Error in file write");
- if (written == 0) {
- written = ret;
- }
- goto end;
- }
- } else if (ret > len) {
- perror("Error in file write");
- written += ret;
- goto end;
- } else {
- len -= ret;
- mmap_offset += ret;
- }
-
- /* This call is useless on a socket so better save a syscall. */
- if (!relayd) {
- /* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
- SYNC_FILE_RANGE_WRITE);
- stream->out_fd_offset += ret;
- }
- written += ret;
- }
- lttng_consumer_sync_trace_file(stream, orig_offset);
-
-end:
- /* Unlock only if ctrl socket used */
- if (relayd && stream->metadata_flag) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- }
-
- rcu_read_unlock();
-
- return written;
-}
-
-/*
- * Splice the data from the ring buffer to the tracefile.
- *
- * Returns the number of bytes spliced.
- */
-ssize_t lttng_kconsumer_on_read_subbuffer_splice(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
-{
- ssize_t ret = 0, written = 0, ret_splice = 0;
- loff_t offset = 0;
- off_t orig_offset = stream->out_fd_offset;
- int fd = stream->wait_fd;
- /* Default is on the disk */
- int outfd = stream->out_fd;
- uint64_t metadata_id;
- struct consumer_relayd_sock_pair *relayd = NULL;
-
- /* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
- goto end;
- }
- }
-
- /* RCU lock for the relayd pointer */
- rcu_read_lock();
-
- /* Write metadata stream id before payload */
- if (stream->metadata_flag && relayd) {
- /*
- * Lock the control socket for the complete duration of the function
- * since from this point on we will use the socket.
- */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-
- do {
- metadata_id = htobe64(stream->relayd_stream_id);
- ret = write(ctx->consumer_thread_pipe[1],
- (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- if (ret < 0) {
- PERROR("write metadata stream id");
- written = ret;
- goto end;
- }
- } while (errno == EINTR);
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
- }
-
- while (len > 0) {
- DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
- (unsigned long)offset, len, fd);
- ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
- SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("splice chan to pipe, ret %zd", ret_splice);
- if (ret_splice < 0) {
- perror("Error in relay splice");
- if (written == 0) {
- written = ret_splice;
- }
- ret = errno;
- goto splice_error;
- }
-
- /* Handle stream on the relayd if the output is on the network */
- if (relayd) {
- if (stream->metadata_flag) {
- /* Update counter to fit the spliced data */
- ret_splice += sizeof(stream->relayd_stream_id);
- len += sizeof(stream->relayd_stream_id);
- /*
- * We do this so the return value can match the len passed as
- * argument to this function.
- */
- written -= sizeof(stream->relayd_stream_id);
- }
-
- ret = consumer_handle_stream_before_relayd(stream, ret_splice);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
- } else {
- if (outfd == -1) {
- ERR("Remote relayd disconnected. Stopping");
- goto end;
- }
- }
- }
-
- DBG3("Kernel consumer splice data in %d to out %d",
- ctx->consumer_thread_pipe[0], outfd);
- ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
- ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("splice pipe to file, ret %zd", ret_splice);
- if (ret_splice < 0) {
- perror("Error in file splice");
- if (written == 0) {
- written = ret_splice;
- }
- ret = errno;
- goto splice_error;
- }
- if (ret_splice > len) {
- errno = EINVAL;
- PERROR("Wrote more data than requested %zd (len: %lu)",
- ret_splice, len);
- written += ret_splice;
- ret = errno;
- goto splice_error;
- }
- len -= ret_splice;
-
- /* This call is useless on a socket so better save a syscall. */
- if (!relayd) {
- /* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
- SYNC_FILE_RANGE_WRITE);
- stream->out_fd_offset += ret_splice;
- }
- written += ret_splice;
- }
- lttng_consumer_sync_trace_file(stream, orig_offset);
-
- ret = ret_splice;
-
- goto end;
-
-splice_error:
- /* send the appropriate error description to sessiond */
- switch (ret) {
- case EBADF:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
- break;
- case EINVAL:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
- break;
- case ENOMEM:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
- break;
- case ESPIPE:
- lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
- break;
- }
-
-end:
- if (relayd && stream->metadata_flag) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- }
-
- rcu_read_unlock();
-
- return written;
-}
-