Merge mmap/splice fct. for both consumers
authorDavid Goulet <dgoulet@efficios.com>
Mon, 30 Jul 2012 16:41:10 +0000 (12:41 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Mon, 30 Jul 2012 17:23:11 +0000 (13:23 -0400)
Between UST and kernel, there was duplicate portion of code for the mmap
and splice read subbuffer functions.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-consumerd/Makefile.am
src/bin/lttng-consumerd/lttng-consumerd.c
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/kernel-consumer/kernel-consumer.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h
tests/tools/streaming/run-ust

index acd58cdc398b4f4ae882aca010d37e78835131e9..89ae05958b7288469e05c7b2c1598cd783df1f88 100644 (file)
@@ -5,7 +5,6 @@ lttnglibexec_PROGRAMS = lttng-consumerd
 lttng_consumerd_SOURCES = lttng-consumerd.c lttng-consumerd.h
 
 lttng_consumerd_LDADD = \
-          $(top_builddir)/src/common/kernel-ctl/libkernel-ctl.la \
           $(top_builddir)/src/common/libconsumer.la \
           $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \
           $(top_builddir)/src/common/libcommon.la
index 3bc700dc948594a937c2ef543a8cbe4c3d4212f7..e5d7ba1c60051aebdd6c1f435b6c692294065e6f 100644 (file)
 
 #include <common/defaults.h>
 #include <common/common.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/consumer.h>
 #include <common/sessiond-comm/sessiond-comm.h>
-#include <common/ust-consumer/ust-consumer.h>
 
 #include "lttng-consumerd.h"
 
index 63d0d65ee3157b05ef1113336185535ea23a9dbe..1863cddc5757e6e1788642dd788629303a890881 100644 (file)
@@ -1087,18 +1087,128 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
+       unsigned long mmap_offset;
+       ssize_t ret = 0, written = 0;
+       off_t orig_offset = stream->out_fd_offset;
+       /* Default is on the disk */
+       int outfd = stream->out_fd;
+       uint64_t metadata_id;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+
+       /* RCU lock for the relayd pointer */
+       rcu_read_lock();
+
+       /* Flag that the current stream if set for network streaming. */
+       if (stream->net_seq_idx != -1) {
+               relayd = consumer_find_relayd(stream->net_seq_idx);
+               if (relayd == NULL) {
+                       goto end;
+               }
+       }
+
+       /* get the offset inside the fd to mmap */
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+               ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+               break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
+               ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
+                               stream->buf, &mmap_offset);
+               break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
        }
+       if (ret != 0) {
+               errno = -ret;
+               PERROR("tracer ctl get_mmap_read_offset");
+               written = ret;
+               goto end;
+       }
 
-       return 0;
+       /* Handle stream on the relayd if the output is on the network */
+       if (relayd) {
+               unsigned long netlen = len;
+
+               /*
+                * Lock the control socket for the complete duration of the function
+                * since from this point on we will use the socket.
+                */
+               if (stream->metadata_flag) {
+                       /* Metadata requires the control socket. */
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       netlen += sizeof(stream->relayd_stream_id);
+               }
+
+               ret = consumer_handle_stream_before_relayd(stream, netlen);
+               if (ret >= 0) {
+                       /* Use the returned socket. */
+                       outfd = ret;
+
+                       /* Write metadata stream id before payload */
+                       if (stream->metadata_flag) {
+                               metadata_id = htobe64(stream->relayd_stream_id);
+                               do {
+                                       ret = write(outfd, (void *) &metadata_id,
+                                                       sizeof(stream->relayd_stream_id));
+                               } while (ret < 0 && errno == EINTR);
+                               if (ret < 0) {
+                                       PERROR("write metadata stream id");
+                                       written = ret;
+                                       goto end;
+                               }
+                               DBG("Metadata stream id %zu written before data",
+                                               stream->relayd_stream_id);
+                               /*
+                                * We do this so the return value can match the len passed as
+                                * argument to this function.
+                                */
+                               written -= sizeof(stream->relayd_stream_id);
+                       }
+               }
+               /* Else, use the default set before which is the filesystem. */
+       }
+
+       while (len > 0) {
+               do {
+                       ret = write(outfd, stream->mmap_base + mmap_offset, len);
+               } while (ret < 0 && errno == EINTR);
+               if (ret < 0) {
+                       PERROR("Error in file write");
+                       if (written == 0) {
+                               written = ret;
+                       }
+                       goto end;
+               } else if (ret > len) {
+                       PERROR("Error in file write (ret %ld > len %lu)", ret, len);
+                       written += ret;
+                       goto end;
+               } else {
+                       len -= ret;
+                       mmap_offset += ret;
+               }
+               DBG("Consumer mmap write() ret %ld (len %lu)", ret, len);
+
+               /* This call is useless on a socket so better save a syscall. */
+               if (!relayd) {
+                       /* This won't block, but will start writeout asynchronously */
+                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
+                                       SYNC_FILE_RANGE_WRITE);
+                       stream->out_fd_offset += ret;
+               }
+               written += ret;
+       }
+       lttng_consumer_sync_trace_file(stream, orig_offset);
+
+end:
+       /* Unlock only if ctrl socket used */
+       if (relayd && stream->metadata_flag) {
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       }
+
+       rcu_read_unlock();
+       return written;
 }
 
 /*
@@ -1110,18 +1220,160 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
+       ssize_t ret = 0, written = 0, ret_splice = 0;
+       loff_t offset = 0;
+       off_t orig_offset = stream->out_fd_offset;
+       int fd = stream->wait_fd;
+       /* Default is on the disk */
+       int outfd = stream->out_fd;
+       uint64_t metadata_id;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
+               break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               /* Not supported for user space tracing */
                return -ENOSYS;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
-               return -ENOSYS;
        }
 
+       /* RCU lock for the relayd pointer */
+       rcu_read_lock();
+
+       /* Flag that the current stream if set for network streaming. */
+       if (stream->net_seq_idx != -1) {
+               relayd = consumer_find_relayd(stream->net_seq_idx);
+               if (relayd == NULL) {
+                       goto end;
+               }
+       }
+
+       /* Write metadata stream id before payload */
+       if (stream->metadata_flag && relayd) {
+               /*
+                * Lock the control socket for the complete duration of the function
+                * since from this point on we will use the socket.
+                */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+
+               metadata_id = htobe64(stream->relayd_stream_id);
+               do {
+                       ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id,
+                                       sizeof(stream->relayd_stream_id));
+               } while (ret < 0 && errno == EINTR);
+               if (ret < 0) {
+                       PERROR("write metadata stream id");
+                       written = ret;
+                       goto end;
+               }
+               DBG("Metadata stream id %zu written before data",
+                               stream->relayd_stream_id);
+       }
+
+       while (len > 0) {
+               DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
+                               (unsigned long)offset, len, fd);
+               ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
+                               SPLICE_F_MOVE | SPLICE_F_MORE);
+               DBG("splice chan to pipe, ret %zd", ret_splice);
+               if (ret_splice < 0) {
+                       PERROR("Error in relay splice");
+                       if (written == 0) {
+                               written = ret_splice;
+                       }
+                       ret = errno;
+                       goto splice_error;
+               }
+
+               /* Handle stream on the relayd if the output is on the network */
+               if (relayd) {
+                       if (stream->metadata_flag) {
+                               /* Update counter to fit the spliced data */
+                               ret_splice += sizeof(stream->relayd_stream_id);
+                               len += sizeof(stream->relayd_stream_id);
+                               /*
+                                * We do this so the return value can match the len passed as
+                                * argument to this function.
+                                */
+                               written -= sizeof(stream->relayd_stream_id);
+                       }
+
+                       ret = consumer_handle_stream_before_relayd(stream, ret_splice);
+                       if (ret >= 0) {
+                               /* Use the returned socket. */
+                               outfd = ret;
+                       } else {
+                               if (outfd == -1) {
+                                       ERR("Remote relayd disconnected. Stopping");
+                                       goto end;
+                               }
+                       }
+               }
+
+               /* Splice data out */
+               ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
+                               ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
+               DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
+               if (ret_splice < 0) {
+                       PERROR("Error in file splice");
+                       if (written == 0) {
+                               written = ret_splice;
+                       }
+                       ret = errno;
+                       goto splice_error;
+               } else if (ret_splice > len) {
+                       errno = EINVAL;
+                       PERROR("Wrote more data than requested %zd (len: %lu)",
+                                       ret_splice, len);
+                       written += ret_splice;
+                       ret = errno;
+                       goto splice_error;
+               }
+               len -= ret_splice;
+
+               /* This call is useless on a socket so better save a syscall. */
+               if (!relayd) {
+                       /* This won't block, but will start writeout asynchronously */
+                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
+                                       SYNC_FILE_RANGE_WRITE);
+                       stream->out_fd_offset += ret_splice;
+               }
+               written += ret_splice;
+       }
+       lttng_consumer_sync_trace_file(stream, orig_offset);
+
+       ret = ret_splice;
+
+       goto end;
+
+splice_error:
+       /* send the appropriate error description to sessiond */
+       switch (ret) {
+       case EBADF:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
+               break;
+       case EINVAL:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
+               break;
+       case ENOMEM:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
+               break;
+       case ESPIPE:
+               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
+               break;
+       }
+
+end:
+       if (relayd && stream->metadata_flag) {
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       }
+
+       rcu_read_unlock();
+       return written;
 }
 
 /*
index cc90933e78215890423311e7cb9a233586f3f700..5cbb47dcfa5945534e48551f2fa9393486ddce2f 100644 (file)
@@ -17,8 +17,8 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef _LTTNG_CONSUMER_H
-#define _LTTNG_CONSUMER_H
+#ifndef LIB_CONSUMER_H
+#define LIB_CONSUMER_H
 
 #include <limits.h>
 #include <poll.h>
@@ -382,4 +382,4 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
 
-#endif /* _LTTNG_CONSUMER_H */
+#endif /* LIB_CONSUMER_H */
index 22bf1002097a1e33a846f2a5c04d6305f6308d69..8c2bee33363614cbf58a4e5082aad84d9b8140ad 100644 (file)
@@ -41,285 +41,6 @@ extern struct lttng_consumer_global_data consumer_data;
 extern int consumer_poll_timeout;
 extern volatile int consumer_quit;
 
-/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written
- */
-ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len)
-{
-       unsigned long mmap_offset;
-       ssize_t ret = 0, written = 0;
-       off_t orig_offset = stream->out_fd_offset;
-       int fd = stream->wait_fd;
-       /* Default is on the disk */
-       int outfd = stream->out_fd;
-       uint64_t metadata_id;
-       struct consumer_relayd_sock_pair *relayd = NULL;
-
-       /* RCU lock for the relayd pointer */
-       rcu_read_lock();
-
-       /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != -1) {
-               relayd = consumer_find_relayd(stream->net_seq_idx);
-               if (relayd == NULL) {
-                       goto end;
-               }
-       }
-
-       /* get the offset inside the fd to mmap */
-       ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
-       if (ret != 0) {
-               errno = -ret;
-               perror("kernctl_get_mmap_read_offset");
-               written = ret;
-               goto end;
-       }
-
-       /* Handle stream on the relayd if the output is on the network */
-       if (relayd) {
-               unsigned long netlen = len;
-
-               /*
-                * Lock the control socket for the complete duration of the function
-                * since from this point on we will use the socket.
-                */
-               if (stream->metadata_flag) {
-                       /* Metadata requires the control socket. */
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-                       netlen += sizeof(stream->relayd_stream_id);
-               }
-
-               ret = consumer_handle_stream_before_relayd(stream, netlen);
-               if (ret >= 0) {
-                       /* Use the returned socket. */
-                       outfd = ret;
-
-                       /* Write metadata stream id before payload */
-                       if (stream->metadata_flag) {
-                               metadata_id = htobe64(stream->relayd_stream_id);
-                               do {
-                                       ret = write(outfd, (void *) &metadata_id,
-                                                       sizeof(stream->relayd_stream_id));
-                               } while (ret < 0 && errno == EINTR);
-                               if (ret < 0) {
-                                       PERROR("write metadata stream id");
-                                       written = ret;
-                                       goto end;
-                               }
-                               DBG("Metadata stream id %zu written before data",
-                                               stream->relayd_stream_id);
-                               /*
-                                * We do this so the return value can match the len passed as
-                                * argument to this function.
-                                */
-                               written -= sizeof(stream->relayd_stream_id);
-                       }
-               }
-               /* Else, use the default set before which is the filesystem. */
-       }
-
-       while (len > 0) {
-               do {
-                       ret = write(outfd, stream->mmap_base + mmap_offset, len);
-               } while (ret < 0 && errno == EINTR);
-               if (ret < 0) {
-                       perror("Error in file write");
-                       if (written == 0) {
-                               written = ret;
-                       }
-                       goto end;
-               } else if (ret > len) {
-                       perror("Error in file write");
-                       written += ret;
-                       goto end;
-               } else {
-                       len -= ret;
-                       mmap_offset += ret;
-               }
-
-               /* This call is useless on a socket so better save a syscall. */
-               if (!relayd) {
-                       /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
-                                       SYNC_FILE_RANGE_WRITE);
-                       stream->out_fd_offset += ret;
-               }
-               written += ret;
-       }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
-
-end:
-       /* Unlock only if ctrl socket used */
-       if (relayd && stream->metadata_flag) {
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       }
-
-       rcu_read_unlock();
-
-       return written;
-}
-
-/*
- * Splice the data from the ring buffer to the tracefile.
- *
- * Returns the number of bytes spliced.
- */
-ssize_t lttng_kconsumer_on_read_subbuffer_splice(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len)
-{
-       ssize_t ret = 0, written = 0, ret_splice = 0;
-       loff_t offset = 0;
-       off_t orig_offset = stream->out_fd_offset;
-       int fd = stream->wait_fd;
-       /* Default is on the disk */
-       int outfd = stream->out_fd;
-       uint64_t metadata_id;
-       struct consumer_relayd_sock_pair *relayd = NULL;
-
-       /* RCU lock for the relayd pointer */
-       rcu_read_lock();
-
-       /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != -1) {
-               relayd = consumer_find_relayd(stream->net_seq_idx);
-               if (relayd == NULL) {
-                       goto end;
-               }
-       }
-
-       /* Write metadata stream id before payload */
-       if (stream->metadata_flag && relayd) {
-               /*
-                * Lock the control socket for the complete duration of the function
-                * since from this point on we will use the socket.
-                */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-
-               metadata_id = htobe64(stream->relayd_stream_id);
-               do {
-                       ret = write(ctx->consumer_thread_pipe[1],
-                                       (void *) &metadata_id,
-                                       sizeof(stream->relayd_stream_id));
-               } while (ret < 0 && errno == EINTR);
-               if (ret < 0) {
-                       PERROR("write metadata stream id");
-                       written = ret;
-                       goto end;
-               }
-               DBG("Metadata stream id %zu written before data",
-                               stream->relayd_stream_id);
-       }
-
-       while (len > 0) {
-               DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
-                               (unsigned long)offset, len, fd);
-               ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len,
-                               SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("splice chan to pipe, ret %zd", ret_splice);
-               if (ret_splice < 0) {
-                       perror("Error in relay splice");
-                       if (written == 0) {
-                               written = ret_splice;
-                       }
-                       ret = errno;
-                       goto splice_error;
-               }
-
-               /* Handle stream on the relayd if the output is on the network */
-               if (relayd) {
-                       if (stream->metadata_flag) {
-                               /* Update counter to fit the spliced data */
-                               ret_splice += sizeof(stream->relayd_stream_id);
-                               len += sizeof(stream->relayd_stream_id);
-                               /*
-                                * We do this so the return value can match the len passed as
-                                * argument to this function.
-                                */
-                               written -= sizeof(stream->relayd_stream_id);
-                       }
-
-                       ret = consumer_handle_stream_before_relayd(stream, ret_splice);
-                       if (ret >= 0) {
-                               /* Use the returned socket. */
-                               outfd = ret;
-                       } else {
-                               if (outfd == -1) {
-                                       ERR("Remote relayd disconnected. Stopping");
-                                       goto end;
-                               }
-                       }
-               }
-
-               DBG3("Kernel consumer splice data in %d to out %d",
-                               ctx->consumer_thread_pipe[0], outfd);
-               ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL,
-                               ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("splice pipe to file, ret %zd", ret_splice);
-               if (ret_splice < 0) {
-                       perror("Error in file splice");
-                       if (written == 0) {
-                               written = ret_splice;
-                       }
-                       ret = errno;
-                       goto splice_error;
-               }
-               if (ret_splice > len) {
-                       errno = EINVAL;
-                       PERROR("Wrote more data than requested %zd (len: %lu)",
-                                       ret_splice, len);
-                       written += ret_splice;
-                       ret = errno;
-                       goto splice_error;
-               }
-               len -= ret_splice;
-
-               /* This call is useless on a socket so better save a syscall. */
-               if (!relayd) {
-                       /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice,
-                                       SYNC_FILE_RANGE_WRITE);
-                       stream->out_fd_offset += ret_splice;
-               }
-               written += ret_splice;
-       }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
-
-       ret = ret_splice;
-
-       goto end;
-
-splice_error:
-       /* send the appropriate error description to sessiond */
-       switch (ret) {
-       case EBADF:
-               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF);
-               break;
-       case EINVAL:
-               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL);
-               break;
-       case ENOMEM:
-               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM);
-               break;
-       case ESPIPE:
-               lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE);
-               break;
-       }
-
-end:
-       if (relayd && stream->metadata_flag) {
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       }
-
-       rcu_read_unlock();
-
-       return written;
-}
-
 /*
  * Take a snapshot for a specific fd
  *
index 18b5396eb111e92acdaf71c10420de93a9c408ea..fd3e6d1cbe5f49437799403b3c3397b378b5d924 100644 (file)
 
 #include <common/consumer.h>
 
-/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written, or negative value on error.
- */
-extern ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len);
-
-/*
- * Splice the data from the ring buffer to the tracefile.
- *
- * Returns the number of bytes spliced, or negative error value on
- * error.
- */
-extern ssize_t lttng_kconsumer_on_read_subbuffer_splice(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len);
-
 /*
  * Take a snapshot for a specific fd
  *
index 07a68d8f755da7813d765357b5a2379f86b0b589..1fed58bdc8d5be33179bf5b70e346e09b842c5d0 100644 (file)
@@ -18,6 +18,7 @@
 
 #define _GNU_SOURCE
 #include <assert.h>
+#include <lttng/ust-ctl.h>
 #include <poll.h>
 #include <pthread.h>
 #include <stdlib.h>
@@ -27,7 +28,6 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include <lttng/ust-ctl.h>
 
 #include <common/common.h>
 #include <common/sessiond-comm/sessiond-comm.h>
@@ -41,127 +41,14 @@ extern int consumer_poll_timeout;
 extern volatile int consumer_quit;
 
 /*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written, else negative value on error.
+ * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
+ * compiled out, we isolate it in this library.
  */
-ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len)
+int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle,
+               struct lttng_ust_lib_ring_buffer *buf, unsigned long *off)
 {
-       unsigned long mmap_offset;
-       long ret = 0, written = 0;
-       off_t orig_offset = stream->out_fd_offset;
-       int outfd = stream->out_fd;
-       uint64_t metadata_id;
-       struct consumer_relayd_sock_pair *relayd = NULL;
-
-       /* RCU lock for the relayd pointer */
-       rcu_read_lock();
-
-       /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != -1) {
-               relayd = consumer_find_relayd(stream->net_seq_idx);
-               if (relayd == NULL) {
-                       ERR("UST consumer mmap(), unable to find relay for index %d",
-                                       stream->net_seq_idx);
-                       goto end;
-               }
-       }
-
-       /* get the offset inside the fd to mmap */
-       ret = ustctl_get_mmap_read_offset(stream->chan->handle,
-               stream->buf, &mmap_offset);
-       if (ret != 0) {
-               errno = -ret;
-               PERROR("ustctl_get_mmap_read_offset");
-               written = ret;
-               goto end;
-       }
-
-       /* Handle stream on the relayd if the output is on the network */
-       if (relayd) {
-               unsigned long netlen = len;
-
-               if (stream->metadata_flag) {
-                       /* Only lock if metadata since we use the control socket. */
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-                       netlen += sizeof(stream->relayd_stream_id);
-               }
-
-               ret = consumer_handle_stream_before_relayd(stream, netlen);
-               if (ret >= 0) {
-                       outfd = ret;
-
-                       /* Write metadata stream id before payload */
-                       if (stream->metadata_flag) {
-                               metadata_id = htobe64(stream->relayd_stream_id);
-                               do {
-                                       ret = write(outfd, (void *) &metadata_id,
-                                                       sizeof(stream->relayd_stream_id));
-                               } while (ret < 0 && errno == EINTR);
-                               if (ret < 0) {
-                                       PERROR("write metadata stream id");
-                                       written = ret;
-                                       goto end;
-                               }
-                               DBG("Metadata stream id %zu written before data",
-                                               stream->relayd_stream_id);
-                       }
-               }
-               /* Else, use the default set before which is the filesystem. */
-       }
-
-       while (len > 0) {
-               do {
-                       ret = write(outfd, stream->mmap_base + mmap_offset, len);
-               } while (ret < 0 && errno == EINTR);
-               if (ret < 0) {
-                       PERROR("Error in file write");
-                       if (written == 0) {
-                               written = ret;
-                       }
-                       goto end;
-               } else if (ret > len) {
-                       PERROR("ret %ld > len %lu", ret, len);
-                       written += ret;
-                       goto end;
-               } else {
-                       len -= ret;
-                       mmap_offset += ret;
-               }
-               DBG("UST mmap write() ret %ld (len %lu)", ret, len);
-
-               /* This call is useless on a socket so better save a syscall. */
-               if (!relayd) {
-                       /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
-                                       SYNC_FILE_RANGE_WRITE);
-                       stream->out_fd_offset += ret;
-               }
-               written += ret;
-       }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
-
-end:
-       if (relayd && stream->metadata_flag) {
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       }
-       rcu_read_unlock();
-       return written;
-}
-
-/*
- * Splice the data from the ring buffer to the tracefile.
- *
- * Returns the number of bytes spliced.
- */
-ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len)
-{
-       return -ENOSYS;
-}
+       return ustctl_get_mmap_read_offset(handle, buf, off);
+};
 
 /*
  * Take a snapshot for a specific fd
index c07377f8ea38711cfbd72951982ff173640ff683..7e055a95fffa37b3cc817c93d76929109bd78a8c 100644 (file)
 
 #ifdef HAVE_LIBLTTNG_UST_CTL
 
-/*
- * Mmap the ring buffer, read it and write the data to the tracefile.
- *
- * Returns the number of bytes written, else negative value on error.
- */
-extern ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len);
-
-/* Not implemented */
-extern ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len);
-
 /*
  * Take a snapshot for a specific fd
  *
@@ -72,6 +58,10 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
 
+extern int lttng_ustctl_get_mmap_read_offset(
+               struct lttng_ust_shm_handle *handle,
+               struct lttng_ust_lib_ring_buffer *buf, unsigned long *off);
+
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
 static inline
@@ -153,6 +143,12 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
 }
 
+static inline
+int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle,
+               struct lttng_ust_lib_ring_buffer *buf, unsigned long *off)
+{
+       return -ENOSYS;
+}
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
index 2e88d2a9fc8541b4aa686e39cf0f30054f433cb4..0149918e22dff17f7489c27084c5fac5e2eb11cc 100755 (executable)
@@ -22,6 +22,8 @@ SESSION_NAME="stream"
 EVENT_NAME="tp:tptest"
 PID_RELAYD=0
 
+TRACE_PATH=$(mktemp -d)
+
 source $TESTDIR/utils.sh
 
 echo -e "\n-------------------------------"
@@ -91,7 +93,7 @@ function test_ust_after_start ()
 }
 
 start_sessiond
-lttng_start_relayd
+lttng_start_relayd "-o $TRACE_PATH"
 
 tests=( test_ust_before_start test_ust_after_start )
 
@@ -101,11 +103,10 @@ do
        ${fct_test}
 
        # Validate test
-       validate_trace $EVENT_NAME ~/lttng-traces/$HOSTNAME/$SESSION_NAME*
+       validate_trace $EVENT_NAME $TRACE_PATH/$HOSTNAME/$SESSION_NAME*
        if [ $? -eq 0 ]; then
                # Only delete if successful
-               rm -rf ~/lttng-traces/$HOSTNAME/$SESSION_NAME*
-               rm -rf ~/lttng-traces/$SESSION_NAME*
+               rm -rf $TRACE_PATH
        else
                break
        fi
This page took 0.037864 seconds and 4 git commands to generate.