kconsumerd: fix strict aliasing
[lttng-tools.git] / liblttngkconsumerd / lttngkconsumerd.c
index ba26026c4458cb955ce70a5c2727a8b3b7413e48..1893e0ae66a2f110150b7e6aa219d32771c0b0c6 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #define _GNU_SOURCE
+#include <assert.h>
 #include <fcntl.h>
 #include <poll.h>
 #include <pthread.h>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include <urcu/list.h>
 
+#include <lttng-kernel-ctl.h>
+#include <lttng-sessiond-comm.h>
 #include <lttng/lttng-kconsumerd.h>
-
-#include "kernelctl.h"
-#include "lttngerr.h"
-#include "lttng-sessiond-comm.h"
+#include <lttngerr.h>
 
 static struct lttng_kconsumerd_global_data {
        /*
@@ -125,22 +124,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf)
 }
 
 /*
- * Add a fd to the global list protected by a mutex.
+ * Create a struct lttng_kconsumerd_fd from the lttcomm_kconsumerd_msg
+ * information received on the receiving socket
  */
-static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
+struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
+               struct lttcomm_kconsumerd_msg *buf,
                int consumerd_fd)
 {
        struct lttng_kconsumerd_fd *tmp_fd;
-       int ret = 0;
 
-       pthread_mutex_lock(&kconsumerd_data.lock);
-       /* Check if already exist */
-       ret = kconsumerd_find_session_fd(buf->fd);
-       if (ret == 1) {
+       tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
+       if (tmp_fd == NULL) {
+               perror("malloc struct lttng_kconsumerd_fd");
                goto end;
        }
 
-       tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
        tmp_fd->sessiond_fd = buf->fd;
        tmp_fd->consumerd_fd = consumerd_fd;
        tmp_fd->state = buf->state;
@@ -152,42 +150,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
        tmp_fd->output = buf->output;
        strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
        tmp_fd->path_name[PATH_MAX - 1] = '\0';
+       DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
+                       tmp_fd->path_name, tmp_fd->sessiond_fd,
+                       tmp_fd->consumerd_fd, tmp_fd->out_fd);
 
-       /* Opening the tracefile in write mode */
-       if (tmp_fd->path_name != NULL) {
-               ret = open(tmp_fd->path_name,
-                               O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
-               if (ret < 0) {
-                       ERR("Opening %s", tmp_fd->path_name);
-                       perror("open");
-                       goto end;
-               }
-               tmp_fd->out_fd = ret;
-               DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
-                               tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
-       }
+end:
+       return tmp_fd;
+}
 
-       if (tmp_fd->output == LTTNG_EVENT_MMAP) {
-               /* get the len of the mmap region */
-               ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len);
-               if (ret != 0) {
-                       ret = errno;
-                       perror("kernctl_get_mmap_len");
-                       goto end;
-               }
+/*
+ * Add a fd to the global list protected by a mutex.
+ */
+static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
+{
+       int ret;
 
-               tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len,
-                               PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0);
-               if (tmp_fd->mmap_base == MAP_FAILED) {
-                       perror("Error mmaping");
-                       ret = -1;
-                       goto end;
-               }
+       pthread_mutex_lock(&kconsumerd_data.lock);
+       /* Check if this fd already exists in the list */
+       ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
+       if (ret == 1) {
+               goto end;
        }
-
        cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
        kconsumerd_data.fds_count++;
        kconsumerd_data.need_update = 1;
+
 end:
        pthread_mutex_unlock(&kconsumerd_data.lock);
        return ret;
@@ -258,11 +245,16 @@ static int kconsumerd_consumerd_recv_fd(
                enum lttng_kconsumerd_command cmd_type)
 {
        struct iovec iov[1];
-       int ret = 0, i, tmp2;
+       int ret = 0, i, j, tmp2;
        struct cmsghdr *cmsg;
        int nb_fd;
        char recv_fd[CMSG_SPACE(sizeof(int))];
        struct lttcomm_kconsumerd_msg lkm;
+       struct lttng_kconsumerd_fd *new_fd;
+       union {
+               unsigned char vc[4];
+               int vi;
+       } tmp;
 
        /* the number of fds we are about to receive */
        nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
@@ -310,17 +302,37 @@ static int kconsumerd_consumerd_recv_fd(
                if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
                        switch (cmd_type) {
                                case ADD_STREAM:
-                                       DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
-                                                       ((int *) CMSG_DATA(cmsg))[0]);
-
-                                       ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
-                                       if (ret < 0) {
+                                       for (j = 0; j < sizeof(int); j++)
+                                               tmp.vc[j] = CMSG_DATA(cmsg)[j];
+                                       DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, tmp.vi);
+                                       new_fd = kconsumerd_allocate_fd(&lkm, tmp.vi);
+                                       if (new_fd == NULL) {
                                                lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
                                                goto end;
                                        }
+
+                                       if (ctx->on_recv_fd != NULL) {
+                                               ret = ctx->on_recv_fd(new_fd);
+                                               if (ret == 0) {
+                                                       kconsumerd_add_fd(new_fd);
+                                               } else if (ret < 0) {
+                                                       goto end;
+                                               }
+                                       } else {
+                                               kconsumerd_add_fd(new_fd);
+                                       }
                                        break;
                                case UPDATE_STREAM:
-                                       kconsumerd_change_fd_state(lkm.fd, lkm.state);
+                                       if (ctx->on_update_fd != NULL) {
+                                               ret = ctx->on_update_fd(lkm.fd, lkm.state);
+                                               if (ret == 0) {
+                                                       kconsumerd_change_fd_state(lkm.fd, lkm.state);
+                                               } else if (ret < 0) {
+                                                       goto end;
+                                               }
+                                       } else {
+                                               kconsumerd_change_fd_state(lkm.fd, lkm.state);
+                                       }
                                        break;
                                default:
                                        break;
@@ -361,6 +373,42 @@ void lttng_kconsumerd_set_command_sock_path(
        ctx->kconsumerd_command_sock_path = sock;
 }
 
+static void lttng_kconsumerd_sync_trace_file(
+               struct lttng_kconsumerd_fd *kconsumerd_fd, off_t orig_offset)
+{
+       int outfd = kconsumerd_fd->out_fd;
+       /*
+        * This does a blocking write-and-wait on any page that belongs to the
+        * subbuffer prior to the one we just wrote.
+        * Don't care about error values, as these are just hints and ways to
+        * limit the amount of page cache used.
+        */
+       if (orig_offset >= kconsumerd_fd->max_sb_size) {
+               sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
+                               kconsumerd_fd->max_sb_size,
+                               SYNC_FILE_RANGE_WAIT_BEFORE
+                               | SYNC_FILE_RANGE_WRITE
+                               | SYNC_FILE_RANGE_WAIT_AFTER);
+               /*
+                * Give hints to the kernel about how we access the file:
+        * POSIX_FADV_DONTNEED: we won't re-access data in the near future after
+                * we write it.
+                *
+                * We need to call fadvise again after the file grows because the
+                * kernel does not seem to apply fadvise to non-existing parts of the
+                * file.
+                *
+                * Call fadvise _after_ having waited for the page writeback to
+                * complete because the dirty page writeback semantic is not well
+                * defined. So it can be expected to lead to lower throughput in
+                * streaming.
+                */
+               posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
+                               kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
+       }
+}
+
+
 /*
  * Mmap the ring buffer, read it and write the data to the tracefile.
  *
@@ -371,7 +419,6 @@ int lttng_kconsumerd_on_read_subbuffer_mmap(
                struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len)
 {
        unsigned long mmap_offset;
-       char *padding = NULL;
        long ret = 0;
        off_t orig_offset = kconsumerd_fd->out_fd_offset;
        int fd = kconsumerd_fd->consumerd_fd;
@@ -400,42 +447,11 @@ int lttng_kconsumerd_on_read_subbuffer_mmap(
                kconsumerd_fd->out_fd_offset += ret;
        }
 
-       /*
-        * This does a blocking write-and-wait on any page that belongs to the
-        * subbuffer prior to the one we just wrote.
-        * Don't care about error values, as these are just hints and ways to
-        * limit the amount of page cache used.
-        */
-       if (orig_offset >= kconsumerd_fd->max_sb_size) {
-               sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
-                               kconsumerd_fd->max_sb_size,
-                               SYNC_FILE_RANGE_WAIT_BEFORE
-                               | SYNC_FILE_RANGE_WRITE
-                               | SYNC_FILE_RANGE_WAIT_AFTER);
+       lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset);
 
-               /*
-                * Give hints to the kernel about how we access the file:
-                * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
-                * we write it.
-                *
-                * We need to call fadvise again after the file grows because the
-                * kernel does not seem to apply fadvise to non-existing parts of the
-                * file.
-                *
-                * Call fadvise _after_ having waited for the page writeback to
-                * complete because the dirty page writeback semantic is not well
-                * defined. So it can be expected to lead to lower throughput in
-                * streaming.
-                */
-               posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
-                               kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
-       }
        goto end;
 
 end:
-       if (padding != NULL) {
-               free(padding);
-       }
        return ret;
 }
 
@@ -474,44 +490,14 @@ int lttng_kconsumerd_on_read_subbuffer_splice(
                        perror("Error in file splice");
                        goto splice_error;
                }
-               if (ret >= len) {
-                       len = 0;
-               }
+               len -= ret;
                /* This won't block, but will start writeout asynchronously */
                sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
                                SYNC_FILE_RANGE_WRITE);
                kconsumerd_fd->out_fd_offset += ret;
        }
+       lttng_kconsumerd_sync_trace_file(kconsumerd_fd, orig_offset);
 
-       /*
-        * This does a blocking write-and-wait on any page that belongs to the
-        * subbuffer prior to the one we just wrote.
-        * Don't care about error values, as these are just hints and ways to
-        * limit the amount of page cache used.
-        */
-       if (orig_offset >= kconsumerd_fd->max_sb_size) {
-               sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
-                               kconsumerd_fd->max_sb_size,
-                               SYNC_FILE_RANGE_WAIT_BEFORE
-                               | SYNC_FILE_RANGE_WRITE
-                               | SYNC_FILE_RANGE_WAIT_AFTER);
-               /*
-                * Give hints to the kernel about how we access the file:
-                * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
-                * we write it.
-                *
-                * We need to call fadvise again after the file grows because the
-                * kernel does not seem to apply fadvise to non-existing parts of the
-                * file.
-                *
-                * Call fadvise _after_ having waited for the page writeback to
-                * complete because the dirty page writeback semantic is not well
-                * defined. So it can be expected to lead to lower throughput in
-                * streaming.
-                */
-               posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
-                               kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
-       }
        goto end;
 
 splice_error:
@@ -535,6 +521,48 @@ end:
        return ret;
 }
 
+/*
+ * Take a snapshot for a specific fd
+ *
+ * Returns 0 on success, a positive errno value on error
+ */
+int lttng_kconsumerd_take_snapshot(struct lttng_kconsumerd_local_data *ctx,
+               struct lttng_kconsumerd_fd *kconsumerd_fd)
+{
+       int ret = 0;
+       int infd = kconsumerd_fd->consumerd_fd;
+
+       ret = kernctl_snapshot(infd);
+       if (ret != 0) {
+               ret = errno;
+               perror("Getting sub-buffer snapshot.");
+       }
+
+       return ret;
+}
+
+/*
+ * Get the produced position
+ *
+ * Returns 0 on success, a positive errno value on error
+ */
+int lttng_kconsumerd_get_produced_snapshot(
+               struct lttng_kconsumerd_local_data *ctx,
+               struct lttng_kconsumerd_fd *kconsumerd_fd,
+               unsigned long *pos)
+{
+       int ret;
+       int infd = kconsumerd_fd->consumerd_fd;
+
+       ret = kernctl_snapshot_get_produced(infd, pos);
+       if (ret != 0) {
+               ret = errno;
+               perror("kernctl_snapshot_get_produced");
+       }
+
+       return ret;
+}
+
 /*
  * Poll on the should_quit pipe and the command socket return -1 on error and
  * should exit, 0 if data is available on the command socket
@@ -738,42 +766,64 @@ end:
  * Returns a pointer to the new context or NULL on error.
  */
 struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
-               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd))
+               int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+               int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+               int (*update_fd)(int sessiond_fd, uint32_t state))
 {
-       int ret;
+       int ret, i;
        struct lttng_kconsumerd_local_data *ctx;
 
        ctx = malloc(sizeof(struct lttng_kconsumerd_local_data));
        if (ctx == NULL) {
                perror("allocating context");
-               goto end;
+               goto error;
        }
 
+       ctx->kconsumerd_error_socket = -1;
+       /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
+       ctx->on_recv_fd = recv_fd;
+       ctx->on_update_fd = update_fd;
 
        ret = pipe(ctx->kconsumerd_poll_pipe);
        if (ret < 0) {
                perror("Error creating poll pipe");
-               ctx = NULL;
-               goto end;
+               goto error_poll_pipe;
        }
 
        ret = pipe(ctx->kconsumerd_should_quit);
        if (ret < 0) {
                perror("Error creating recv pipe");
-               ctx = NULL;
-               goto end;
+               goto error_quit_pipe;
        }
 
        ret = pipe(ctx->kconsumerd_thread_pipe);
        if (ret < 0) {
                perror("Error creating thread pipe");
-               ctx = NULL;
-               goto end;
+               goto error_thread_pipe;
        }
 
-end:
        return ctx;
+
+
+error_thread_pipe:
+       for (i = 0; i < 2; i++) {
+               int err;
+
+               err = close(ctx->kconsumerd_should_quit[i]);
+               assert(!err);
+       }
+error_quit_pipe:
+       for (i = 0; i < 2; i++) {
+               int err;
+
+               err = close(ctx->kconsumerd_poll_pipe[i]);
+               assert(!err);
+       }
+error_poll_pipe:
+       free(ctx);
+error:
+       return NULL;
 }
 
 /*
@@ -824,6 +874,7 @@ void *lttng_kconsumerd_thread_receive_fds(void *data)
 
        DBG("Sending ready command to ltt-sessiond");
        ret = lttng_kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY);
+       /* < 0 is a fatal error; 0 (error socket not yet set) is not fatal */
        if (ret < 0) {
                ERR("Error sending ready command to ltt-sessiond");
                goto end;
@@ -952,6 +1003,7 @@ void lttng_kconsumerd_should_exit(struct lttng_kconsumerd_local_data *ctx)
 
 /*
  * Send return code to the session daemon.
+ * If the socket is not defined, return 0; this is not a fatal error.
  */
 int lttng_kconsumerd_send_error(
                struct lttng_kconsumerd_local_data *ctx, int cmd)
This page took 0.028093 seconds and 4 git commands to generate.