X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=1863cddc5757e6e1788642dd788629303a890881;hp=63d0d65ee3157b05ef1113336185535ea23a9dbe;hb=f02e1e8a5820da2eda835add020f92ca8d32c973;hpb=173af62f4804133d4a7f45e34b6f72126f3eca5f diff --git a/src/common/consumer.c b/src/common/consumer.c index 63d0d65ee..1863cddc5 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1087,18 +1087,128 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { + unsigned long mmap_offset; + ssize_t ret = 0, written = 0; + off_t orig_offset = stream->out_fd_offset; + /* Default is on the disk */ + int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } + + /* get the offset inside the fd to mmap */ switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len); + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len); + ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle, + stream->buf, &mmap_offset); + break; default: ERR("Unknown consumer_data type"); assert(0); } + if (ret != 0) { + errno = -ret; + PERROR("tracer ctl get_mmap_read_offset"); + written = ret; + goto end; + } - return 0; + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + unsigned long netlen = len; + + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + if (stream->metadata_flag) { + /* Metadata requires the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + netlen += sizeof(stream->relayd_stream_id); + } + + ret = consumer_handle_stream_before_relayd(stream, netlen); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(outfd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + } + /* Else, use the default set before which is the filesystem. */ + } + + while (len > 0) { + do { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Error in file write"); + if (written == 0) { + written = ret; + } + goto end; + } else if (ret > len) { + PERROR("Error in file write (ret %ld > len %lu)", ret, len); + written += ret; + goto end; + } else { + len -= ret; + mmap_offset += ret; + } + DBG("Consumer mmap write() ret %ld (len %lu)", ret, len); + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + written += ret; + } + lttng_consumer_sync_trace_file(stream, orig_offset); + +end: + /* Unlock only if ctrl socket used */ + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } /* @@ -1110,18 +1220,160 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { + ssize_t ret = 0, written = 0, ret_splice = 0; + loff_t offset = 0; + off_t orig_offset = stream->out_fd_offset; + int fd = stream->wait_fd; + /* Default is on the disk */ + int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len); + break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: + /* Not supported for user space tracing */ return -ENOSYS; default: ERR("Unknown consumer_data type"); assert(0); - return -ENOSYS; } + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } + + /* Write metadata stream id before payload */ + if (stream->metadata_flag && relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + } + + while (len > 0) { + DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", + (unsigned long)offset, len, fd); + ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice chan to pipe, ret %zd", ret_splice); + if (ret_splice < 0) { + PERROR("Error in relay splice"); + if (written == 0) { + written = ret_splice; + } + ret = errno; + goto splice_error; + } + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Update counter to fit the spliced data */ + ret_splice += sizeof(stream->relayd_stream_id); + len += sizeof(stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + + ret = consumer_handle_stream_before_relayd(stream, ret_splice); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + } else { + if (outfd == -1) { + ERR("Remote relayd disconnected. Stopping"); + goto end; + } + } + } + + /* Splice data out */ + ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, + ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice); + if (ret_splice < 0) { + PERROR("Error in file splice"); + if (written == 0) { + written = ret_splice; + } + ret = errno; + goto splice_error; + } else if (ret_splice > len) { + errno = EINVAL; + PERROR("Wrote more data than requested %zd (len: %lu)", + ret_splice, len); + written += ret_splice; + ret = errno; + goto splice_error; + } + len -= ret_splice; + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret_splice; + } + written += ret_splice; + } + lttng_consumer_sync_trace_file(stream, orig_offset); + + ret = ret_splice; + + goto end; + +splice_error: + /* send the appropriate error description to sessiond */ + switch (ret) { + case EBADF: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF); + break; + case EINVAL: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL); + break; + case ENOMEM: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM); + break; + case ESPIPE: + lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE); + break; + } + +end: + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } /*