Merge mmap/splice fct. for both consumers
[lttng-tools.git] / src / common / consumer.c
index 63d0d65ee3157b05ef1113336185535ea23a9dbe..1863cddc5757e6e1788642dd788629303a890881 100644 (file)
@@ -1087,18 +1087,128 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
+       unsigned long mmap_offset;
+       ssize_t ret = 0, written = 0;
+       off_t orig_offset = stream->out_fd_offset;
+       /* Default is on the disk */
+       int outfd = stream->out_fd;
+       uint64_t metadata_id;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+
+       /* RCU lock for the relayd pointer */
+       rcu_read_lock();
+
+       /* Flag that the current stream if set for network streaming. */
+       if (stream->net_seq_idx != -1) {
+               relayd = consumer_find_relayd(stream->net_seq_idx);
+               if (relayd == NULL) {
+                       goto end;
+               }
+       }
+
+       /* get the offset inside the fd to mmap */
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+               ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+               break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+               ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
+                               stream->buf, &mmap_offset);
+               break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
        }
+       if (ret != 0) {
+               errno = -ret;
+               PERROR("tracer ctl get_mmap_read_offset");
+               written = ret;
+               goto end;
+       }
 
-       return 0;
+       /* Handle stream on the relayd if the output is on the network */
+       if (relayd) {
+               unsigned long netlen = len;
+
+               /*
+                * Lock the control socket for the complete duration of the function
+                * since from this point on we will use the socket.
+                */
+               if (stream->metadata_flag) {
+                       /* Metadata requires the control socket. */
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       netlen += sizeof(stream->relayd_stream_id);
+               }
+
+               ret = consumer_handle_stream_before_relayd(stream, netlen);
+               if (ret >= 0) {
+                       /* Use the returned socket. */
+                       outfd = ret;
+
+                       /* Write metadata stream id before payload */
+                       if (stream->metadata_flag) {
+                               metadata_id = htobe64(stream->relayd_stream_id);
+                               do {
+                                       ret = write(outfd, (void *) &metadata_id,
+                                                       sizeof(stream->relayd_stream_id));
+                               } while (ret < 0 && errno == EINTR);
+                               if (ret < 0) {
+                                       PERROR("write metadata stream id");
+                                       written = ret;
+                                       goto end;
+                               }
+                               DBG("Metadata stream id %zu written before data",
+                                               stream->relayd_stream_id);
+                               /*
+                                * We do this so the return value can match the len passed as
+                                * argument to this function.
+                                */
+                               written -= sizeof(stream->relayd_stream_id);
+                       }
+               }
+               /* Else, use the default set before which is the filesystem. */
+       }
+
+       while (len > 0) {
+               do {
+                       ret = write(outfd, stream->mmap_base + mmap_offset, len);
+               } while (ret < 0 && errno == EINTR);
+               if (ret < 0) {
+                       PERROR("Error in file write");
+                       if (written == 0) {
+                               written = ret;
+                       }
+                       goto end;
+               } else if (ret > len) {
+                       PERROR("Error in file write (ret %ld > len %lu)", ret, len);
+                       written += ret;
+                       goto end;
+               } else {
+                       len -= ret;
+                       mmap_offset += ret;
+               }
+               DBG("Consumer mmap write() ret %ld (len %lu)", ret, len);
+
+               /* This call is useless on a socket so better save a syscall. */
+               if (!relayd) {
+                       /* This won't block, but will start writeout asynchronously */
+                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
+                                       SYNC_FILE_RANGE_WRITE);
+                       stream->out_fd_offset += ret;
+               }
+               written += ret;
+       }
+       lttng_consumer_sync_trace_file(stream, orig_offset);
+
+end:
+       /* Unlock only if ctrl socket used */
+       if (relayd && stream->metadata_flag) {
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       }
+
+       rcu_read_unlock();
+       return written;
 }
 
 /*
@@ -1110,18 +1220,160 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
+       ssize_t ret = 0, written = 0, ret_splice = 0;
+       loff_t offset = 0;
+       off_t orig_offset = stream->out_fd_offset;
+       int fd = stream->wait_fd;
+       /* Default is on the disk */
+       int outfd = stream->out_fd;
+       uint64_t metadata_id;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
+               break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               /* Not supported for user space tracing */
                return -ENOSYS;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
-               return -ENOSYS;
        }
 
+       /* RCU lock for the relayd pointer */
+       rcu_read_lock();
+
+       /* Flag that the current stream if set for network streaming. */
+       if (stream->net_seq_idx != -1) {
+               relayd = consumer_find_relayd(stream->net_seq_idx);
+               if (relayd == NULL) {
+                       goto end;
+               }
+       }
+
+       /* Write metadata stream id before payload */
+       if (stream->metadata_flag && relayd) {
+               /*
+                * Lock the control socket for the complete duration of the function
+                * since from this point on we will use the socket.
+                */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+
+               metadata_id = htobe64(stream->relayd_stream_id);
+               do {
+                       ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id,
+                                       sizeof(stream->relayd_stream_id));
+               } while (ret < 0 && errno == EINTR);
+               if (ret < 0) {
+                       PERROR("write metadata stream id");
+                       written = ret;
+                       goto end;
+               }
+               DBG("Metadata stream id %zu written before data",
+                               stream->relayd_stream_id);
+       }
+
+       while (len > 0) {
+               DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
+                               (unsigned long)offset, len, fd);
+               ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
+                               SPLICE_F_MOVE | SPLICE_F_MORE);
+               DBG("splice chan to pipe, ret %zd", ret_splice);
+               if (ret_splice < 0) {
+                       PERROR("Error in relay splice");
+                       if (written == 0) {
+                               written = ret_splice;
+                       }
+                       ret = errno;
+                       goto splice_error;
+               }
+
+               /* Handle stream on the relayd if the output is on the network */
+               if (relayd) {
+                       if (stream->metadata_flag) {
+                               /* Update counter to fit the spliced data */
+                               ret_splice += sizeof(stream->relayd_stream_id);
+                               len += sizeof(stream->relayd_stream_id);
+                               /*
+                                * We do this so the return value can match the len passed as
+                                * argument to this function.
+                                */
+                               written -= sizeof(stream->relayd_stream_id);
+                       }
+
+                       ret = consumer_handle_stream_before_relayd(stream, ret_splice);
+                       if (ret >= 0) {
+                               /* Use the returned socket. */
+                               outfd = ret;
+                       } else {
+                               if (outfd == -1) {
+                                       ERR("Remote relayd disconnected. Stopping");
+                                       goto end;
+                               }
+                       }
+               }
+
+               /* Splice data out */
+               ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
+                               ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
+               DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
+               if (ret_splice < 0) {
+                       PERROR("Error in file splice");
+                       if (written == 0) {
+                               written = ret_splice;
+                       }
+                       ret = errno;
+                       goto splice_error;
+               } else if (ret_splice > len) {
+                       errno = EINVAL;
+                       PERROR("Wrote more data than requested %zd (len: %lu)",
+                                       ret_splice, len);
+                       written += ret_splice;
+                       ret = errno;
+                       goto splice_error;
+               }
+               len -= ret_splice;
+
+               /* This call is useless on a socket so better save a syscall. */
+               if (!relayd) {
+                       /* This won't block, but will start writeout asynchronously */
+                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
+                                       SYNC_FILE_RANGE_WRITE);
+                       stream->out_fd_offset += ret_splice;
+               }
+               written += ret_splice;
+       }
+       lttng_consumer_sync_trace_file(stream, orig_offset);
+
+       ret = ret_splice;
+
+       goto end;
+
+splice_error:
+       /* send the appropriate error description to sessiond */
+       switch (ret) {
+       case EBADF:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
+               break;
+       case EINVAL:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
+               break;
+       case ENOMEM:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
+               break;
+       case ESPIPE:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
+               break;
+       }
+
+end:
+       if (relayd && stream->metadata_flag) {
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       }
+
+       rcu_read_unlock();
+       return written;
 }
 
 /*
This page took 0.02661 seconds and 4 git commands to generate.