Add support for UST application registration
authorDavid Goulet <david.goulet@polymtl.ca>
Tue, 23 Aug 2011 17:56:15 +0000 (13:56 -0400)
committerDavid Goulet <david.goulet@polymtl.ca>
Thu, 25 Aug 2011 21:21:20 +0000 (17:21 -0400)
This is a big commit ;)

Introduce two new threads and one almost rewritten. The first thread
added is the one that manage UST registration. This thread is *very*
lightweight and, basically, receive the registration request, queue it
in a lock free queue and wake up the dispatch thread (second new thread)
using a N wakers / 1 waiters futex scheme.

This dispatch thread will then dequeue and notify the application thread
(that manages application registration and monitor UST sockets). The
notification consist of sending the dequeued node (command) to the
application thread pipe where the application thread is blocked on
poll().

For now, the registration is handle meaning that the application will
get added to the session daemon internal data structures, the daemon
will send a REGISTER_DONE command to the application and, finally, end
the communication by waiting for the UST reply of the previous command.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <david.goulet@polymtl.ca>
12 files changed:
configure.ac
include/lttng-sessiond-comm.h
include/lttng-ust.h
ltt-sessiond/Makefile.am
ltt-sessiond/futex.c [new file with mode: 0644]
ltt-sessiond/futex.h [new file with mode: 0644]
ltt-sessiond/ltt-sessiond.h
ltt-sessiond/main.c
ltt-sessiond/traceable-app.c
ltt-sessiond/traceable-app.h
ltt-sessiond/ust-comm.c [new file with mode: 0644]
ltt-sessiond/ust-comm.h [new file with mode: 0644]

index 066c4e20e3cc5ed293ca631428f86683c4294a78..02698885375515674b0ee2f68795dbd5dd67c05f 100644 (file)
@@ -22,11 +22,16 @@ AC_CHECK_LIB([popt], [poptGetContext], [],
        [AC_MSG_ERROR([Cannot find libpopt. Use [LDFLAGS]=-Ldir to specify its location.])]
 )
 
-# Check liburcu
+# Check liburcu list.h
 AC_CHECK_DECL([cds_list_add], [],
        [AC_MSG_ERROR([liburcu 0.5.4 or newer is needed])], [[#include <urcu/list.h>]]
 )
 
+# Check liburcu wfqueue.h
+AC_CHECK_DECL([cds_wfq_init], [],
+       [AC_MSG_ERROR([liburcu 0.5.4 or newer is needed])], [[#include <urcu/wfqueue.h>]]
+)
+
 AC_PROG_CC
 AC_PROG_LIBTOOL
 
index 33af994a5d3fb0799413a0985c0415b1aba4498c..4deec5fc8d4fd00163fb1ee110a41e26ef523997 100644 (file)
@@ -207,7 +207,6 @@ struct lttcomm_kconsumerd_msg {
  * Data structure for the commands sent from sessiond to UST.
  */
 struct lttcomm_ust_msg {
-       uint32_t cmd_type;    /* enum lttcomm_ust_command */
        uint32_t handle;
        uint32_t cmd;
        union {
index 4a4564e6d5afcbc4b6d55ef016593f36862f25bb..284b24cbc13907b37d5b1604e907e0043f6f2b6a 100644 (file)
@@ -76,6 +76,7 @@ struct lttng_ust_context {
        _UST_CMDR(0x41, struct lttng_ust_tracer_version)
 #define LTTNG_UST_TRACEPOINT_LIST       _UST_CMD(0x42)
 #define LTTNG_UST_WAIT_QUIESCENT        _UST_CMD(0x43)
+#define LTTNG_UST_REGISTER_DONE         _UST_CMD(0x44)
 
 /* Session FD ioctl */
 #define LTTNG_UST_METADATA             \
@@ -98,4 +99,6 @@ struct lttng_ust_context {
 #define LTTNG_UST_ENABLE               _UST_CMD(0x80)
 #define LTTNG_UST_DISABLE              _UST_CMD(0x81)
 
+#define LTTNG_UST_ROOT_HANDLE          0
+
 #endif /* _LTTNG_UST_H */
index 80c6b81e287a3135aea47cddab3029f392b866e3..2f65a2fb87f96561e310c6af7572b9aee6471bb7 100644 (file)
@@ -6,9 +6,10 @@ AM_CFLAGS = -fno-strict-aliasing
 bin_PROGRAMS = ltt-sessiond
 
 ltt_sessiond_SOURCES = utils.c trace.c session.c traceable-app.c ust-ctl.c \
-                       kernel-ctl.c context.c main.c \
+                       kernel-ctl.c context.c futex.c futex.h \
                        utils.h trace.h session.h traceable-app.h ust-ctl.h \
-                       context.h kernel-ctl.h ltt-sessiond.h
+                       ust-comm.c ust-comm.h \
+                       context.h kernel-ctl.h ltt-sessiond.h main.c
 
 # link on liblttngctl for check if sessiond is already alive.
 ltt_sessiond_LDADD = \
diff --git a/ltt-sessiond/futex.c b/ltt-sessiond/futex.c
new file mode 100644 (file)
index 0000000..0c333fc
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C)  2011 - David Goulet <david.goulet@polymtl.ca>
+ *                       Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#define _GNU_SOURCE
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <urcu.h>
+#include <urcu/futex.h>
+
+#include <lttngerr.h>
+
+#include "futex.h"
+
+/*
+ * This futex wait/wake scheme only works for N wakers / 1 waiters. Hence the
+ * "nto1" added to all function signature.
+ *
+ * Please see wait_gp()/update_counter_and_wait() calls in urcu.c in the urcu
+ * git tree for a detail example of this scheme being used. futex_async() is
+ * the urcu wrapper over the futex() sycall.
+ *
+ * There is also a formal verification available in the git tree.
+ *
+ *   branch: formal-model
+ *   commit id: 2a8044f3493046fcc8c67016902dc7beec6f026a
+ *
+ * Ref: git://git.lttng.org/userspace-rcu.git
+ */
+
+/*
+ * Prepare futex.
+ */
+void futex_nto1_prepare(int32_t *futex)
+{
+       uatomic_set(futex, -1);
+       cmm_smp_mb();
+
+       DBG("Futex n to 1 prepare done");
+}
+
+/*
+ * Wait futex.
+ */
+void futex_nto1_wait(int32_t *futex)
+{
+       cmm_smp_mb();
+
+       if (uatomic_read(futex) == -1) {
+               futex_async(futex, FUTEX_WAIT, -1, NULL, NULL, 0);
+       }
+
+       DBG("Futex n to 1 wait done");
+}
+
+/*
+ * Wake 1 futex.
+ */
+void futex_nto1_wake(int32_t *futex)
+{
+       if (unlikely(uatomic_read(futex) == -1)) {
+               uatomic_set(futex, 0);
+               futex_async(futex, FUTEX_WAKE, 1, NULL, NULL, 0);
+       }
+
+       DBG("Futex n to 1 wake done");
+}
diff --git a/ltt-sessiond/futex.h b/ltt-sessiond/futex.h
new file mode 100644 (file)
index 0000000..ebda1e0
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
+ *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; only version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#ifndef _LTT_FUTEX_H
+#define _LTT_FUTEX_H
+
+void futex_nto1_prepare(int32_t *futex);
+void futex_nto1_wait(int32_t *futex);
+void futex_nto1_wake(int32_t *futex);
+
+#endif /* _LTT_FUTEX_H */
index 2b09a06baab9eb6cdc05be21b2bda0d2cfa85b43..cc6a6ba155ced9e26b6cec7b5f9f4e24a54bb968 100644 (file)
 #ifndef _LTT_SESSIOND_H
 #define _LTT_SESSIOND_H
 
+#define _LGPL_SOURCE
+#include <urcu/wfqueue.h>
+
+#include "traceable-app.h"
+
 #define DEFAULT_HOME_DIR            "/tmp"
 #define DEFAULT_UST_SOCK_DIR        DEFAULT_HOME_DIR "/ust-app-socks"
 #define DEFAULT_GLOBAL_APPS_PIPE    DEFAULT_UST_SOCK_DIR "/global"
@@ -68,4 +73,19 @@ struct command_ctx {
        struct lttcomm_session_msg *lsm;
 };
 
+struct ust_command {
+       int sock;
+       struct ust_register_msg reg_msg;
+       struct cds_wfq_node node;
+};
+
+/*
+ * Queue used to enqueue UST registration request (ust_commant) and protected
+ * by a futex with a scheme N wakers / 1 waiters. See futex.c/.h
+ */
+struct ust_cmd_queue {
+       int32_t futex;
+       struct cds_wfq_queue queue;
+};
+
 #endif /* _LTT_SESSIOND_H */
index 66ea028dc4a0e5b30c5c86f81506485cb4a30f82..b636986f832d7352b572c5a08fbcbaf92ceadfc7 100644 (file)
 #include <lttng/lttng-kconsumerd.h>
 #include <lttngerr.h>
 
+#include "context.h"
+#include "futex.h"
 #include "kernel-ctl.h"
 #include "ltt-sessiond.h"
 #include "traceable-app.h"
 #include "ust-ctl.h"
 #include "utils.h"
+#include "ust-comm.h"
 
 /* Const values */
 const char default_home_dir[] = DEFAULT_HOME_DIR;
@@ -66,6 +69,7 @@ static int is_root;                   /* Set to 1 if the daemon is running as root */
 static pid_t ppid;          /* Parent PID for --sig-parent option */
 static pid_t kconsumerd_pid;
 static struct pollfd *kernel_pollfd;
+static int dispatch_thread_exit;
 
 static char apps_unix_sock_path[PATH_MAX];                             /* Global application Unix socket path */
 static char client_unix_sock_path[PATH_MAX];                   /* Global client Unix socket path */
@@ -86,17 +90,34 @@ static int kernel_poll_pipe[2];
  */
 static int thread_quit_pipe[2];
 
+/*
+ * This pipe is used to inform the thread managing application communication
+ * that a command is queued and ready to be processed.
+ */
+static int apps_cmd_pipe[2];
+
 /* Pthread, Mutexes and Semaphores */
 static pthread_t kconsumerd_thread;
 static pthread_t apps_thread;
+static pthread_t reg_apps_thread;
 static pthread_t client_thread;
 static pthread_t kernel_thread;
+static pthread_t dispatch_thread;
 static sem_t kconsumerd_sem;
 
 static pthread_mutex_t kconsumerd_pid_mutex;   /* Mutex to control kconsumerd pid assignation */
 
 static int modprobe_remove_kernel_modules(void);
 
+/*
+ * UST registration command queue. This queue is tied with a futex and uses a N
+ * wakers / 1 waiter implemented and detailed in futex.c/.h
+ *
+ * The thread_manage_apps and thread_dispatch_ust_registration interact with
+ * this queue and the wait/wake scheme.
+ */
+static struct ust_cmd_queue ust_cmd_queue;
+
 /*
  * Pointer initialized before thread creation.
  *
@@ -167,12 +188,18 @@ static void teardown_kernel_session(struct ltt_session *session)
        }
 }
 
+/*
+ * Stop all threads by closing the thread quit pipe.
+ */
 static void stop_threads(void)
 {
        /* Stopping all threads */
        DBG("Terminating all threads");
        close(thread_quit_pipe[0]);
        close(thread_quit_pipe[1]);
+       /* Dispatch thread */
+       dispatch_thread_exit = 1;
+       futex_nto1_wake(&ust_cmd_queue.futex);
 }
 
 /*
@@ -217,6 +244,9 @@ static void cleanup(void)
                }
        }
 
+       DBG("Closing all UST sockets");
+       clean_traceable_apps_list();
+
        pthread_mutex_destroy(&kconsumerd_pid_mutex);
 
        DBG("Closing kernel fd");
@@ -401,7 +431,6 @@ static int ust_connect_app(pid_t pid)
 
        return sock;
 }
-#endif /* DISABLED */
 
 /*
  * Notify apps by writing 42 to a named pipe using name. Every applications
@@ -432,6 +461,7 @@ static int notify_apps(const char *name)
 error:
        return ret;
 }
+#endif /* DISABLED */
 
 /*
  * Setup the outgoing data buffer for the response (llm) by allocating the
@@ -794,24 +824,232 @@ error:
 }
 
 /*
- *     This thread manage the application socket communication
+ * Reallocate the apps command pollfd structure of nb_fd size.
+ *
+ * The first two fds must be there at all time.
+ */
+static int update_apps_cmd_pollfd(unsigned int nb_fd, struct pollfd **pollfd)
+{
+       /* Can't accept pollfd less than 2 */
+       if (nb_fd < 2) {
+               goto end;
+       }
+
+       *pollfd = realloc(*pollfd, nb_fd * sizeof(struct pollfd));
+       if (*pollfd == NULL) {
+               perror("realloc manage apps pollfd");
+               goto error;
+       }
+
+       /* First fd is always the quit pipe */
+       (*pollfd)[0].fd = thread_quit_pipe[0];
+       /* Apps command pipe */
+       (*pollfd)[1].fd = apps_cmd_pipe[0];
+       (*pollfd)[1].events = POLLIN;
+
+       DBG("Apps cmd pollfd realloc of size %d", nb_fd);
+
+end:
+       return 0;
+
+error:
+       return -1;
+}
+
+/*
+ * Send registration done packet to the application.
+ */
+static int send_ust_register_done(int sock)
+{
+       struct lttcomm_ust_msg lum;
+
+       DBG("Sending register done command to %d", sock);
+
+       lum.cmd = LTTNG_UST_REGISTER_DONE;
+       lum.handle = LTTNG_UST_ROOT_HANDLE;
+
+       return ustcomm_send_command(sock, &lum);
+}
+
+/*
+ * This thread manage application communication.
  */
 static void *thread_manage_apps(void *data)
+{
+       int i, ret, count;
+       unsigned int nb_fd = 2;
+       int update_poll_flag = 1;
+       struct pollfd *pollfd = NULL;
+       struct ust_command ust_cmd;
+
+       DBG("[thread] Manage application started");
+
+       ust_cmd.sock = -1;
+
+       while (1) {
+               /* See if we have a valid socket to add to pollfd */
+               if (ust_cmd.sock != -1) {
+                       nb_fd++;
+                       update_poll_flag = 1;
+               }
+
+               /* The pollfd struct must be updated */
+               if (update_poll_flag) {
+                       ret = update_apps_cmd_pollfd(nb_fd, &pollfd);
+                       if (ret < 0) {
+                               /* malloc failed so we quit */
+                               goto error;
+                       }
+                       if (ust_cmd.sock != -1) {
+                               /* Update pollfd with the new UST socket */
+                               DBG("Adding sock %d to apps cmd pollfd", ust_cmd.sock);
+                               pollfd[nb_fd - 1].fd = ust_cmd.sock;
+                               pollfd[nb_fd - 1].events = POLLHUP | POLLNVAL;
+                               ust_cmd.sock = -1;
+                       }
+               }
+
+               DBG("Apps thread polling on %d fds", nb_fd);
+
+               /* Inifinite blocking call, waiting for transmission */
+               ret = poll(pollfd, nb_fd, -1);
+               if (ret < 0) {
+                       perror("poll apps thread");
+                       goto error;
+               }
+
+               /* Thread quit pipe has been closed. Killing thread. */
+               if (pollfd[0].revents == POLLNVAL) {
+                       goto error;
+               } else if (pollfd[1].revents == POLLERR) {
+                       ERR("Apps command pipe poll error");
+                       goto error;
+               } else if (pollfd[1].revents == POLLIN) {
+                       /* Empty pipe */
+                       ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd));
+                       if (ret < 0 || ret < sizeof(ust_cmd)) {
+                               perror("read apps cmd pipe");
+                               goto error;
+                       }
+
+                       /* Register applicaton to the session daemon */
+                       ret = register_traceable_app(&ust_cmd.reg_msg, ust_cmd.sock);
+                       if (ret < 0) {
+                               /* Only critical ENOMEM error can be returned here */
+                               goto error;
+                       }
+
+                       ret = send_ust_register_done(ust_cmd.sock);
+                       if (ret < 0) {
+                               /*
+                                * If the registration is not possible, we simply unregister
+                                * the apps and continue
+                                */
+                               unregister_traceable_app(ust_cmd.sock);
+                       }
+               }
+
+               count = nb_fd;
+               for (i = 2; i < count; i++) {
+                       /* Apps socket is closed/hungup */
+                       switch (pollfd[i].revents) {
+                       case POLLNVAL:
+                       case POLLHUP:
+                               /* Pipe closed */
+                               unregister_traceable_app(pollfd[i].fd);
+                               nb_fd--;
+                       }
+               }
+
+               if (nb_fd != count) {
+                       update_poll_flag = 1;
+               }
+       }
+
+error:
+       DBG("Application communication apps dying");
+       close(apps_cmd_pipe[0]);
+       close(apps_cmd_pipe[1]);
+
+       free(pollfd);
+
+       return NULL;
+}
+
+/*
+ * Dispatch request from the registration threads to the application
+ * communication thread.
+ */
+static void *thread_dispatch_ust_registration(void *data)
+{
+       int ret;
+       struct cds_wfq_node *node;
+       struct ust_command *ust_cmd = NULL;
+
+       DBG("[thread] Dispatch UST command started");
+
+       while (!dispatch_thread_exit) {
+               /* Atomically prepare the queue futex */
+               futex_nto1_prepare(&ust_cmd_queue.futex);
+
+               do {
+                       /* Dequeue command for registration */
+                       node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue);
+                       if (node == NULL) {
+                               DBG("Waked up but nothing in the UST command queue");
+                               /* Continue thread execution */
+                               break;
+                       }
+
+                       ust_cmd = caa_container_of(node, struct ust_command, node);
+
+                       DBG("Dispatching UST registration pid:%d sock:%d",
+                                       ust_cmd->reg_msg.pid, ust_cmd->sock);
+                       /*
+                        * Inform apps thread of the new application registration. This
+                        * call is blocking so we can be assured that the data will be read
+                        * at some point in time or wait to the end of the world :)
+                        */
+                       ret = write(apps_cmd_pipe[1], ust_cmd,
+                                       sizeof(struct ust_command));
+                       if (ret < 0) {
+                               perror("write apps cmd pipe");
+                               if (errno == EBADF) {
+                                       /*
+                                        * We can't inform the application thread to process
+                                        * registration. We will exit or else application
+                                        * registration will not occur and tracing will never
+                                        * start.
+                                        */
+                                       goto error;
+                               }
+                       }
+                       free(ust_cmd);
+               } while (node != NULL);
+
+               /* Futex wait on queue. Blocking call on futex() */
+               futex_nto1_wait(&ust_cmd_queue.futex);
+       }
+
+error:
+       DBG("Dispatch thread dying");
+       return NULL;
+}
+
+/*
+ * This thread manage application registration.
+ */
+static void *thread_registration_apps(void *data)
 {
        int sock = 0, ret;
        struct pollfd pollfd[2];
+       /*
+        * Get allocated in this thread, enqueued to a global queue, dequeued and
+        * freed in the manage apps thread.
+        */
+       struct ust_command *ust_cmd = NULL;
 
-       /* TODO: Something more elegant is needed but fine for now */
-       /* FIXME: change all types to either uint8_t, uint32_t, uint64_t
-        * for 32-bit vs 64-bit compat processes. */
-       /* replicate in ust with version number */
-       struct {
-               int reg;        /* 1:register, 0:unregister */
-               pid_t pid;
-               uid_t uid;
-       } reg_msg;
-
-       DBG("[thread] Manage apps started");
+       DBG("[thread] Manage application registration started");
 
        ret = lttcomm_listen_unix_sock(apps_sock);
        if (ret < 0) {
@@ -826,7 +1064,7 @@ static void *thread_manage_apps(void *data)
        pollfd[1].events = POLLIN;
 
        /* Notify all applications to register */
-       notify_apps(default_global_apps_pipe);
+       //notify_apps(default_global_apps_pipe);
 
        while (1) {
                DBG("Accepting application registration");
@@ -834,7 +1072,7 @@ static void *thread_manage_apps(void *data)
                /* Inifinite blocking call, waiting for transmission */
                ret = poll(pollfd, 2, -1);
                if (ret < 0) {
-                       perror("poll apps thread");
+                       perror("poll register apps thread");
                        goto error;
                }
 
@@ -842,7 +1080,7 @@ static void *thread_manage_apps(void *data)
                if (pollfd[0].revents == POLLNVAL) {
                        goto error;
                } else if (pollfd[1].revents == POLLERR) {
-                       ERR("Apps socket poll error");
+                       ERR("Register apps socket poll error");
                        goto error;
                }
 
@@ -851,39 +1089,46 @@ static void *thread_manage_apps(void *data)
                        goto error;
                }
 
+               /* Create UST registration command for enqueuing */
+               ust_cmd = malloc(sizeof(struct ust_command));
+               if (ust_cmd == NULL) {
+                       perror("ust command malloc");
+                       goto error;
+               }
+
                /*
-                * Using message-based transmissions to ensure we don't
-                * have to deal with partially received messages.
+                * Using message-based transmissions to ensure we don't have to deal
+                * with partially received messages.
                 */
-               ret = lttcomm_recv_unix_sock(sock, &reg_msg, sizeof(reg_msg));
-               if (ret < 0) {
-                       perror("recv");
+               ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg,
+                               sizeof(struct ust_register_msg));
+               if (ret < 0 || ret != sizeof(struct ust_register_msg)) {
+                       perror("lttcomm_recv_unix_sock register apps");
+                       free(ust_cmd);
+                       close(sock);
                        continue;
                }
 
-               /* Add application to the global traceable list */
-               if (reg_msg.reg == 1) {
-                       /* Registering */
-                       /*
-                        * TODO: socket should be either passed to a
-                        * listener thread (for more messages) or
-                        * closed. It currently leaks.
-                        */
-                       ret = register_traceable_app(reg_msg.pid, reg_msg.uid);
-                       if (ret < 0) {
-                               /* register_traceable_app only return an error with
-                                * ENOMEM. At this point, we better stop everything.
-                                */
-                               goto error;
-                       }
-               } else {
-                       /* Unregistering */
-                       unregister_traceable_app(reg_msg.pid);
-               }
+               ust_cmd->sock = sock;
+
+               /*
+                * Lock free enqueue the registration request.
+                * The red pill has been taken! This apps will be part of the *system*
+                */
+               cds_wfq_enqueue(&ust_cmd_queue.queue, &ust_cmd->node);
+
+               /*
+                * Wake the registration queue futex.
+                * Implicit memory barrier with the exchange in cds_wfq_enqueue.
+                */
+               futex_nto1_wake(&ust_cmd_queue.futex);
+
+               DBG("Thread manage apps informed of queued node with sock:%d pid:%d",
+                               sock, ust_cmd->reg_msg.pid);
        }
 
 error:
-       DBG("Apps thread dying");
+       DBG("Register apps thread dying");
        if (apps_sock) {
                close(apps_sock);
        }
@@ -2579,13 +2824,15 @@ end:
 static int check_existing_daemon(void)
 {
        if (access(client_unix_sock_path, F_OK) < 0 &&
-           access(apps_unix_sock_path, F_OK) < 0)
+                       access(apps_unix_sock_path, F_OK) < 0) {
                return 0;
+       }
        /* Is there anybody out there ? */
-       if (lttng_session_daemon_alive())
+       if (lttng_session_daemon_alive()) {
                return -EEXIST;
-       else
+       } else {
                return 0;
+       }
 }
 
 /*
@@ -2646,6 +2893,14 @@ static int create_kernel_poll_pipe(void)
        return pipe2(kernel_poll_pipe, O_CLOEXEC);
 }
 
+/*
+ * Create the application command pipe to wake thread_manage_apps.
+ */
+static int create_apps_cmd_pipe(void)
+{
+       return pipe2(apps_cmd_pipe, O_CLOEXEC);
+}
+
 /*
  * Create the lttng run directory needed for all global sockets and pipe.
  */
@@ -2922,6 +3177,14 @@ int main(int argc, char **argv)
                goto exit;
        }
 
+       /* Setup the thread apps communication pipe. */
+       if ((ret = create_apps_cmd_pipe()) < 0) {
+               goto exit;
+       }
+
+       /* Init UST command queue. */
+       cds_wfq_init(&ust_cmd_queue.queue);
+
        /*
         * Get session list pointer. This pointer MUST NOT be free().
         * This list is statically declared in session.c
@@ -2929,23 +3192,40 @@ int main(int argc, char **argv)
        session_list_ptr = get_session_list();
 
        /* Create thread to manage the client socket */
-       ret = pthread_create(&client_thread, NULL, thread_manage_clients, (void *) NULL);
+       ret = pthread_create(&client_thread, NULL,
+                       thread_manage_clients, (void *) NULL);
        if (ret != 0) {
-               perror("pthread_create");
+               perror("pthread_create clients");
                goto exit_client;
        }
 
+       /* Create thread to dispatch registration */
+       ret = pthread_create(&dispatch_thread, NULL,
+                       thread_dispatch_ust_registration, (void *) NULL);
+       if (ret != 0) {
+               perror("pthread_create dispatch");
+               goto exit_dispatch;
+       }
+
+       /* Create thread to manage application registration. */
+       ret = pthread_create(&reg_apps_thread, NULL,
+                       thread_registration_apps, (void *) NULL);
+       if (ret != 0) {
+               perror("pthread_create registration");
+               goto exit_reg_apps;
+       }
+
        /* Create thread to manage application socket */
        ret = pthread_create(&apps_thread, NULL, thread_manage_apps, (void *) NULL);
        if (ret != 0) {
-               perror("pthread_create");
+               perror("pthread_create apps");
                goto exit_apps;
        }
 
        /* Create kernel thread to manage kernel event */
        ret = pthread_create(&kernel_thread, NULL, thread_manage_kernel, (void *) NULL);
        if (ret != 0) {
-               perror("pthread_create");
+               perror("pthread_create kernel");
                goto exit_kernel;
        }
 
@@ -2963,6 +3243,20 @@ exit_kernel:
        }
 
 exit_apps:
+       ret = pthread_join(reg_apps_thread, &status);
+       if (ret != 0) {
+               perror("pthread_join");
+               goto error;     /* join error, exit without cleanup */
+       }
+
+exit_reg_apps:
+       ret = pthread_join(dispatch_thread, &status);
+       if (ret != 0) {
+               perror("pthread_join");
+               goto error;     /* join error, exit without cleanup */
+       }
+
+exit_dispatch:
        ret = pthread_join(client_thread, &status);
        if (ret != 0) {
                perror("pthread_join");
index ad4212e7dec8e5df5d49f82ef4d8834f0eb4788e..85d50393a6b9eaaa60eeb17bb963370656d9708a 100644 (file)
 #include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
 
 #include <lttngerr.h>
 
 #include "traceable-app.h"
 
-/* Number of element for the list below. */
-static unsigned int traceable_app_count;
-
 /* Init ust traceabl application's list */
-struct ltt_traceable_app_list ltt_traceable_app_list = {
+static struct ltt_traceable_app_list ltt_traceable_app_list = {
        .head = CDS_LIST_HEAD_INIT(ltt_traceable_app_list.head),
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+       .count = 0,
 };
 
-/* List mutex */
-pthread_mutex_t ltt_traceable_app_list_mutex;
-
-/* Internal function */
-static void add_traceable_app(struct ltt_traceable_app *lta);
-static void del_traceable_app(struct ltt_traceable_app *lta);
-
 /*
- * Add a traceable application structure to the global list protected by a
- * mutex.
+ * Add a traceable application structure to the global list.
  */
 static void add_traceable_app(struct ltt_traceable_app *lta)
 {
-       pthread_mutex_lock(&ltt_traceable_app_list_mutex);
        cds_list_add(&lta->list, &ltt_traceable_app_list.head);
-       traceable_app_count++;
-       pthread_mutex_unlock(&ltt_traceable_app_list_mutex);
+       ltt_traceable_app_list.count++;
 }
 
 /*
- * Delete a traceable application structure from the global list protected by a
- * mutex.
+ * Delete a traceable application structure from the global list.
  */
 static void del_traceable_app(struct ltt_traceable_app *lta)
 {
-       pthread_mutex_lock(&ltt_traceable_app_list_mutex);
        cds_list_del(&lta->list);
        /* Sanity check */
-       if (traceable_app_count != 0) {
-               traceable_app_count--;
+       if (ltt_traceable_app_list.count > 0) {
+               ltt_traceable_app_list.count--;
+       }
+}
+
+/*
+ * Return pointer to traceable apps list.
+ */
+struct ltt_traceable_app_list *get_traceable_apps_list(void)
+{
+       return &ltt_traceable_app_list;
+}
+
+/*
+ * Acquire traceable apps list lock.
+ */
+void lock_apps_list(void)
+{
+       pthread_mutex_lock(&ltt_traceable_app_list.lock);
+}
+
+/*
+ * Release traceable apps list lock.
+ */
+void unlock_apps_list(void)
+{
+       pthread_mutex_unlock(&ltt_traceable_app_list.lock);
+}
+
+/*
+ * Iterate over the traceable apps list and return a pointer or NULL if not
+ * found.
+ */
+static struct ltt_traceable_app *find_app_by_sock(int sock)
+{
+       struct ltt_traceable_app *iter;
+
+       cds_list_for_each_entry(iter, &ltt_traceable_app_list.head, list) {
+               if (iter->sock == sock) {
+                       /* Found */
+                       return iter;
+               }
        }
-       pthread_mutex_unlock(&ltt_traceable_app_list_mutex);
+
+       return NULL;
 }
 
 /*
@@ -74,7 +104,7 @@ static void del_traceable_app(struct ltt_traceable_app *lta)
  *
  * On success, return 0, else return malloc ENOMEM.
  */
-int register_traceable_app(pid_t pid, uid_t uid)
+int register_traceable_app(struct ust_register_msg *msg, int sock)
 {
        struct ltt_traceable_app *lta;
 
@@ -84,10 +114,23 @@ int register_traceable_app(pid_t pid, uid_t uid)
                return -ENOMEM;
        }
 
-       lta->uid = uid;
-       lta->pid = pid;
+       lta->uid = msg->uid;
+       lta->gid = msg->gid;
+       lta->pid = msg->pid;
+       lta->ppid = msg->ppid;
+       lta->v_major = msg->major;
+       lta->v_minor = msg->minor;
+       lta->sock = sock;
+       strncpy(lta->name, msg->name, sizeof(lta->name));
+       lta->name[16] = '\0';
+
+       lock_apps_list();
        add_traceable_app(lta);
-       DBG("Application %d registered with UID %d", pid, uid);
+       unlock_apps_list();
+
+       DBG("App registered with pid:%d ppid:%d uid:%d gid:%d sock:%d name:%s"
+                       " (version %d.%d)", lta->pid, lta->ppid, lta->uid, lta->gid,
+                       lta->sock, lta->name, lta->v_major, lta->v_minor);
 
        return 0;
 }
@@ -95,17 +138,23 @@ int register_traceable_app(pid_t pid, uid_t uid)
 /*
  * Unregister app by removing it from the global traceable app list and freeing
  * the data struct.
+ *
+ * The socket is already closed at this point so no close to sock.
  */
-void unregister_traceable_app(pid_t pid)
+void unregister_traceable_app(int sock)
 {
        struct ltt_traceable_app *lta;
 
-       lta = find_app_by_pid(pid);
-       if (lta != NULL) {
+       lock_apps_list();
+       lta = find_app_by_sock(sock);
+       if (lta) {
+               DBG("PID %d unregistered with sock %d", lta->pid, sock);
+               close(lta->sock);
                del_traceable_app(lta);
+               unlock_apps_list();
                free(lta);
-               DBG("PID %d unregistered", pid);
        }
+       unlock_apps_list();
 }
 
 /*
@@ -113,45 +162,28 @@ void unregister_traceable_app(pid_t pid)
  */
 unsigned int get_app_count(void)
 {
-       return traceable_app_count;
-}
-
-/*
- * Iterate over the traceable apps list and return a pointer or NULL if not
- * found.
- */
-struct ltt_traceable_app *find_app_by_pid(pid_t pid)
-{
-       struct ltt_traceable_app *iter;
+       unsigned int count;
 
-       pthread_mutex_lock(&ltt_traceable_app_list_mutex);
-       cds_list_for_each_entry(iter, &ltt_traceable_app_list.head, list) {
-               if (iter->pid == pid) {
-                       pthread_mutex_unlock(&ltt_traceable_app_list_mutex);
-                       /* Found */
-                       return iter;
-               }
-       }
-       pthread_mutex_unlock(&ltt_traceable_app_list_mutex);
+       lock_apps_list();
+       count = ltt_traceable_app_list.count;
+       unlock_apps_list();
 
-       return NULL;
+       return count;
 }
 
 /*
- * List traceable user-space application and fill an array of pids.
+ * Free and clean all traceable apps of the global list.
  */
-void get_app_list_pids(pid_t *pids)
+void clean_traceable_apps_list(void)
 {
-       int i = 0;
-       struct ltt_traceable_app *iter;
+       struct ltt_traceable_app *iter, *tmp;
 
-       /* Protected by a mutex here because the threads manage_client
-        * and manage_apps can access this list.
+       /*
+        * Don't acquire list lock here. This function should be called from
+        * cleanup() functions meaning that the program will exit.
         */
-       pthread_mutex_lock(&ltt_traceable_app_list_mutex);
-       cds_list_for_each_entry(iter, &ltt_traceable_app_list.head, list) {
-               pids[i] = iter->pid;
-               i++;
+       cds_list_for_each_entry_safe(iter, tmp, &ltt_traceable_app_list.head, list) {
+               close(iter->sock);
+               free(iter);
        }
-       pthread_mutex_unlock(&ltt_traceable_app_list_mutex);
 }
index b43d1b044f66f93dd21019b8756049c8a92b3e80..4d5e56a66d9763153eb1afb1440ce513abfe34d8 100644 (file)
 #ifndef _TRACEABLE_APP_H 
 #define _TRACEABLE_APP_H
 
+#include <stdint.h>
 #include <urcu/list.h>
 
-/* Traceable application list */
+/*
+ * Application registration data structure.
+ */
+struct ust_register_msg {
+       uint32_t major;
+       uint32_t minor;
+       pid_t pid;
+       pid_t ppid;
+       uid_t uid;
+       gid_t gid;
+       char name[16];
+};
+
+/*
+ * Traceable application list.
+ */
 struct ltt_traceable_app_list {
+       /*
+        * This lock protects any read/write access to the list and count (which is
+        * basically the list size). All public functions in traceable-app.c
+        * acquire this lock and release it before returning. If none of those
+        * functions are used, the lock MUST be acquired in order to iterate or/and
+        * do any actions on that list.
+        */
+       pthread_mutex_t lock;
+
+       /*
+        * Number of element in the list. The session list lock MUST be acquired if
+        * this counter is used when iterating over the session list.
+        */
+       unsigned int count;
+
+       /* Linked list head */
        struct cds_list_head head;
 };
 
@@ -30,15 +62,24 @@ struct ltt_traceable_app_list {
  * and a linked list is kept of all running traceable app.
  */
 struct ltt_traceable_app {
-       struct cds_list_head list;
+       int sock;            /* Communication socket with the application */
        pid_t pid;
-       uid_t uid;              /* User ID that owns the apps */
+       pid_t ppid;
+       uid_t uid;           /* User ID that owns the apps */
+       gid_t gid;           /* Group ID that owns the apps */
+       uint32_t v_major;    /* Verion major number */
+       uint32_t v_minor;    /* Verion minor number */
+       char name[17];       /* Process name (short) */
+       struct cds_list_head list;
 };
 
-struct ltt_traceable_app *find_app_by_pid(pid_t pid);
-int register_traceable_app(pid_t pid, uid_t uid);
-void unregister_traceable_app(pid_t pid);
-void get_app_list_pids(pid_t *pids);
+int register_traceable_app(struct ust_register_msg *msg, int sock);
+void unregister_traceable_app(int sock);
 unsigned int get_app_count(void);
 
+void lock_apps_list(void);
+void unlock_apps_list(void);
+void clean_traceable_apps_list(void);
+struct ltt_traceable_app_list *get_traceable_apps_list(void);
+
 #endif /* _TRACEABLE_APP_H */
diff --git a/ltt-sessiond/ust-comm.c b/ltt-sessiond/ust-comm.c
new file mode 100644 (file)
index 0000000..accd427
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C)  2011 - David Goulet <david.goulet@polymtl.ca>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#include <lttngerr.h>
+
+#include "ust-comm.h"
+
+/*
+ * Send msg containing a command to an UST application via sock and wait for
+ * the reply.
+ *
+ * Return -1 on error or if reply fails else return 0.
+ */
+int ustcomm_send_command(int sock, struct lttcomm_ust_msg *msg)
+{
+       ssize_t len;
+       struct lttcomm_ust_reply reply;
+
+       DBG("Sending UST command %d to sock %d", msg->cmd, sock);
+
+       /* Send UST msg */
+       len = lttcomm_send_unix_sock(sock, msg, sizeof(*msg));
+       if (len < 0) {
+               goto error;
+       }
+
+       DBG("Receiving UST reply on sock %d", sock);
+
+       /* Get UST reply */
+       len = lttcomm_recv_unix_sock(sock, &reply, sizeof(reply));
+       if (len < 0) {
+               goto error;
+       }
+
+       if (reply.ret_code != LTTCOMM_OK) {
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return -1;
+}
diff --git a/ltt-sessiond/ust-comm.h b/ltt-sessiond/ust-comm.h
new file mode 100644 (file)
index 0000000..8a04f85
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; only version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place - Suite 330, Boston, MA  02111-1307, USA.
+ */
+
+#ifndef _LTT_UST_COMM_H
+#define _LTT_UST_COMM_H
+
+#include <lttng-sessiond-comm.h>
+
+int ustcomm_send_command(int sock, struct lttcomm_ust_msg *msg);
+
+#endif /* _LTT_UST_COMM_H */
This page took 0.063002 seconds and 4 git commands to generate.