Merge remote-tracking branch 'cbab-github/tests-cleanup' into cbab
authorDavid Goulet <dgoulet@efficios.com>
Mon, 11 Mar 2013 17:00:33 +0000 (13:00 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Mon, 11 Mar 2013 17:00:33 +0000 (13:00 -0400)
Conflicts:
tests/regression/tools/streaming/test_high_throughput_limits
tests/unit/test_kernel_data.c
tests/unit/test_ust_data.c

39 files changed:
extras/bindings/swig/python/Makefile.am
src/bin/lttng-consumerd/lttng-consumerd.c
src/bin/lttng-sessiond/Makefile.am
src/bin/lttng-sessiond/channel.c
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/lttng-sessiond.h
src/bin/lttng-sessiond/lttng-ust-ctl.h
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/trace-kernel.c
src/bin/lttng-sessiond/trace-ust.c
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/bin/lttng-sessiond/ust-clock.h [new file with mode: 0644]
src/bin/lttng-sessiond/ust-consumer.c
src/bin/lttng-sessiond/ust-consumer.h
src/bin/lttng-sessiond/ust-metadata.c [new file with mode: 0644]
src/bin/lttng-sessiond/ust-registry.c [new file with mode: 0644]
src/bin/lttng-sessiond/ust-registry.h [new file with mode: 0644]
src/bin/lttng-sessiond/ust-thread.c [new file with mode: 0644]
src/bin/lttng-sessiond/ust-thread.h [new file with mode: 0644]
src/common/compat/uuid.h
src/common/consumer.c
src/common/consumer.h
src/common/defaults.h
src/common/hashtable/hashtable.c
src/common/hashtable/hashtable.h
src/common/hashtable/utils.c
src/common/hashtable/utils.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h
src/lib/lttng-ctl/filter/filter-visitor-generate-bytecode.c
src/lib/lttng-ctl/lttng-ctl.c
tests/regression/tools/streaming/test_high_throughput_limits
tests/unit/test_kernel_data.c
tests/unit/test_ust_data.c

index c6a69010eadfefdef7622874f725634ea43fa20f..6e7baa69d86fc8eb3ef6bc576859d62ef0607a9a 100644 (file)
@@ -4,16 +4,14 @@ lttng.i: lttng.i.in
 AM_CFLAGS = -I$(PYTHON_INCLUDE) -I$(top_srcdir)/lib/lttng-ctl -I../common \
               $(BUDDY_CFLAGS)
 
-EXTRA_DIST = lttng.i
-python_PYTHON = lttng.py
+EXTRA_DIST = lttng.i.in
+nodist_python_PYTHON = lttng.py
 pyexec_LTLIBRARIES = _lttng.la
 
 MAINTAINERCLEANFILES = lttng_wrap.c lttng.py
 
-_lttng_la_SOURCES = lttng_wrap.c
-
+nodist__lttng_la_SOURCES = lttng_wrap.c
 _lttng_la_LDFLAGS = -module
-
 _lttng_la_LIBADD =     $(top_srcdir)/src/lib/lttng-ctl/liblttng-ctl.la                 \
                        $(top_srcdir)/src/common/sessiond-comm/libsessiond-comm.la
 
index b854aabac9923c9b832b81b338d933b2feafa6d7..84868077c39e5e85624a973d70bfca270670bce2 100644 (file)
@@ -51,8 +51,8 @@
 
 /* TODO : support UST (all direct kernel-ctl accesses). */
 
-/* the two threads (receive fd, poll and metadata) */
-static pthread_t data_thread, metadata_thread, sessiond_thread;
+/* threads (channel handling, poll, metadata, sessiond) */
+static pthread_t channel_thread, data_thread, metadata_thread, sessiond_thread;
 
 /* to count the number of times the user pressed ctrl+c */
 static int sigintcount = 0;
@@ -363,12 +363,20 @@ int main(int argc, char **argv)
        }
        lttng_consumer_set_error_sock(ctx, ret);
 
+       /* Create thread to manage channels */
+       ret = pthread_create(&channel_thread, NULL, consumer_thread_channel_poll,
+                       (void *) ctx);
+       if (ret != 0) {
+               perror("pthread_create");
+               goto error;
+       }
+
        /* Create thread to manage the polling/writing of trace metadata */
        ret = pthread_create(&metadata_thread, NULL, consumer_thread_metadata_poll,
                        (void *) ctx);
        if (ret != 0) {
                perror("pthread_create");
-               goto error;
+               goto metadata_error;
        }
 
        /* Create thread to manage the polling/writing of trace data */
@@ -407,6 +415,13 @@ data_error:
                goto error;
        }
 
+metadata_error:
+       ret = pthread_join(channel_thread, &status);
+       if (ret != 0) {
+               perror("pthread_join");
+               goto error;
+       }
+
        if (!ret) {
                ret = EXIT_SUCCESS;
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_EXIT_SUCCESS);
index 0964c9402c751e5444b76e7890ccf400cd5bf4c1..26a2d13a0cac2be8fb7438297ceac88434e33684 100644 (file)
@@ -9,7 +9,7 @@ bin_PROGRAMS = lttng-sessiond
 lttng_sessiond_SOURCES = utils.c utils.h \
                        trace-kernel.c trace-kernel.h \
                        kernel.c kernel.h \
-                       ust-ctl.h ust-app.h trace-ust.h \
+                       ust-ctl.h ust-app.h trace-ust.h ust-thread.h \
                        context.c context.h \
                        channel.c channel.h \
                        event.c event.h \
@@ -26,7 +26,9 @@ lttng_sessiond_SOURCES = utils.c utils.h \
                        testpoint.h
 
 if HAVE_LIBLTTNG_UST_CTL
-lttng_sessiond_SOURCES += trace-ust.c ust-app.c ust-consumer.c ust-consumer.h
+lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-registry.h ust-app.c \
+                       ust-consumer.c ust-consumer.h ust-thread.c \
+                       ust-metadata.c ust-clock.h
 endif
 
 # Add main.c at the end for compile order
index 7ebe4b16eb49a467dda2420892685283eaf3375b..91be4f41c8ff559296692d04decd0f7bdf4930ff 100644 (file)
@@ -49,8 +49,6 @@ struct lttng_channel *channel_new_default_attr(int dom)
        }
 
        chan->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE;
-       chan->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER;
-       chan->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER;
 
        switch (dom) {
        case LTTNG_DOMAIN_KERNEL:
@@ -58,6 +56,8 @@ struct lttng_channel *channel_new_default_attr(int dom)
                        default_get_kernel_channel_subbuf_size();
                chan->attr.num_subbuf = DEFAULT_KERNEL_CHANNEL_SUBBUF_NUM;
                chan->attr.output = DEFAULT_KERNEL_CHANNEL_OUTPUT;
+               chan->attr.switch_timer_interval = DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER;
+               chan->attr.read_timer_interval = DEFAULT_KERNEL_CHANNEL_READ_TIMER;
                break;
        case LTTNG_DOMAIN_UST:
 #if 0
@@ -68,6 +68,8 @@ struct lttng_channel *channel_new_default_attr(int dom)
                chan->attr.subbuf_size = default_get_ust_channel_subbuf_size();
                chan->attr.num_subbuf = DEFAULT_UST_CHANNEL_SUBBUF_NUM;
                chan->attr.output = DEFAULT_UST_CHANNEL_OUTPUT;
+               chan->attr.switch_timer_interval = DEFAULT_UST_CHANNEL_SWITCH_TIMER;
+               chan->attr.read_timer_interval = DEFAULT_UST_CHANNEL_READ_TIMER;
                break;
        default:
                goto error;     /* Not implemented */
index a47c504c718736e86425bd4c58d95134ec4f8b9c..1a8fbba1eea75f30f4559274a3790f1e1ba599cc 100644 (file)
 /*
  * Used to keep a unique index for each relayd socket created where this value
  * is associated with streams on the consumer so it can match the right relayd
- * to send to.
- *
- * This value should be incremented atomically for safety purposes and future
- * possible concurrent access.
+ * to send to. It must be accessed with the relayd_net_seq_idx_lock
+ * held.
  */
-static unsigned int relayd_net_seq_idx;
+static pthread_mutex_t relayd_net_seq_idx_lock = PTHREAD_MUTEX_INITIALIZER;
+static uint64_t relayd_net_seq_idx;
 
 /*
  * Create a session path used by list_lttng_sessions for the case that the
@@ -566,15 +565,15 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
        }
 
        /* Set the network sequence index if not set. */
-       if (consumer->net_seq_index == -1) {
+       if (consumer->net_seq_index == (uint64_t) -1ULL) {
+               pthread_mutex_lock(&relayd_net_seq_idx_lock);
                /*
                 * Increment net_seq_idx because we are about to transfer the
                 * new relayd socket to the consumer.
+                * Assign unique key so the consumer can match streams.
                 */
-               uatomic_inc(&relayd_net_seq_idx);
-               /* Assign unique key so the consumer can match streams */
-               uatomic_set(&consumer->net_seq_index,
-                               uatomic_read(&relayd_net_seq_idx));
+               consumer->net_seq_index = ++relayd_net_seq_idx;
+               pthread_mutex_unlock(&relayd_net_seq_idx_lock);
        }
 
        /* Send relayd socket to consumer. */
@@ -2136,10 +2135,12 @@ error:
 void cmd_init(void)
 {
        /*
-        * Set network sequence index to 1 for streams to match a relayd socket on
-        * the consumer side.
+        * Set network sequence index to 1 for streams to match a relayd
+        * socket on the consumer side.
         */
-       uatomic_set(&relayd_net_seq_idx, 1);
+       pthread_mutex_lock(&relayd_net_seq_idx_lock);
+       relayd_net_seq_idx = 1;
+       pthread_mutex_unlock(&relayd_net_seq_idx_lock);
 
        DBG("Command subsystem initialized");
 }
index ff5360aae755aa082ec6517cb6c4f10df99c9d9d..92abcf21d07257ce4cfa83ad44dbf79bd833508d 100644 (file)
@@ -23,6 +23,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <inttypes.h>
 
 #include <common/common.h>
 #include <common/defaults.h>
@@ -76,7 +77,7 @@ end:
  * negative value is sent back and both parameters are untouched.
  */
 int consumer_recv_status_channel(struct consumer_socket *sock,
-               unsigned long *key, unsigned int *stream_count)
+               uint64_t *key, unsigned int *stream_count)
 {
        int ret;
        struct lttcomm_consumer_status_channel reply;
@@ -359,7 +360,7 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type)
        /* By default, consumer output is enabled */
        output->enabled = 1;
        output->type = type;
-       output->net_seq_index = -1;
+       output->net_seq_index = (uint64_t) -1ULL;
 
        output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 
@@ -622,8 +623,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
-               unsigned long key,
+               uint64_t relayd_id,
+               uint64_t key,
                unsigned char *uuid)
 {
        assert(msg);
@@ -660,12 +661,12 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
  */
 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
+               uint64_t channel_key,
                uint64_t session_id,
                const char *pathname,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                const char *name,
                unsigned int nb_init_streams,
                enum lttng_event_output output,
@@ -700,8 +701,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
  */
 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
-               int stream_key,
+               uint64_t channel_key,
+               uint64_t stream_key,
                int cpu)
 {
        assert(msg);
@@ -758,7 +759,7 @@ error:
  */
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type, unsigned int session_id)
+               enum lttng_stream_type type, uint64_t session_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -865,7 +866,7 @@ error:
  * This function has a different behavior with the consumer i.e. that it waits
  * for a reply from the consumer if yes or no the data is pending.
  */
-int consumer_is_data_pending(unsigned int id,
+int consumer_is_data_pending(uint64_t session_id,
                struct consumer_output *consumer)
 {
        int ret;
@@ -878,9 +879,9 @@ int consumer_is_data_pending(unsigned int id,
 
        msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
 
-       msg.u.data_pending.session_id = (uint64_t) id;
+       msg.u.data_pending.session_id = session_id;
 
-       DBG3("Consumer data pending for id %u", id);
+       DBG3("Consumer data pending for id %" PRIu64, session_id);
 
        /* Send command for each consumer */
        rcu_read_lock();
@@ -924,8 +925,8 @@ int consumer_is_data_pending(unsigned int id,
        }
        rcu_read_unlock();
 
-       DBG("Consumer data is %s pending for session id %u",
-                       ret_code == 1 ? "" : "NOT", id);
+       DBG("Consumer data is %s pending for session id %" PRIu64,
+                       ret_code == 1 ? "" : "NOT", session_id);
        return ret_code;
 
 error_unlock:
index af337baa386b61f81e587609a03d3e7ae800d1a2..3616d467cd4e76e65246ce5c2e07cbf80e75434e 100644 (file)
@@ -126,7 +126,7 @@ struct consumer_output {
         * side. It tells the consumer which streams goes to which relayd with this
         * index. The relayd sockets are index with it on the consumer side.
         */
-       int net_seq_index;
+       uint64_t net_seq_index;
 
        /*
         * Subdirectory path name used for both local and network consumer.
@@ -170,12 +170,12 @@ int consumer_send_channel(struct consumer_socket *sock,
                struct lttcomm_consumer_msg *msg);
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type, unsigned int session_id);
+               enum lttng_stream_type type, uint64_t session_id);
 int consumer_send_destroy_relayd(struct consumer_socket *sock,
                struct consumer_output *consumer);
 int consumer_recv_status_reply(struct consumer_socket *sock);
 int consumer_recv_status_channel(struct consumer_socket *sock,
-               unsigned long *key, unsigned int *stream_count);
+               uint64_t *key, unsigned int *stream_count);
 void consumer_output_send_destroy_relayd(struct consumer_output *consumer);
 int consumer_create_socket(struct consumer_data *data,
                struct consumer_output *output);
@@ -195,27 +195,27 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
-               unsigned long key,
+               uint64_t relayd_id,
+               uint64_t key,
                unsigned char *uuid);
 void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
-               int stream_key,
+               uint64_t channel_key,
+               uint64_t stream_key,
                int cpu);
 void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
-               int channel_key,
+               uint64_t channel_key,
                uint64_t session_id,
                const char *pathname,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                const char *name,
                unsigned int nb_init_streams,
                enum lttng_event_output output,
                int type);
-int consumer_is_data_pending(unsigned int id,
+int consumer_is_data_pending(uint64_t session_id,
                struct consumer_output *consumer);
 
 #endif /* _CONSUMER_H */
index 63e9be9989e0e978b04a5c37c0c1a027a41fcc87..9258f38a7c1018d27ee91de0f3817f92efe90bd6 100644 (file)
@@ -23,6 +23,7 @@
 #include <urcu/wfqueue.h>
 
 #include <common/sessiond-comm/sessiond-comm.h>
+#include <common/compat/poll.h>
 #include <common/compat/socket.h>
 
 #include "session.h"
@@ -64,4 +65,13 @@ struct ust_cmd_queue {
        struct cds_wfq_queue queue;
 };
 
+/*
+ * This pipe is used to inform the thread managing application notify
+ * communication that a command is queued and ready to be processed.
+ */
+extern int apps_cmd_notify_pipe[2];
+
+int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size);
+int sessiond_check_thread_quit_pipe(int fd, uint32_t events);
+
 #endif /* _LTT_SESSIOND_H */
index cd833b826c589f4f59a88ec554a823dfe0cc3e29..7dd36a93cc591d20c2d89764e5e13e57d88f6a7c 100644 (file)
@@ -136,13 +136,21 @@ void ustctl_destroy_channel(struct ustctl_consumer_channel *chan);
 
 int ustctl_send_channel_to_sessiond(int sock,
                struct ustctl_consumer_channel *channel);
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *consumer_chan);
+
 /*
  * Send a NULL stream to finish iteration over all streams of a given
  * channel.
  */
 int ustctl_send_stream_to_sessiond(int sock,
                struct ustctl_consumer_stream *stream);
+int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream);
 int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream);
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream);
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream);
 
 /* Create/destroy stream buffers for read */
 struct ustctl_consumer_stream *
@@ -150,9 +158,6 @@ struct ustctl_consumer_stream *
                        int cpu);
 void ustctl_destroy_stream(struct ustctl_consumer_stream *stream);
 
-int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream);
-int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream);
-
 /* For mmap mode, readable without "get" operation */
 int ustctl_get_mmap_len(struct ustctl_consumer_stream *stream,
                unsigned long *len);
index e9529f1bf9378dc46bb3ddeaa250b10fafaef249..f7bb53ef7ead038e479964b6e11bb2259eeb1cd7 100644 (file)
@@ -37,7 +37,6 @@
 #include <config.h>
 
 #include <common/common.h>
-#include <common/compat/poll.h>
 #include <common/compat/socket.h>
 #include <common/defaults.h>
 #include <common/kernel-consumer/kernel-consumer.h>
 #include "fd-limit.h"
 #include "health.h"
 #include "testpoint.h"
+#include "ust-thread.h"
 
 #define CONSUMERD_FILE "lttng-consumerd"
 
 /* Const values */
-const char default_home_dir[] = DEFAULT_HOME_DIR;
 const char default_tracing_group[] = DEFAULT_TRACING_GROUP;
-const char default_ust_sock_dir[] = DEFAULT_UST_SOCK_DIR;
-const char default_global_apps_pipe[] = DEFAULT_GLOBAL_APPS_PIPE;
 
 const char *progname;
 const char *opt_tracing_group;
@@ -149,8 +146,11 @@ static int thread_quit_pipe[2] = { -1, -1 };
  */
 static int apps_cmd_pipe[2] = { -1, -1 };
 
+int apps_cmd_notify_pipe[2] = { -1, -1 };
+
 /* Pthread, Mutexes and Semaphores */
 static pthread_t apps_thread;
+static pthread_t apps_notify_thread;
 static pthread_t reg_apps_thread;
 static pthread_t client_thread;
 static pthread_t kernel_thread;
@@ -279,15 +279,11 @@ void setup_consumerd_path(void)
 /*
  * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
  */
-static int create_thread_poll_set(struct lttng_poll_event *events,
-               unsigned int size)
+int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size)
 {
        int ret;
 
-       if (events == NULL || size == 0) {
-               ret = -1;
-               goto error;
-       }
+       assert(events);
 
        ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
        if (ret < 0) {
@@ -295,7 +291,7 @@ static int create_thread_poll_set(struct lttng_poll_event *events,
        }
 
        /* Add quit pipe */
-       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN);
+       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error;
        }
@@ -311,7 +307,7 @@ error:
  *
  * Return 1 if it was triggered else 0;
  */
-static int check_thread_quit_pipe(int fd, uint32_t events)
+int sessiond_check_thread_quit_pipe(int fd, uint32_t events)
 {
        if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
                return 1;
@@ -721,7 +717,7 @@ static void *thread_manage_kernel(void *data)
                        /* Clean events object. We are about to populate it again. */
                        lttng_poll_clean(&events);
 
-                       ret = create_thread_poll_set(&events, 2);
+                       ret = sessiond_set_thread_pollset(&events, 2);
                        if (ret < 0) {
                                goto error_poll_create;
                        }
@@ -771,7 +767,7 @@ static void *thread_manage_kernel(void *data)
                        health_code_update();
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
+                       ret = sessiond_check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -870,7 +866,7 @@ static void *thread_manage_consumer(void *data)
         * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock.
         * Nothing more will be added to this poll set.
         */
-       ret = create_thread_poll_set(&events, 2);
+       ret = sessiond_set_thread_pollset(&events, 2);
        if (ret < 0) {
                goto error_poll;
        }
@@ -917,7 +913,7 @@ restart:
                health_code_update();
 
                /* Thread quit pipe has been closed. Killing thread. */
-               ret = check_thread_quit_pipe(pollfd, revents);
+               ret = sessiond_check_thread_quit_pipe(pollfd, revents);
                if (ret) {
                        err = 0;
                        goto exit;
@@ -1011,7 +1007,7 @@ restart_poll:
                health_code_update();
 
                /* Thread quit pipe has been closed. Killing thread. */
-               ret = check_thread_quit_pipe(pollfd, revents);
+               ret = sessiond_check_thread_quit_pipe(pollfd, revents);
                if (ret) {
                        err = 0;
                        goto exit;
@@ -1093,7 +1089,6 @@ static void *thread_manage_apps(void *data)
 {
        int i, ret, pollfd, err = -1;
        uint32_t revents, nb_fd;
-       struct ust_command ust_cmd;
        struct lttng_poll_event events;
 
        DBG("[thread] Manage application started");
@@ -1109,7 +1104,7 @@ static void *thread_manage_apps(void *data)
 
        health_code_update();
 
-       ret = create_thread_poll_set(&events, 2);
+       ret = sessiond_set_thread_pollset(&events, 2);
        if (ret < 0) {
                goto error_poll_create;
        }
@@ -1153,7 +1148,7 @@ static void *thread_manage_apps(void *data)
                        health_code_update();
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
+                       ret = sessiond_check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -1165,11 +1160,13 @@ static void *thread_manage_apps(void *data)
                                        ERR("Apps command pipe error");
                                        goto error;
                                } else if (revents & LPOLLIN) {
+                                       int sock;
+
                                        /* Empty pipe */
                                        do {
-                                               ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd));
+                                               ret = read(apps_cmd_pipe[0], &sock, sizeof(sock));
                                        } while (ret < 0 && errno == EINTR);
-                                       if (ret < 0 || ret < sizeof(ust_cmd)) {
+                                       if (ret < 0 || ret < sizeof(sock)) {
                                                PERROR("read apps cmd pipe");
                                                goto error;
                                        }
@@ -1177,70 +1174,23 @@ static void *thread_manage_apps(void *data)
                                        health_code_update();
 
                                        /*
-                                        * @session_lock
-                                        * Lock the global session list so from the register up to
-                                        * the registration done message, no thread can see the
-                                        * application and change its state.
+                                        * We only monitor the error events of the socket. This
+                                        * thread does not handle any incoming data from UST
+                                        * (POLLIN).
                                         */
-                                       session_lock_list();
-
-                                       /* Register applicaton to the session daemon */
-                                       ret = ust_app_register(&ust_cmd.reg_msg,
-                                                       ust_cmd.sock);
-                                       if (ret == -ENOMEM) {
-                                               session_unlock_list();
+                                       ret = lttng_poll_add(&events, sock,
+                                                       LPOLLERR | LPOLLHUP | LPOLLRDHUP);
+                                       if (ret < 0) {
                                                goto error;
-                                       } else if (ret < 0) {
-                                               session_unlock_list();
-                                               break;
                                        }
 
-                                       health_code_update();
+                                       /* Set socket timeout for both receiving and ending */
+                                       (void) lttcomm_setsockopt_rcv_timeout(sock,
+                                                       app_socket_timeout);
+                                       (void) lttcomm_setsockopt_snd_timeout(sock,
+                                                       app_socket_timeout);
 
-                                       /*
-                                        * Validate UST version compatibility.
-                                        */
-                                       ret = ust_app_validate_version(ust_cmd.sock);
-                                       if (ret >= 0) {
-                                               /*
-                                                * Add channel(s) and event(s) to newly registered apps
-                                                * from lttng global UST domain.
-                                                */
-                                               update_ust_app(ust_cmd.sock);
-                                       }
-
-                                       health_code_update();
-
-                                       ret = ust_app_register_done(ust_cmd.sock);
-                                       if (ret < 0) {
-                                               /*
-                                                * If the registration is not possible, we simply
-                                                * unregister the apps and continue
-                                                */
-                                               ust_app_unregister(ust_cmd.sock);
-                                       } else {
-                                               /*
-                                                * We only monitor the error events of the socket. This
-                                                * thread does not handle any incoming data from UST
-                                                * (POLLIN).
-                                                */
-                                               ret = lttng_poll_add(&events, ust_cmd.sock,
-                                                               LPOLLERR & LPOLLHUP & LPOLLRDHUP);
-                                               if (ret < 0) {
-                                                       session_unlock_list();
-                                                       goto error;
-                                               }
-
-                                               /* Set socket timeout for both receiving and ending */
-                                               (void) lttcomm_setsockopt_rcv_timeout(ust_cmd.sock,
-                                                               app_socket_timeout);
-                                               (void) lttcomm_setsockopt_snd_timeout(ust_cmd.sock,
-                                                               app_socket_timeout);
-
-                                               DBG("Apps with sock %d added to poll set",
-                                                               ust_cmd.sock);
-                                       }
-                                       session_unlock_list();
+                                       DBG("Apps with sock %d added to poll set", sock);
 
                                        health_code_update();
 
@@ -1293,6 +1243,38 @@ error_testpoint:
        return NULL;
 }
 
+/*
+ * Send a socket to a thread This is called from the dispatch UST registration
+ * thread once all sockets are set for the application.
+ *
+ * On success, return 0 else a negative value being the errno message of the
+ * write().
+ */
+static int send_socket_to_thread(int fd, int sock)
+{
+       int ret;
+
+       /* Sockets MUST be set or else this should not have been called. */
+       assert(fd >= 0);
+       assert(sock >= 0);
+
+       do {
+               ret = write(fd, &sock, sizeof(sock));
+       } while (ret < 0 && errno == EINTR);
+       if (ret < 0 || ret != sizeof(sock)) {
+               PERROR("write apps pipe %d", fd);
+               if (ret < 0) {
+                       ret = -errno;
+               }
+               goto error;
+       }
+
+       /* All good. Don't send back the write positive ret value. */
+       ret = 0;
+error:
+       return ret;
+}
+
 /*
  * Dispatch request from the registration threads to the application
  * communication thread.
@@ -1302,6 +1284,12 @@ static void *thread_dispatch_ust_registration(void *data)
        int ret;
        struct cds_wfq_node *node;
        struct ust_command *ust_cmd = NULL;
+       struct {
+               struct ust_app *app;
+               struct cds_list_head head;
+       } *wait_node = NULL, *tmp_wait_node;
+
+       CDS_LIST_HEAD(wait_queue);
 
        DBG("[thread] Dispatch UST command started");
 
@@ -1310,6 +1298,8 @@ static void *thread_dispatch_ust_registration(void *data)
                futex_nto1_prepare(&ust_cmd_queue.futex);
 
                do {
+                       struct ust_app *app = NULL;
+
                        /* Dequeue command for registration */
                        node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue);
                        if (node == NULL) {
@@ -1326,34 +1316,122 @@ static void *thread_dispatch_ust_registration(void *data)
                                        ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
                                        ust_cmd->sock, ust_cmd->reg_msg.name,
                                        ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
-                       /*
-                        * Inform apps thread of the new application registration. This
-                        * call is blocking so we can be assured that the data will be read
-                        * at some point in time or wait to the end of the world :)
-                        */
-                       if (apps_cmd_pipe[1] >= 0) {
-                               do {
-                                       ret = write(apps_cmd_pipe[1], ust_cmd,
-                                                       sizeof(struct ust_command));
-                               } while (ret < 0 && errno == EINTR);
-                               if (ret < 0 || ret != sizeof(struct ust_command)) {
-                                       PERROR("write apps cmd pipe");
-                                       if (errno == EBADF) {
-                                               /*
-                                                * We can't inform the application thread to process
-                                                * registration. We will exit or else application
-                                                * registration will not occur and tracing will never
-                                                * start.
-                                                */
-                                               goto error;
+
+                       if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
+                               wait_node = zmalloc(sizeof(*wait_node));
+                               if (!wait_node) {
+                                       PERROR("zmalloc wait_node dispatch");
+                                       goto error;
+                               }
+                               CDS_INIT_LIST_HEAD(&wait_node->head);
+
+                               /* Create application object if socket is CMD. */
+                               wait_node->app = ust_app_create(&ust_cmd->reg_msg,
+                                               ust_cmd->sock);
+                               if (!wait_node->app) {
+                                       ret = close(ust_cmd->sock);
+                                       if (ret < 0) {
+                                               PERROR("close ust sock dispatch %d", ust_cmd->sock);
                                        }
+                                       lttng_fd_put(1, LTTNG_FD_APPS);
+                                       free(wait_node);
+                                       continue;
                                }
+                               /*
+                                * Add application to the wait queue so we can set the notify
+                                * socket before putting this object in the global ht.
+                                */
+                               cds_list_add(&wait_node->head, &wait_queue);
+
+                               /*
+                                * We have to continue here since we don't have the notify
+                                * socket and the application MUST be added to the hash table
+                                * only at that moment.
+                                */
+                               continue;
                        } else {
-                               /* Application manager thread is not available. */
+                               /*
+                                * Look for the application in the local wait queue and set the
+                                * notify socket if found.
+                                */
+                               cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+                                               &wait_queue, head) {
+                                       if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
+                                               wait_node->app->notify_sock = ust_cmd->sock;
+                                               cds_list_del(&wait_node->head);
+                                               app = wait_node->app;
+                                               free(wait_node);
+                                               DBG3("UST app notify socket %d is set", ust_cmd->sock);
+                                               break;
+                                       }
+                               }
+                       }
+
+                       if (app) {
+                               /*
+                                * @session_lock_list
+                                *
+                                * Lock the global session list so from the register up to the
+                                * registration done message, no thread can see the application
+                                * and change its state.
+                                */
+                               session_lock_list();
+                               rcu_read_lock();
+
+                               /*
+                                * Add application to the global hash table. This needs to be
+                                * done before the update to the UST registry can locate the
+                                * application.
+                                */
+                               ust_app_add(app);
+
+                               /* Set app version. This call will print an error if needed. */
+                               (void) ust_app_version(app);
+
+                               /* Send notify socket through the notify pipe. */
+                               ret = send_socket_to_thread(apps_cmd_notify_pipe[1],
+                                               app->notify_sock);
+                               if (ret < 0) {
+                                       rcu_read_unlock();
+                                       session_unlock_list();
+                                       /* No notify thread, stop the UST tracing. */
+                                       goto error;
+                               }
+
+                               /*
+                                * Update newly registered application with the tracing
+                                * registry info already enabled information.
+                                */
+                               update_ust_app(app->sock);
+
+                               /*
+                                * Don't care about return value. Let the manage apps threads
+                                * handle app unregistration upon socket close.
+                                */
+                               (void) ust_app_register_done(app->sock);
+
+                               /*
+                                * Even if the application socket has been closed, send the app
+                                * to the thread and unregistration will take place at that
+                                * place.
+                                */
+                               ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock);
+                               if (ret < 0) {
+                                       rcu_read_unlock();
+                                       session_unlock_list();
+                                       /* No apps. thread, stop the UST tracing. */
+                                       goto error;
+                               }
+
+                               rcu_read_unlock();
+                               session_unlock_list();
+                       } else {
+                               /* Application manager threads are not available. */
                                ret = close(ust_cmd->sock);
                                if (ret < 0) {
                                        PERROR("close ust_cmd sock");
                                }
+                               lttng_fd_put(1, LTTNG_FD_APPS);
                        }
                        free(ust_cmd);
                } while (node != NULL);
@@ -1363,6 +1441,13 @@ static void *thread_dispatch_ust_registration(void *data)
        }
 
 error:
+       /* Clean up wait queue. */
+       cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
+                       &wait_queue, head) {
+               cds_list_del(&wait_node->head);
+               free(wait_node);
+       }
+
        DBG("Dispatch thread dying");
        return NULL;
 }
@@ -1398,7 +1483,7 @@ static void *thread_registration_apps(void *data)
         * Pass 2 as size here for the thread quit pipe and apps socket. Nothing
         * more will be added to this poll set.
         */
-       ret = create_thread_poll_set(&events, 2);
+       ret = sessiond_set_thread_pollset(&events, 2);
        if (ret < 0) {
                goto error_create_poll;
        }
@@ -1445,7 +1530,7 @@ static void *thread_registration_apps(void *data)
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
+                       ret = sessiond_check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -1490,16 +1575,12 @@ static void *thread_registration_apps(void *data)
                                                sock = -1;
                                                continue;
                                        }
+
                                        health_code_update();
-                                       ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg,
-                                                       sizeof(struct ust_register_msg));
-                                       if (ret < 0 || ret < sizeof(struct ust_register_msg)) {
-                                               if (ret < 0) {
-                                                       PERROR("lttcomm_recv_unix_sock register apps");
-                                               } else {
-                                                       ERR("Wrong size received on apps register");
-                                               }
+                                       ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg);
+                                       if (ret < 0) {
                                                free(ust_cmd);
+                                               /* Close socket of the application. */
                                                ret = close(sock);
                                                if (ret) {
                                                        PERROR("close");
@@ -2966,7 +3047,7 @@ static void *thread_manage_health(void *data)
         * Pass 2 as size here for the thread quit pipe and client_sock. Nothing
         * more will be added to this poll set.
         */
-       ret = create_thread_poll_set(&events, 2);
+       ret = sessiond_set_thread_pollset(&events, 2);
        if (ret < 0) {
                goto error;
        }
@@ -3001,7 +3082,7 @@ restart:
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
+                       ret = sessiond_check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -3154,7 +3235,7 @@ static void *thread_manage_clients(void *data)
         * Pass 2 as size here for the thread quit pipe and client_sock. Nothing
         * more will be added to this poll set.
         */
-       ret = create_thread_poll_set(&events, 2);
+       ret = sessiond_set_thread_pollset(&events, 2);
        if (ret < 0) {
                goto error_create_poll;
        }
@@ -3206,7 +3287,7 @@ static void *thread_manage_clients(void *data)
                        health_code_update();
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_thread_quit_pipe(pollfd, revents);
+                       ret = sessiond_check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -4006,7 +4087,7 @@ int main(int argc, char **argv)
                /* Set global SHM for ust */
                if (strlen(wait_shm_path) == 0) {
                        snprintf(wait_shm_path, PATH_MAX,
-                                       DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid());
+                                       DEFAULT_HOME_APPS_WAIT_SHM_PATH, getuid());
                }
 
                /* Set health check Unix path */
@@ -4022,6 +4103,7 @@ int main(int argc, char **argv)
 
        DBG("Client socket path %s", client_unix_sock_path);
        DBG("Application socket path %s", apps_unix_sock_path);
+       DBG("Application wait path %s", wait_shm_path);
        DBG("LTTng run directory path: %s", rundir);
 
        /* 32 bits consumerd path setup */
@@ -4130,6 +4212,11 @@ int main(int argc, char **argv)
                goto exit;
        }
 
+       /* Setup the thread apps notify communication pipe. */
+       if (utils_create_pipe_cloexec(apps_cmd_notify_pipe) < 0) {
+               goto exit;
+       }
+
        /* Init UST command queue. */
        cds_wfq_init(&ust_cmd_queue.queue);
 
@@ -4194,6 +4281,14 @@ int main(int argc, char **argv)
                goto exit_apps;
        }
 
+       /* Create thread to manage application notify socket */
+       ret = pthread_create(&apps_notify_thread, NULL,
+                       ust_thread_manage_notify, (void *) NULL);
+       if (ret != 0) {
+               PERROR("pthread_create apps");
+               goto exit_apps;
+       }
+
        /* Don't start this thread if kernel tracing is not requested nor root */
        if (is_root && !opt_no_kernel) {
                /* Create kernel thread to manage kernel event */
index 48be065765b2a548892bd534cea6cb0ec8c1df10..990684be918db721a36b8265813b4d8bdcb38cb7 100644 (file)
@@ -272,8 +272,8 @@ struct ltt_kernel_metadata *trace_kernel_create_metadata(void)
        chan->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE;
        chan->attr.subbuf_size = default_get_metadata_subbuf_size();
        chan->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM;
-       chan->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER;
-       chan->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER;
+       chan->attr.switch_timer_interval = DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER;
+       chan->attr.read_timer_interval = DEFAULT_KERNEL_CHANNEL_READ_TIMER;
        chan->attr.output = DEFAULT_KERNEL_CHANNEL_OUTPUT;
 
        /* Init metadata */
index 5e06a845242b79293d78add5d8f30d4e2bc81579..97a9c77e81f6974625877c67b4e125fde692d9e4 100644 (file)
@@ -412,8 +412,8 @@ struct ltt_ust_metadata *trace_ust_create_metadata(char *path)
        lum->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE;
        lum->attr.subbuf_size = default_get_metadata_subbuf_size();
        lum->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM;
-       lum->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER;
-       lum->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER;
+       lum->attr.switch_timer_interval = DEFAULT_UST_CHANNEL_SWITCH_TIMER;
+       lum->attr.read_timer_interval = DEFAULT_UST_CHANNEL_READ_TIMER;
        lum->attr.output = LTTNG_UST_MMAP;
 
        lum->handle = -1;
index aa188931d7688769bfd81fbf0677a75b031a6092..225d3f86a81a3837b4acbf9ed23c5d3cf01a9416 100644 (file)
@@ -73,6 +73,10 @@ static struct consumer_socket *find_consumer_socket_by_bitness(int bits,
        }
 
        socket = consumer_find_socket(consumer_fd, consumer);
+       if (!socket) {
+               ERR("Consumer socket fd %d not found in consumer obj %p",
+                               consumer_fd, consumer);
+       }
 
 end:
        return socket;
@@ -142,16 +146,18 @@ no_match:
  * Unique add of an ust app event in the given ht. This uses the custom
  * ht_match_ust_app_event match function and the event name as hash.
  */
-static void add_unique_ust_app_event(struct lttng_ht *ht,
+static void add_unique_ust_app_event(struct ust_app_channel *ua_chan,
                struct ust_app_event *event)
 {
        struct cds_lfht_node *node_ptr;
        struct ust_app_ht_key key;
+       struct lttng_ht *ht;
 
-       assert(ht);
-       assert(ht->ht);
+       assert(ua_chan);
+       assert(ua_chan->events);
        assert(event);
 
+       ht = ua_chan->events;
        key.name = event->attr.name;
        key.filter = event->filter;
        key.loglevel = event->attr.loglevel;
@@ -162,6 +168,28 @@ static void add_unique_ust_app_event(struct lttng_ht *ht,
        assert(node_ptr == &event->node.node);
 }
 
+/*
+ * Close the notify socket from the given RCU head object. This MUST be called
+ * through a call_rcu().
+ */
+static void close_notify_sock_rcu(struct rcu_head *head)
+{
+       int ret;
+       struct ust_app_notify_sock_obj *obj =
+               caa_container_of(head, struct ust_app_notify_sock_obj, head);
+
+       /* Must have a valid fd here. */
+       assert(obj->fd >= 0);
+
+       ret = close(obj->fd);
+       if (ret) {
+               ERR("close notify sock %d RCU", obj->fd);
+       }
+       lttng_fd_put(LTTNG_FD_APPS, 1);
+
+       free(obj);
+}
+
 /*
  * Delete ust context safely. RCU read lock must be held before calling
  * this function.
@@ -175,9 +203,9 @@ void delete_ust_app_ctx(int sock, struct ust_app_ctx *ua_ctx)
 
        if (ua_ctx->obj) {
                ret = ustctl_release_object(sock, ua_ctx->obj);
-               if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
-                       ERR("UST app sock %d release context obj failed with ret %d",
-                                       sock, ret);
+               if (ret < 0 && ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
+                       ERR("UST app sock %d release ctx obj handle %d failed with ret %d",
+                                       sock, ua_ctx->obj->handle, ret);
                }
                free(ua_ctx->obj);
        }
@@ -236,7 +264,8 @@ void delete_ust_app_stream(int sock, struct ust_app_stream *stream)
  * this function.
  */
 static
-void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan)
+void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
+               struct ust_app *app)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -271,7 +300,13 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan)
        }
        lttng_ht_destroy(ua_chan->events);
 
+       /* Wipe and free registry. */
+       ust_registry_channel_destroy(&ua_chan->session->registry, &ua_chan->registry);
+
        if (ua_chan->obj != NULL) {
+               /* Remove channel from application UST object descriptor. */
+               iter.iter.node = &ua_chan->ust_objd_node.node;
+               lttng_ht_del(app->ust_objd, &iter);
                ret = ustctl_release_object(sock, ua_chan->obj);
                if (ret < 0 && ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
                        ERR("UST app sock %d release channel obj failed with ret %d",
@@ -283,29 +318,178 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan)
        free(ua_chan);
 }
 
+/*
+ * For a given application and session, push metadata to consumer. The session
+ * lock MUST be acquired here before calling this.
+ *
+ * Return 0 on success else a negative error.
+ */
+static int push_metadata(struct ust_app *app, struct ust_app_session *ua_sess)
+{
+       int ret;
+       char *metadata_str = NULL;
+       size_t len, offset;
+       struct consumer_socket *socket;
+
+       assert(app);
+       assert(ua_sess);
+
+       if (!ua_sess->consumer || !ua_sess->metadata) {
+               /* No consumer means no stream associated so just return gracefully. */
+               ret = 0;
+               goto end;
+       }
+
+       rcu_read_lock();
+
+       /* Get consumer socket to use to push the metadata.*/
+       socket = find_consumer_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ret = -1;
+               goto error_rcu_unlock;
+       }
+
+       /*
+        * TODO: Currently, we hold the socket lock around sampling of the next
+        * metadata segment to ensure we send metadata over the consumer socket in
+        * the correct order. This makes the registry lock nest inside the socket
+        * lock.
+        *
+        * Please note that this is a temporary measure: we should move this lock
+        * back into ust_consumer_push_metadata() when the consumer gets the
+        * ability to reorder the metadata it receives.
+        */
+       pthread_mutex_lock(socket->lock);
+       pthread_mutex_lock(&ua_sess->registry.lock);
+
+       offset = ua_sess->registry.metadata_len_sent;
+       len = ua_sess->registry.metadata_len - ua_sess->registry.metadata_len_sent;
+       if (len == 0) {
+               DBG3("No metadata to push for session id %d", ua_sess->id);
+               ret = 0;
+               goto error_reg_unlock;
+       }
+       assert(len > 0);
+
+       /* Allocate only what we have to send. */
+       metadata_str = zmalloc(len);
+       if (!metadata_str) {
+               PERROR("zmalloc ust app metadata string");
+               ret = -ENOMEM;
+               goto error_reg_unlock;
+       }
+       /* Copy what we haven't send out. */
+       memcpy(metadata_str, ua_sess->registry.metadata + offset, len);
+
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+
+       ret = ust_consumer_push_metadata(socket, ua_sess, metadata_str, len,
+                       offset);
+       if (ret < 0) {
+               pthread_mutex_unlock(socket->lock);
+               goto error_rcu_unlock;
+       }
+
+       /* Update len sent of the registry. */
+       pthread_mutex_lock(&ua_sess->registry.lock);
+       ua_sess->registry.metadata_len_sent += len;
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+       pthread_mutex_unlock(socket->lock);
+
+       rcu_read_unlock();
+       free(metadata_str);
+       return 0;
+
+error_reg_unlock:
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+       pthread_mutex_unlock(socket->lock);
+error_rcu_unlock:
+       rcu_read_unlock();
+       free(metadata_str);
+end:
+       return ret;
+}
+
+/*
+ * Send to the consumer a close metadata command for the given session. Once
+ * done, the metadata channel is deleted and the session metadata pointer is
+ * nullified. The session lock MUST be acquired here unless the application is
+ * in the destroy path.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int close_metadata(struct ust_app *app, struct ust_app_session *ua_sess)
+{
+       int ret;
+       struct consumer_socket *socket;
+
+       assert(app);
+       assert(ua_sess);
+
+       /* Ignore if no metadata. Valid since it can be called on unregister. */
+       if (!ua_sess->metadata) {
+               ret = 0;
+               goto error;
+       }
+
+       rcu_read_lock();
+
+       /* Get consumer socket to use to push the metadata.*/
+       socket = find_consumer_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ret = -1;
+               goto error_rcu_unlock;
+       }
+
+       ret = ust_consumer_close_metadata(socket, ua_sess->metadata);
+       if (ret < 0) {
+               goto error_rcu_unlock;
+       }
+
+error_rcu_unlock:
+       /* Destroy metadata on our side since we must not use it anymore. */
+       delete_ust_app_channel(-1, ua_sess->metadata, app);
+       ua_sess->metadata = NULL;
+
+       rcu_read_unlock();
+error:
+       return ret;
+}
+
 /*
  * Delete ust app session safely. RCU read lock must be held before calling
  * this function.
  */
 static
-void delete_ust_app_session(int sock, struct ust_app_session *ua_sess)
+void delete_ust_app_session(int sock, struct ust_app_session *ua_sess,
+               struct ust_app *app)
 {
        int ret;
        struct lttng_ht_iter iter;
        struct ust_app_channel *ua_chan;
 
+       assert(ua_sess);
+
        if (ua_sess->metadata) {
-               delete_ust_app_channel(sock, ua_sess->metadata);
+               /* Push metadata for application before freeing the application. */
+               (void) push_metadata(app, ua_sess);
+
+               /* And ask to close it for this session. */
+               (void) close_metadata(app, ua_sess);
        }
 
        cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
                        node.node) {
                ret = lttng_ht_del(ua_sess->channels, &iter);
                assert(!ret);
-               delete_ust_app_channel(sock, ua_chan);
+               delete_ust_app_channel(sock, ua_chan, app);
        }
        lttng_ht_destroy(ua_sess->channels);
 
+       ust_registry_session_destroy(&ua_sess->registry);
+
        if (ua_sess->handle != -1) {
                ret = ustctl_release_handle(sock, ua_sess->handle);
                if (ret < 0 && ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
@@ -338,7 +522,7 @@ void delete_ust_app(struct ust_app *app)
        cds_list_for_each_entry_safe(ua_sess, tmp_ua_sess, &app->teardown_head,
                        teardown_node) {
                /* Free every object in the session and the session. */
-               delete_ust_app_session(sock, ua_sess);
+               delete_ust_app_session(sock, ua_sess, app);
        }
 
        /*
@@ -385,7 +569,7 @@ void delete_ust_app_rcu(struct rcu_head *head)
  * Delete the session from the application ht and delete the data structure by
  * freeing every object inside and releasing them.
  */
-static void destroy_session(struct ust_app *app,
+static void destroy_app_session(struct ust_app *app,
                struct ust_app_session *ua_sess)
 {
        int ret;
@@ -402,7 +586,7 @@ static void destroy_session(struct ust_app *app,
        }
 
        /* Once deleted, free the data structure. */
-       delete_ust_app_session(app->sock, ua_sess);
+       delete_ust_app_session(app->sock, ua_sess, app);
 
 end:
        return;
@@ -412,7 +596,7 @@ end:
  * Alloc new UST app session.
  */
 static
-struct ust_app_session *alloc_ust_app_session(void)
+struct ust_app_session *alloc_ust_app_session(struct ust_app *app)
 {
        struct ust_app_session *ua_sess;
 
@@ -425,9 +609,15 @@ struct ust_app_session *alloc_ust_app_session(void)
 
        ua_sess->handle = -1;
        ua_sess->channels = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
-
-       if ((lttng_uuid_generate(ua_sess->uuid))) {
-               ERR("Failed to generate UST uuid");
+       pthread_mutex_init(&ua_sess->lock, NULL);
+       if (ust_registry_session_init(&ua_sess->registry, app,
+                       app->bits_per_long,
+                       app->uint8_t_alignment,
+                       app->uint16_t_alignment,
+                       app->uint32_t_alignment,
+                       app->uint64_t_alignment,
+                       app->long_alignment,
+                       app->byte_order)) {
                goto error;
        }
 
@@ -444,6 +634,7 @@ error_free:
  */
 static
 struct ust_app_channel *alloc_ust_app_channel(char *name,
+               struct ust_app_session *ua_sess,
                struct lttng_ust_channel_attr *attr)
 {
        struct ust_app_channel *ua_chan;
@@ -468,6 +659,9 @@ struct ust_app_channel *alloc_ust_app_channel(char *name,
 
        CDS_INIT_LIST_HEAD(&ua_chan->streams.head);
 
+       /* Initialize UST registry. */
+       ust_registry_channel_init(&ua_sess->registry, &ua_chan->registry);
+
        /* Copy attributes */
        if (attr) {
                /* Translate from lttng_ust_channel to ustctl_consumer_channel_attr. */
@@ -614,6 +808,29 @@ error:
        return NULL;
 }
 
+/*
+ * Find an ust_app using the notify sock and return it. RCU read side lock must
+ * be held before calling this helper function.
+ */
+static struct ust_app *find_app_by_notify_sock(int sock)
+{
+       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_iter iter;
+
+       lttng_ht_lookup(ust_app_ht_by_notify_sock, (void *)((unsigned long) sock),
+                       &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node == NULL) {
+               DBG2("UST app find by notify sock %d not found", sock);
+               goto error;
+       }
+
+       return caa_container_of(node, struct ust_app, notify_sock_n);
+
+error:
+       return NULL;
+}
+
 /*
  * Lookup for an ust app event based on event name, filter bytecode and the
  * event loglevel.
@@ -652,6 +869,8 @@ end:
 
 /*
  * Create the channel context on the tracer.
+ *
+ * Called with UST app session lock held.
  */
 static
 int create_ust_channel_context(struct ust_app_channel *ua_chan,
@@ -675,7 +894,8 @@ int create_ust_channel_context(struct ust_app_channel *ua_chan,
 
        ua_ctx->handle = ua_ctx->obj->handle;
 
-       DBG2("UST app context created successfully for channel %s", ua_chan->name);
+       DBG2("UST app context handle %d created successfully for channel %s",
+                       ua_ctx->handle, ua_chan->name);
 
 error:
        health_code_update();
@@ -840,7 +1060,8 @@ error:
 }
 
 /*
- * Create the specified channel onto the UST tracer for a UST session.
+ * Create the specified channel onto the UST tracer for a UST session. This
+ * MUST be called with UST app session lock held.
  *
  * Return 0 on success. On error, a negative value is returned.
  */
@@ -858,6 +1079,7 @@ static int create_ust_channel(struct ust_app *app,
        assert(ua_chan);
        assert(consumer);
 
+       rcu_read_lock();
        health_code_update();
 
        /* Get the right consumer socket for the application. */
@@ -880,7 +1102,7 @@ static int create_ust_channel(struct ust_app *app,
 
        /*
         * Compute the number of fd needed before receiving them. It must be 2 per
-        * stream.
+        * stream (2 being the default value here).
         */
        nb_fd = DEFAULT_UST_STREAM_FD_NUM * ua_chan->expected_stream_count;
 
@@ -908,6 +1130,8 @@ static int create_ust_channel(struct ust_app *app,
                goto error;
        }
 
+       health_code_update();
+
        /* Send all streams to application. */
        cds_list_for_each_entry_safe(stream, stmp, &ua_chan->streams.head, list) {
                ret = ust_consumer_send_stream_to_ust(app, ua_chan, stream);
@@ -921,6 +1145,11 @@ static int create_ust_channel(struct ust_app *app,
 
        /* Flag the channel that it is sent to the application. */
        ua_chan->is_sent = 1;
+       /* Assign session to channel. */
+       ua_chan->session = ua_sess;
+       /* Initialize ust objd object using the received handle and add it. */
+       lttng_ht_node_init_ulong(&ua_chan->ust_objd_node, ua_chan->handle);
+       lttng_ht_add_unique_ulong(app->ust_objd, &ua_chan->ust_objd_node);
 
        health_code_update();
 
@@ -932,6 +1161,7 @@ static int create_ust_channel(struct ust_app *app,
                }
        }
 
+       rcu_read_unlock();
        return 0;
 
 error_destroy:
@@ -946,11 +1176,14 @@ error_fd_get:
        (void) ust_consumer_destroy_channel(socket, ua_chan);
 error:
        health_code_update();
+       rcu_read_unlock();
        return ret;
 }
 
 /*
  * Create the specified event onto the UST tracer for a UST session.
+ *
+ * Should be called with session mutex held.
  */
 static
 int create_ust_event(struct ust_app *app, struct ust_app_session *ua_sess,
@@ -1091,7 +1324,7 @@ static void shadow_copy_channel(struct ust_app_channel *ua_chan,
                                continue;
                        }
                        shadow_copy_event(ua_event, uevent);
-                       add_unique_ust_app_event(ua_chan->events, ua_event);
+                       add_unique_ust_app_event(ua_chan, ua_event);
                }
        }
 
@@ -1148,7 +1381,7 @@ static void shadow_copy_session(struct ust_app_session *ua_sess,
 
                DBG2("Channel %s not found on shadow session copy, creating it",
                                uchan->name);
-               ua_chan = alloc_ust_app_channel(uchan->name, &uchan->attr);
+               ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr);
                if (ua_chan == NULL) {
                        /* malloc failed FIXME: Might want to do handle ENOMEM .. */
                        continue;
@@ -1226,7 +1459,7 @@ static int create_ust_app_session(struct ltt_ust_session *usess,
        if (ua_sess == NULL) {
                DBG2("UST app pid: %d session id %d not found, creating it",
                                app->pid, usess->id);
-               ua_sess = alloc_ust_app_session();
+               ua_sess = alloc_ust_app_session(app);
                if (ua_sess == NULL) {
                        /* Only malloc can failed so something is really wrong */
                        ret = -ENOMEM;
@@ -1247,7 +1480,7 @@ static int create_ust_app_session(struct ltt_ust_session *usess,
                        } else {
                                DBG("UST app creating session failed. Application is dead");
                        }
-                       delete_ust_app_session(-1, ua_sess);
+                       delete_ust_app_session(-1, ua_sess, app);
                        if (ret != -ENOMEM) {
                                /*
                                 * Tracer is probably gone or got an internal error so let's
@@ -1267,10 +1500,19 @@ static int create_ust_app_session(struct ltt_ust_session *usess,
                DBG2("UST app session created successfully with handle %d", ret);
        }
 
+       /*
+        * Assign consumer if not already set. For one application, there is only
+        * one possible consumer has of now.
+        */
+       if (!ua_sess->consumer) {
+               ua_sess->consumer = usess->consumer;
+       }
+
        *ua_sess_ptr = ua_sess;
        if (is_created) {
                *is_created = created;
        }
+
        /* Everything went well. */
        ret = 0;
 
@@ -1281,6 +1523,8 @@ error:
 
 /*
  * Create a context for the channel on the tracer.
+ *
+ * Called with UST app session lock held.
  */
 static
 int create_ust_app_channel_context(struct ust_app_session *ua_sess,
@@ -1322,6 +1566,8 @@ error:
 
 /*
  * Enable on the tracer side a ust app event for the session and channel.
+ *
+ * Called with UST app session lock held.
  */
 static
 int enable_ust_app_event(struct ust_app_session *ua_sess,
@@ -1412,6 +1658,8 @@ error:
 /*
  * Create UST app channel and create it on the tracer. Set ua_chanp of the
  * newly created channel if not NULL.
+ *
+ * Called with UST app session lock held.
  */
 static int create_ust_app_channel(struct ust_app_session *ua_sess,
                struct ltt_ust_channel *uchan, struct ust_app *app,
@@ -1431,7 +1679,7 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess,
                goto end;
        }
 
-       ua_chan = alloc_ust_app_channel(uchan->name, &uchan->attr);
+       ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr);
        if (ua_chan == NULL) {
                /* Only malloc can fail here */
                ret = -ENOMEM;
@@ -1447,12 +1695,12 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess,
                goto error;
        }
 
-       /* Only add the channel if successful on the tracer side. */
-       lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node);
-
        DBG2("UST app create channel %s for PID %d completed", ua_chan->name,
                        app->pid);
 
+       /* Only add the channel if successful on the tracer side. */
+       lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node);
+
 end:
        if (ua_chanp) {
                *ua_chanp = ua_chan;
@@ -1462,12 +1710,14 @@ end:
        return 0;
 
 error:
-       delete_ust_app_channel(ua_chan->is_sent ? app->sock : -1, ua_chan);
+       delete_ust_app_channel(ua_chan->is_sent ? app->sock : -1, ua_chan, app);
        return ret;
 }
 
 /*
  * Create UST app event and create it on the tracer side.
+ *
+ * Called with ust app session mutex held.
  */
 static
 int create_ust_app_event(struct ust_app_session *ua_sess,
@@ -1502,7 +1752,7 @@ int create_ust_app_event(struct ust_app_session *ua_sess,
                goto error;
        }
 
-       add_unique_ust_app_event(ua_chan->events, ua_event);
+       add_unique_ust_app_event(ua_chan, ua_event);
 
        DBG2("UST app create event %s for PID %d completed", ua_event->name,
                        app->pid);
@@ -1518,15 +1768,19 @@ error:
 
 /*
  * Create UST metadata and open it on the tracer side.
+ *
+ * Called with UST app session lock held.
  */
 static int create_ust_app_metadata(struct ust_app_session *ua_sess,
                struct ust_app *app, struct consumer_output *consumer)
 {
        int ret = 0;
        struct ust_app_channel *metadata;
+       struct consumer_socket *socket;
 
        assert(ua_sess);
        assert(app);
+       assert(consumer);
 
        if (ua_sess->metadata) {
                /* Already exist. Return success. */
@@ -1534,7 +1788,7 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
        }
 
        /* Allocate UST metadata */
-       metadata = alloc_ust_app_channel(DEFAULT_METADATA_NAME, NULL);
+       metadata = alloc_ust_app_channel(DEFAULT_METADATA_NAME, ua_sess, NULL);
        if (!metadata) {
                /* malloc() failed */
                ret = -ENOMEM;
@@ -1545,24 +1799,48 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
        metadata->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE;
        metadata->attr.subbuf_size = default_get_metadata_subbuf_size();
        metadata->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM;
-       metadata->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER;
-       metadata->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER;
+       metadata->attr.switch_timer_interval = DEFAULT_UST_CHANNEL_SWITCH_TIMER;
+       metadata->attr.read_timer_interval = DEFAULT_UST_CHANNEL_READ_TIMER;
        metadata->attr.output = LTTNG_UST_MMAP;
        metadata->attr.type = LTTNG_UST_CHAN_METADATA;
 
-       ret = create_ust_channel(app, ua_sess, metadata, consumer);
+       /* Get the right consumer socket for the application. */
+       socket = find_consumer_socket_by_bitness(app->bits_per_long, consumer);
+       if (!socket) {
+               ret = -EINVAL;
+               goto error_consumer;
+       }
+
+       /*
+        * Ask the metadata channel creation to the consumer. The metadata object
+        * will be created by the consumer and kept their. However, the stream is
+        * never added or monitored until we do a first push metadata to the
+        * consumer.
+        */
+       ret = ust_consumer_ask_channel(ua_sess, metadata, consumer, socket);
+       if (ret < 0) {
+               goto error_consumer;
+       }
+
+       /*
+        * The setup command will make the metadata stream be sent to the relayd,
+        * if applicable, and the thread managing the metadatas. This is important
+        * because after this point, if an error occurs, the only way the stream
+        * can be deleted is to be monitored in the consumer.
+        */
+       ret = ust_consumer_setup_metadata(socket, metadata);
        if (ret < 0) {
-               goto error_create;
+               goto error_consumer;
        }
 
        ua_sess->metadata = metadata;
 
-       DBG2("UST metadata opened for app pid %d", app->pid);
+       DBG2("UST metadata created for app pid %d", app->pid);
 
 end:
        return 0;
-error_create:
-       delete_ust_app_channel(metadata->is_sent ? app->sock : -1, metadata);
+error_consumer:
+       delete_ust_app_channel(-1, metadata, app);
 error:
        return ret;
 }
@@ -1576,114 +1854,159 @@ struct lttng_ht *ust_app_get_ht(void)
 }
 
 /*
- * Return ust app pointer or NULL if not found.
+ * Return ust app pointer or NULL if not found. RCU read side lock MUST be
+ * acquired before calling this function.
  */
 struct ust_app *ust_app_find_by_pid(pid_t pid)
 {
+       struct ust_app *app = NULL;
        struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
 
-       rcu_read_lock();
        lttng_ht_lookup(ust_app_ht, (void *)((unsigned long) pid), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node == NULL) {
                DBG2("UST app no found with pid %d", pid);
                goto error;
        }
-       rcu_read_unlock();
 
        DBG2("Found UST app by pid %d", pid);
 
-       return caa_container_of(node, struct ust_app, pid_n);
+       app = caa_container_of(node, struct ust_app, pid_n);
 
 error:
-       rcu_read_unlock();
-       return NULL;
+       return app;
 }
 
 /*
- * Using pid and uid (of the app), allocate a new ust_app struct and
- * add it to the global traceable app list.
+ * Allocate and init an UST app object using the registration information and
+ * the command socket. This is called when the command socket connects to the
+ * session daemon.
  *
- * On success, return 0, else return malloc -ENOMEM, or -EINVAL if app
- * bitness is not supported.
+ * The object is returned on success or else NULL.
  */
-int ust_app_register(struct ust_register_msg *msg, int sock)
+struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock)
 {
-       struct ust_app *lta;
-       int ret;
+       struct ust_app *lta = NULL;
+
+       assert(msg);
+       assert(sock >= 0);
+
+       DBG3("UST app creating application for socket %d", sock);
 
        if ((msg->bits_per_long == 64 &&
                                (uatomic_read(&ust_consumerd64_fd) == -EINVAL))
                        || (msg->bits_per_long == 32 &&
                                (uatomic_read(&ust_consumerd32_fd) == -EINVAL))) {
                ERR("Registration failed: application \"%s\" (pid: %d) has "
-                       "%d-bit long, but no consumerd for this long size is available.\n",
-                       msg->name, msg->pid, msg->bits_per_long);
-               ret = close(sock);
-               if (ret) {
-                       PERROR("close");
-               }
-               lttng_fd_put(LTTNG_FD_APPS, 1);
-               return -EINVAL;
-       }
-       if (msg->major != LTTNG_UST_COMM_MAJOR) {
-               ERR("Registration failed: application \"%s\" (pid: %d) has "
-                       "communication protocol version %u.%u, but sessiond supports 2.x.\n",
-                       msg->name, msg->pid, msg->major, msg->minor);
-               ret = close(sock);
-               if (ret) {
-                       PERROR("close");
-               }
-               lttng_fd_put(LTTNG_FD_APPS, 1);
-               return -EINVAL;
+                               "%d-bit long, but no consumerd for this size is available.\n",
+                               msg->name, msg->pid, msg->bits_per_long);
+               goto error;
        }
+
        lta = zmalloc(sizeof(struct ust_app));
        if (lta == NULL) {
                PERROR("malloc");
-               return -ENOMEM;
+               goto error;
        }
 
        lta->ppid = msg->ppid;
        lta->uid = msg->uid;
        lta->gid = msg->gid;
-       lta->compatible = 0;  /* Not compatible until proven */
+
        lta->bits_per_long = msg->bits_per_long;
+       lta->uint8_t_alignment = msg->uint8_t_alignment;
+       lta->uint16_t_alignment = msg->uint16_t_alignment;
+       lta->uint32_t_alignment = msg->uint32_t_alignment;
+       lta->uint64_t_alignment = msg->uint64_t_alignment;
+       lta->long_alignment = msg->long_alignment;
+       lta->byte_order = msg->byte_order;
+
        lta->v_major = msg->major;
        lta->v_minor = msg->minor;
-       strncpy(lta->name, msg->name, sizeof(lta->name));
-       lta->name[16] = '\0';
        lta->sessions = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       lta->ust_objd = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       lta->notify_sock = -1;
+
+       /* Copy name and make sure it's NULL terminated. */
+       strncpy(lta->name, msg->name, sizeof(lta->name));
+       lta->name[UST_APP_PROCNAME_LEN] = '\0';
+
+       /*
+        * Before this can be called, when receiving the registration information,
+        * the application compatibility is checked. So, at this point, the
+        * application can work with this session daemon.
+        */
+       lta->compatible = 1;
 
        lta->pid = msg->pid;
-       lttng_ht_node_init_ulong(&lta->pid_n, (unsigned long)lta->pid);
+       lttng_ht_node_init_ulong(&lta->pid_n, (unsigned long) lta->pid);
        lta->sock = sock;
-       lttng_ht_node_init_ulong(&lta->sock_n, (unsigned long)lta->sock);
+       lttng_ht_node_init_ulong(&lta->sock_n, (unsigned long) lta->sock);
 
        CDS_INIT_LIST_HEAD(&lta->teardown_head);
 
+error:
+       return lta;
+}
+
+/*
+ * For a given application object, add it to every hash table.
+ */
+void ust_app_add(struct ust_app *app)
+{
+       assert(app);
+       assert(app->notify_sock >= 0);
+
        rcu_read_lock();
 
        /*
         * On a re-registration, we want to kick out the previous registration of
         * that pid
         */
-       lttng_ht_add_replace_ulong(ust_app_ht, &lta->pid_n);
+       lttng_ht_add_replace_ulong(ust_app_ht, &app->pid_n);
 
        /*
         * The socket _should_ be unique until _we_ call close. So, a add_unique
         * for the ust_app_ht_by_sock is used which asserts fail if the entry was
         * already in the table.
         */
-       lttng_ht_add_unique_ulong(ust_app_ht_by_sock, &lta->sock_n);
+       lttng_ht_add_unique_ulong(ust_app_ht_by_sock, &app->sock_n);
+
+       /* Add application to the notify socket hash table. */
+       lttng_ht_node_init_ulong(&app->notify_sock_n, app->notify_sock);
+       lttng_ht_add_unique_ulong(ust_app_ht_by_notify_sock, &app->notify_sock_n);
+
+       DBG("App registered with pid:%d ppid:%d uid:%d gid:%d sock:%d name:%s "
+                       "notify_sock:%d (version %d.%d)", app->pid, app->ppid, app->uid,
+                       app->gid, app->sock, app->name, app->notify_sock, app->v_major,
+                       app->v_minor);
 
        rcu_read_unlock();
+}
 
-       DBG("App registered with pid:%d ppid:%d uid:%d gid:%d sock:%d name:%s"
-                       " (version %d.%d)", lta->pid, lta->ppid, lta->uid, lta->gid,
-                       lta->sock, lta->name, lta->v_major, lta->v_minor);
+/*
+ * Set the application version into the object.
+ *
+ * Return 0 on success else a negative value either an errno code or a
+ * LTTng-UST error code.
+ */
+int ust_app_version(struct ust_app *app)
+{
+       int ret;
 
-       return 0;
+       assert(app);
+
+       ret = ustctl_tracer_version(app->sock, &app->version);
+       if (ret < 0) {
+               if (ret != -LTTNG_UST_ERR_EXITING && ret != -EPIPE) {
+                       ERR("UST app %d verson failed with ret %d", app->sock, ret);
+               } else {
+                       DBG3("UST app %d verion failed. Application is dead", app->sock);
+               }
+       }
+
+       return ret;
 }
 
 /*
@@ -1705,27 +2028,30 @@ void ust_app_unregister(int sock)
        /* Get the node reference for a call_rcu */
        lttng_ht_lookup(ust_app_ht_by_sock, (void *)((unsigned long) sock), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
-       if (node == NULL) {
-               ERR("Unable to find app by sock %d", sock);
-               goto error;
-       }
+       assert(node);
 
        lta = caa_container_of(node, struct ust_app, sock_n);
-
        DBG("PID %d unregistering with sock %d", lta->pid, sock);
 
        /* Remove application from PID hash table */
        ret = lttng_ht_del(ust_app_ht_by_sock, &iter);
        assert(!ret);
 
-       /* Assign second node for deletion */
-       iter.iter.node = &lta->pid_n.node;
+       /*
+        * Remove application from notify hash table. The thread handling the
+        * notify socket could have deleted the node so ignore on error because
+        * either way it's valid. The close of that socket is handled by the other
+        * thread.
+        */
+       iter.iter.node = &lta->notify_sock_n.node;
+       (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
 
        /*
         * Ignore return value since the node might have been removed before by an
         * add replace during app registration because the PID can be reassigned by
         * the OS.
         */
+       iter.iter.node = &lta->pid_n.node;
        ret = lttng_ht_del(ust_app_ht, &iter);
        if (ret) {
                DBG3("Unregister app by PID %d failed. This can happen on pid reuse",
@@ -1745,13 +2071,29 @@ void ust_app_unregister(int sock)
                 * Add session to list for teardown. This is safe since at this point we
                 * are the only one using this list.
                 */
+               pthread_mutex_lock(&ua_sess->lock);
+
+               /*
+                * Normally, this is done in the delete session process which is
+                * executed in the call rcu below. However, upon registration we can't
+                * afford to wait for the grace period before pushing data or else the
+                * data pending feature can race between the unregistration and stop
+                * command where the data pending command is sent *before* the grace
+                * period ended.
+                *
+                * The close metadata below nullifies the metadata pointer in the
+                * session so the delete session will NOT push/close a second time.
+                */
+               (void) push_metadata(lta, ua_sess);
+               (void) close_metadata(lta, ua_sess);
+
                cds_list_add(&ua_sess->teardown_node, &lta->teardown_head);
+               pthread_mutex_unlock(&ua_sess->lock);
        }
 
        /* Free memory */
        call_rcu(&lta->pid_n.head, delete_ust_app_rcu);
 
-error:
        rcu_read_unlock();
        return;
 }
@@ -1989,9 +2331,17 @@ void ust_app_clean_list(void)
                assert(!ret);
        }
 
+       /* Cleanup notify socket hash table */
+       cds_lfht_for_each_entry(ust_app_ht_by_notify_sock->ht, &iter.iter, app,
+                       notify_sock_n.node) {
+               ret = lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+               assert(!ret);
+       }
+
        /* Destroy is done only when the ht is empty */
        lttng_ht_destroy(ust_app_ht);
        lttng_ht_destroy(ust_app_ht_by_sock);
+       lttng_ht_destroy(ust_app_ht_by_notify_sock);
 
        rcu_read_unlock();
 }
@@ -2003,6 +2353,7 @@ void ust_app_ht_alloc(void)
 {
        ust_app_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        ust_app_ht_by_sock = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       ust_app_ht_by_notify_sock = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 }
 
 /*
@@ -2293,9 +2644,11 @@ int ust_app_create_channel_glb(struct ltt_ust_session *usess,
                }
                assert(ua_sess);
 
+               pthread_mutex_lock(&ua_sess->lock);
                /* Create channel onto application. We don't need the chan ref. */
                ret = create_ust_app_channel(ua_sess, uchan, app, usess->consumer,
                                LTTNG_UST_CHAN_PER_CPU, NULL);
+               pthread_mutex_unlock(&ua_sess->lock);
                if (ret < 0) {
                        if (ret == -ENOMEM) {
                                /* No more memory is a fatal error. Stop right now. */
@@ -2303,7 +2656,7 @@ int ust_app_create_channel_glb(struct ltt_ust_session *usess,
                        }
                        /* Cleanup the created session if it's the case. */
                        if (created) {
-                               destroy_session(app, ua_sess);
+                               destroy_app_session(app, ua_sess);
                        }
                }
        }
@@ -2353,6 +2706,8 @@ int ust_app_enable_event_glb(struct ltt_ust_session *usess,
                        continue;
                }
 
+               pthread_mutex_lock(&ua_sess->lock);
+
                /* Lookup channel in the ust app session */
                lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter);
                ua_chan_node = lttng_ht_iter_get_node_str(&uiter);
@@ -2367,13 +2722,16 @@ int ust_app_enable_event_glb(struct ltt_ust_session *usess,
                if (ua_event == NULL) {
                        DBG3("UST app enable event %s not found for app PID %d."
                                        "Skipping app", uevent->attr.name, app->pid);
-                       continue;
+                       goto next_app;
                }
 
                ret = enable_ust_app_event(ua_sess, ua_event, app);
                if (ret < 0) {
+                       pthread_mutex_unlock(&ua_sess->lock);
                        goto error;
                }
+       next_app:
+               pthread_mutex_unlock(&ua_sess->lock);
        }
 
 error:
@@ -2415,6 +2773,7 @@ int ust_app_create_event_glb(struct ltt_ust_session *usess,
                        continue;
                }
 
+               pthread_mutex_lock(&ua_sess->lock);
                /* Lookup channel in the ust app session */
                lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter);
                ua_chan_node = lttng_ht_iter_get_node_str(&uiter);
@@ -2424,6 +2783,7 @@ int ust_app_create_event_glb(struct ltt_ust_session *usess,
                ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel, node);
 
                ret = create_ust_app_event(ua_sess, ua_chan, uevent, app);
+               pthread_mutex_unlock(&ua_sess->lock);
                if (ret < 0) {
                        if (ret != -LTTNG_UST_ERR_EXIST) {
                                /* Possible value at this point: -ENOMEM. If so, we stop! */
@@ -2462,6 +2822,8 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app)
                goto end;
        }
 
+       pthread_mutex_lock(&ua_sess->lock);
+
        /* Upon restart, we skip the setup, already done */
        if (ua_sess->started) {
                goto skip_setup;
@@ -2475,7 +2837,7 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app)
                if (ret < 0) {
                        if (ret != -EEXIST) {
                                ERR("Trace directory creation error");
-                               goto error_rcu_unlock;
+                               goto error_unlock;
                        }
                }
        }
@@ -2483,7 +2845,7 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app)
        /* Create the metadata for the application. */
        ret = create_ust_app_metadata(ua_sess, app, usess->consumer);
        if (ret < 0) {
-               goto error_rcu_unlock;
+               goto error_unlock;
        }
 
        health_code_update();
@@ -2498,12 +2860,14 @@ skip_setup:
                } else {
                        DBG("UST app start session failed. Application is dead.");
                }
-               goto error_rcu_unlock;
+               goto error_unlock;
        }
 
        /* Indicate that the session has been started once */
        ua_sess->started = 1;
 
+       pthread_mutex_unlock(&ua_sess->lock);
+
        health_code_update();
 
        /* Quiescent wait after starting trace */
@@ -2518,7 +2882,8 @@ end:
        health_code_update();
        return 0;
 
-error_rcu_unlock:
+error_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
        rcu_read_unlock();
        health_code_update();
        return -1;
@@ -2539,14 +2904,16 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app)
        rcu_read_lock();
 
        if (!app->compatible) {
-               goto end;
+               goto end_no_session;
        }
 
        ua_sess = lookup_session_by_app(usess, app);
        if (ua_sess == NULL) {
-               goto end;
+               goto end_no_session;
        }
 
+       pthread_mutex_lock(&ua_sess->lock);
+
        /*
         * If started = 0, it means that stop trace has been called for a session
         * that was never started. It's possible since we can have a fail start
@@ -2596,7 +2963,7 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app)
                                DBG3("UST app failed to flush %s. Application is dead.",
                                                ua_chan->name);
                                /* No need to continue. */
-                               goto end;
+                               break;
                        }
                        /* Continuing flushing all buffers */
                        continue;
@@ -2605,25 +2972,19 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app)
 
        health_code_update();
 
-       assert(ua_sess->metadata->is_sent);
-       /* Flush all buffers before stopping */
-       ret = ustctl_sock_flush_buffer(app->sock, ua_sess->metadata->obj);
+       ret = push_metadata(app, ua_sess);
        if (ret < 0) {
-               if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
-                       ERR("UST app PID %d metadata flush failed with ret %d", app->pid,
-                                       ret);
-                       goto error_rcu_unlock;
-               } else {
-                       DBG3("UST app failed to flush metadata. Application is dead.");
-               }
+               goto error_rcu_unlock;
        }
 
-end:
+       pthread_mutex_unlock(&ua_sess->lock);
+end_no_session:
        rcu_read_unlock();
        health_code_update();
        return 0;
 
 error_rcu_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
        rcu_read_unlock();
        health_code_update();
        return -1;
@@ -2656,7 +3017,7 @@ static int destroy_trace(struct ltt_ust_session *usess, struct ust_app *app)
        ua_sess = caa_container_of(node, struct ust_app_session, node);
 
        health_code_update();
-       destroy_session(app, ua_sess);
+       destroy_app_session(app, ua_sess);
 
        health_code_update();
 
@@ -2666,7 +3027,6 @@ static int destroy_trace(struct ltt_ust_session *usess, struct ust_app *app)
                ERR("UST app wait quiescent failed for app pid %d ret %d",
                                app->pid, ret);
        }
-
 end:
        rcu_read_unlock();
        health_code_update();
@@ -2715,6 +3075,7 @@ int ust_app_stop_trace_all(struct ltt_ust_session *usess)
        cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
                ret = ust_app_stop_trace(usess, app);
                if (ret < 0) {
+                       ERR("UST app stop trace failed with ret %d", ret);
                        /* Continue to next apps even on error */
                        continue;
                }
@@ -2774,7 +3135,11 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock)
 
        app = find_app_by_sock(sock);
        if (app == NULL) {
-               ERR("Failed to find app sock %d", sock);
+               /*
+                * Application can be unregistered before so this is possible hence
+                * simply stopping the update.
+                */
+               DBG3("UST app update failed to find app sock %d", sock);
                goto error;
        }
 
@@ -2789,8 +3154,10 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock)
        }
        assert(ua_sess);
 
+       pthread_mutex_lock(&ua_sess->lock);
+
        /*
-        * We can iterate safely here over all UST app session sicne the create ust
+        * We can iterate safely here over all UST app session since the create ust
         * app session above made a shadow copy of the UST global domain from the
         * ltt ust session.
         */
@@ -2803,14 +3170,14 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock)
                         * descriptor are available or ENOMEM so stopping here is the only
                         * thing we can do for now.
                         */
-                       goto error;
+                       goto error_unlock;
                }
 
                cds_lfht_for_each_entry(ua_chan->ctx->ht, &iter_ctx.iter, ua_ctx,
                                node.node) {
                        ret = create_ust_channel_context(ua_chan, ua_ctx, app);
                        if (ret < 0) {
-                               goto error;
+                               goto error_unlock;
                        }
                }
 
@@ -2820,11 +3187,13 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock)
                                node.node) {
                        ret = create_ust_event(app, ua_sess, ua_chan, ua_event);
                        if (ret < 0) {
-                               goto error;
+                               goto error_unlock;
                        }
                }
        }
 
+       pthread_mutex_unlock(&ua_sess->lock);
+
        if (usess->start_trace) {
                ret = ust_app_start_trace(usess, app);
                if (ret < 0) {
@@ -2838,9 +3207,11 @@ void ust_app_global_update(struct ltt_ust_session *usess, int sock)
        rcu_read_unlock();
        return;
 
+error_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
 error:
        if (ua_sess) {
-               destroy_session(app, ua_sess);
+               destroy_app_session(app, ua_sess);
        }
        rcu_read_unlock();
        return;
@@ -2874,19 +3245,21 @@ int ust_app_add_ctx_channel_glb(struct ltt_ust_session *usess,
                        continue;
                }
 
+               pthread_mutex_lock(&ua_sess->lock);
                /* Lookup channel in the ust app session */
                lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter);
                ua_chan_node = lttng_ht_iter_get_node_str(&uiter);
                if (ua_chan_node == NULL) {
-                       continue;
+                       goto next_app;
                }
                ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel,
                                node);
-
                ret = create_ust_app_channel_context(ua_sess, ua_chan, &uctx->ctx, app);
                if (ret < 0) {
-                       continue;
+                       goto next_app;
                }
+       next_app:
+               pthread_mutex_unlock(&ua_sess->lock);
        }
 
        rcu_read_unlock();
@@ -2915,20 +3288,22 @@ int ust_app_enable_event_pid(struct ltt_ust_session *usess,
        if (app == NULL) {
                ERR("UST app enable event per PID %d not found", pid);
                ret = -1;
-               goto error;
+               goto end;
        }
 
        if (!app->compatible) {
                ret = 0;
-               goto error;
+               goto end;
        }
 
        ua_sess = lookup_session_by_app(usess, app);
        if (!ua_sess) {
                /* The application has problem or is probably dead. */
-               goto error;
+               ret = 0;
+               goto end;
        }
 
+       pthread_mutex_lock(&ua_sess->lock);
        /* Lookup channel in the ust app session */
        lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &iter);
        ua_chan_node = lttng_ht_iter_get_node_str(&iter);
@@ -2942,16 +3317,18 @@ int ust_app_enable_event_pid(struct ltt_ust_session *usess,
        if (ua_event == NULL) {
                ret = create_ust_app_event(ua_sess, ua_chan, uevent, app);
                if (ret < 0) {
-                       goto error;
+                       goto end_unlock;
                }
        } else {
                ret = enable_ust_app_event(ua_sess, ua_event, app);
                if (ret < 0) {
-                       goto error;
+                       goto end_unlock;
                }
        }
 
-error:
+end_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
+end:
        rcu_read_unlock();
        return ret;
 }
@@ -3019,52 +3396,6 @@ error:
        return ret;
 }
 
-/*
- * Validate version of UST apps and set the compatible bit.
- */
-int ust_app_validate_version(int sock)
-{
-       int ret;
-       struct ust_app *app;
-
-       rcu_read_lock();
-
-       app = find_app_by_sock(sock);
-       assert(app);
-
-       health_code_update();
-
-       ret = ustctl_tracer_version(sock, &app->version);
-       if (ret < 0) {
-               if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
-                       ERR("UST app tracer version failed for app pid %d", app->pid);
-               }
-               goto error;
-       }
-
-       /* Validate version */
-       if (app->version.major != UST_APP_MAJOR_VERSION) {
-               goto error;
-       }
-
-       DBG2("UST app PID %d is compatible with internal major version %d "
-                       "(supporting == %d)", app->pid, app->version.major,
-                       UST_APP_MAJOR_VERSION);
-       app->compatible = 1;
-       rcu_read_unlock();
-       health_code_update();
-       return 0;
-
-error:
-       DBG2("UST app PID %d is not compatible with internal major version %d "
-                       "(supporting == %d)", app->pid, app->version.major,
-                       UST_APP_MAJOR_VERSION);
-       app->compatible = 0;
-       rcu_read_unlock();
-       health_code_update();
-       return -1;
-}
-
 /*
  * Calibrate registered applications.
  */
@@ -3110,3 +3441,409 @@ int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate)
 
        return ret;
 }
+
+/*
+ * Receive registration and populate the given msg structure.
+ *
+ * On success return 0 else a negative value returned by the ustctl call.
+ */
+int ust_app_recv_registration(int sock, struct ust_register_msg *msg)
+{
+       int ret;
+       uint32_t pid, ppid, uid, gid;
+
+       assert(msg);
+
+       ret = ustctl_recv_reg_msg(sock, &msg->type, &msg->major, &msg->minor,
+                       &pid, &ppid, &uid, &gid,
+                       &msg->bits_per_long,
+                       &msg->uint8_t_alignment,
+                       &msg->uint16_t_alignment,
+                       &msg->uint32_t_alignment,
+                       &msg->uint64_t_alignment,
+                       &msg->long_alignment,
+                       &msg->byte_order,
+                       msg->name);
+       if (ret < 0) {
+               switch (-ret) {
+               case EPIPE:
+               case ECONNRESET:
+               case LTTNG_UST_ERR_EXITING:
+                       DBG3("UST app recv reg message failed. Application died");
+                       break;
+               case LTTNG_UST_ERR_UNSUP_MAJOR:
+                       ERR("UST app recv reg unsupported version %d.%d. Supporting %d.%d",
+                                       msg->major, msg->minor, LTTNG_UST_ABI_MAJOR_VERSION,
+                                       LTTNG_UST_ABI_MINOR_VERSION);
+                       break;
+               default:
+                       ERR("UST app recv reg message failed with ret %d", ret);
+                       break;
+               }
+               goto error;
+       }
+       msg->pid = (pid_t) pid;
+       msg->ppid = (pid_t) ppid;
+       msg->uid = (uid_t) uid;
+       msg->gid = (gid_t) gid;
+
+error:
+       return ret;
+}
+
+/*
+ * Return a ust app channel object using the application object and the channel
+ * object descriptor has a key. If not found, NULL is returned. A RCU read side
+ * lock MUST be acquired before calling this function.
+ */
+static struct ust_app_channel *find_channel_by_objd(struct ust_app *app,
+               int objd)
+{
+       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_iter iter;
+       struct ust_app_channel *ua_chan = NULL;
+
+       assert(app);
+
+       lttng_ht_lookup(app->ust_objd, (void *)((unsigned long) objd), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node == NULL) {
+               DBG2("UST app channel find by objd %d not found", objd);
+               goto error;
+       }
+
+       ua_chan = caa_container_of(node, struct ust_app_channel, ust_objd_node);
+
+error:
+       return ua_chan;
+}
+
+/*
+ * Reply to a register channel notification from an application on the notify
+ * socket. The channel metadata is also created.
+ *
+ * The session UST registry lock is acquired in this function.
+ *
+ * On success 0 is returned else a negative value.
+ */
+static int reply_ust_register_channel(int sock, int sobjd, int cobjd,
+               size_t nr_fields, struct ustctl_field *fields)
+{
+       int ret, ret_code = 0;
+       uint32_t chan_id, reg_count;
+       enum ustctl_channel_header type;
+       struct ust_app *app;
+       struct ust_app_channel *ua_chan;
+       struct ust_app_session *ua_sess;
+
+       rcu_read_lock();
+
+       /* Lookup application. If not found, there is a code flow error. */
+       app = find_app_by_notify_sock(sock);
+       if (!app) {
+               DBG("Application socket %d is being teardown. Abort event notify",
+                               sock);
+               ret = 0;
+               goto error_rcu_unlock;
+       }
+
+       /* Lookup channel by UST object descriptor. Should always be found. */
+       ua_chan = find_channel_by_objd(app, cobjd);
+       assert(ua_chan);
+       assert(ua_chan->session);
+       ua_sess = ua_chan->session;
+       assert(ua_sess);
+
+       pthread_mutex_lock(&ua_sess->registry.lock);
+
+       if (ust_registry_is_max_id(ua_chan->session->registry.used_channel_id)) {
+               ret_code = -1;
+               chan_id = -1U;
+               type = -1;
+               goto reply;
+       }
+
+       /* Don't assign ID to metadata. */
+       if (ua_chan->attr.type == LTTNG_UST_CHAN_METADATA) {
+               chan_id = -1U;
+       } else {
+               chan_id = ust_registry_get_next_chan_id(&ua_chan->session->registry);
+       }
+
+       reg_count = ust_registry_get_event_count(&ua_chan->registry);
+       if (reg_count < 31) {
+               type = USTCTL_CHANNEL_HEADER_COMPACT;
+       } else {
+               type = USTCTL_CHANNEL_HEADER_LARGE;
+       }
+
+       ua_chan->registry.nr_ctx_fields = nr_fields;
+       ua_chan->registry.ctx_fields = fields;
+       ua_chan->registry.chan_id = chan_id;
+       ua_chan->registry.header_type = type;
+
+       /* Append to metadata */
+       if (!ret_code) {
+               ret_code = ust_metadata_channel_statedump(&ua_chan->session->registry,
+                               &ua_chan->registry);
+               if (ret_code) {
+                       ERR("Error appending channel metadata (errno = %d)", ret_code);
+                       goto reply;
+               }
+       }
+
+reply:
+       DBG3("UST app replying to register channel with id %u, type: %d, ret: %d",
+                       chan_id, type, ret_code);
+
+       ret = ustctl_reply_register_channel(sock, chan_id, type, ret_code);
+       if (ret < 0) {
+               if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
+                       ERR("UST app reply channel failed with ret %d", ret);
+               } else {
+                       DBG3("UST app reply channel failed. Application died");
+               }
+               goto error;
+       }
+
+error:
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+error_rcu_unlock:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Add event to the UST channel registry. When the event is added to the
+ * registry, the metadata is also created. Once done, this replies to the
+ * application with the appropriate error code.
+ *
+ * The session UST registry lock is acquired in the function.
+ *
+ * On success 0 is returned else a negative value.
+ */
+static int add_event_ust_registry(int sock, int sobjd, int cobjd, char *name,
+               char *sig, size_t nr_fields, struct ustctl_field *fields, int loglevel,
+               char *model_emf_uri)
+{
+       int ret, ret_code;
+       uint32_t event_id = 0;
+       struct ust_app *app;
+       struct ust_app_channel *ua_chan;
+       struct ust_app_session *ua_sess;
+
+       rcu_read_lock();
+
+       /* Lookup application. If not found, there is a code flow error. */
+       app = find_app_by_notify_sock(sock);
+       if (!app) {
+               DBG("Application socket %d is being teardown. Abort event notify",
+                               sock);
+               ret = 0;
+               goto error_rcu_unlock;
+       }
+
+       /* Lookup channel by UST object descriptor. Should always be found. */
+       ua_chan = find_channel_by_objd(app, cobjd);
+       assert(ua_chan);
+       assert(ua_chan->session);
+       ua_sess = ua_chan->session;
+
+       pthread_mutex_lock(&ua_sess->registry.lock);
+
+       ret_code = ust_registry_create_event(&ua_sess->registry,
+                       &ua_chan->registry, sobjd, cobjd, name, sig, nr_fields, fields,
+                       loglevel, model_emf_uri, &event_id);
+
+       /*
+        * The return value is returned to ustctl so in case of an error, the
+        * application can be notified. In case of an error, it's important not to
+        * return a negative error or else the application will get closed.
+        */
+       ret = ustctl_reply_register_event(sock, event_id, ret_code);
+       if (ret < 0) {
+               if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
+                       ERR("UST app reply event failed with ret %d", ret);
+               } else {
+                       DBG3("UST app reply event failed. Application died");
+               }
+               /*
+                * No need to wipe the create event since the application socket will
+                * get close on error hence cleaning up everything by itself.
+                */
+               goto error;
+       }
+
+       DBG3("UST registry event %s has been added successfully", name);
+
+error:
+       pthread_mutex_unlock(&ua_sess->registry.lock);
+error_rcu_unlock:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Handle application notification through the given notify socket.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int ust_app_recv_notify(int sock)
+{
+       int ret;
+       enum ustctl_notify_cmd cmd;
+
+       DBG3("UST app receiving notify from sock %d", sock);
+
+       ret = ustctl_recv_notify(sock, &cmd);
+       if (ret < 0) {
+               if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
+                       ERR("UST app recv notify failed with ret %d", ret);
+               } else {
+                       DBG3("UST app recv notify failed. Application died");
+               }
+               goto error;
+       }
+
+       switch (cmd) {
+       case USTCTL_NOTIFY_CMD_EVENT:
+       {
+               int sobjd, cobjd, loglevel;
+               char name[LTTNG_UST_SYM_NAME_LEN], *sig, *model_emf_uri;
+               size_t nr_fields;
+               struct ustctl_field *fields;
+
+               DBG2("UST app ustctl register event received");
+
+               ret = ustctl_recv_register_event(sock, &sobjd, &cobjd, name, &loglevel,
+                               &sig, &nr_fields, &fields, &model_emf_uri);
+               if (ret < 0) {
+                       if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
+                               ERR("UST app recv event failed with ret %d", ret);
+                       } else {
+                               DBG3("UST app recv event failed. Application died");
+                       }
+                       goto error;
+               }
+
+               /* Add event to the UST registry coming from the notify socket. */
+               ret = add_event_ust_registry(sock, sobjd, cobjd, name, sig, nr_fields,
+                               fields, loglevel, model_emf_uri);
+               if (ret < 0) {
+                       goto error;
+               }
+
+               break;
+       }
+       case USTCTL_NOTIFY_CMD_CHANNEL:
+       {
+               int sobjd, cobjd;
+               size_t nr_fields;
+               struct ustctl_field *fields;
+
+               DBG2("UST app ustctl register channel received");
+
+               ret = ustctl_recv_register_channel(sock, &sobjd, &cobjd, &nr_fields,
+                               &fields);
+               if (ret < 0) {
+                       if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
+                               ERR("UST app recv channel failed with ret %d", ret);
+                       } else {
+                               DBG3("UST app recv channel failed. Application died");
+                       }
+                       goto error;
+               }
+
+               ret = reply_ust_register_channel(sock, sobjd, cobjd, nr_fields,
+                               fields);
+               if (ret < 0) {
+                       goto error;
+               }
+
+               break;
+       }
+       default:
+               /* Should NEVER happen. */
+               assert(0);
+       }
+
+error:
+       return ret;
+}
+
+/*
+ * Once the notify socket hangs up, this is called. First, it tries to find the
+ * corresponding application. On failure, the call_rcu to close the socket is
+ * executed. If an application is found, it tries to delete it from the notify
+ * socket hash table. Whathever the result, it proceeds to the call_rcu.
+ *
+ * Note that an object needs to be allocated here so on ENOMEM failure, the
+ * call RCU is not done but the rest of the cleanup is.
+ */
+void ust_app_notify_sock_unregister(int sock)
+{
+       int err_enomem = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app *app;
+       struct ust_app_notify_sock_obj *obj;
+
+       assert(sock >= 0);
+
+       rcu_read_lock();
+
+       obj = zmalloc(sizeof(*obj));
+       if (!obj) {
+               /*
+                * An ENOMEM is kind of uncool. If this strikes we continue the
+                * procedure but the call_rcu will not be called. In this case, we
+                * accept the fd leak rather than possibly creating an unsynchronized
+                * state between threads.
+                *
+                * TODO: The notify object should be created once the notify socket is
+                * registered and stored independantely from the ust app object. The
+                * tricky part is to synchronize the teardown of the application and
+                * this notify object. Let's keep that in mind so we can avoid this
+                * kind of shenanigans with ENOMEM in the teardown path.
+                */
+               err_enomem = 1;
+       } else {
+               obj->fd = sock;
+       }
+
+       DBG("UST app notify socket unregister %d", sock);
+
+       /*
+        * Lookup application by notify socket. If this fails, this means that the
+        * hash table delete has already been done by the application
+        * unregistration process so we can safely close the notify socket in a
+        * call RCU.
+        */
+       app = find_app_by_notify_sock(sock);
+       if (!app) {
+               goto close_socket;
+       }
+
+       iter.iter.node = &app->notify_sock_n.node;
+
+       /*
+        * Whatever happens here either we fail or succeed, in both cases we have
+        * to close the socket after a grace period to continue to the call RCU
+        * here. If the deletion is successful, the application is not visible
+        * anymore by other threads and is it fails it means that it was already
+        * deleted from the hash table so either way we just have to close the
+        * socket.
+        */
+       (void) lttng_ht_del(ust_app_ht_by_notify_sock, &iter);
+
+close_socket:
+       rcu_read_unlock();
+
+       /*
+        * Close socket after a grace period to avoid for the socket to be reused
+        * before the application object is freed creating potential race between
+        * threads trying to add unique in the global hash table.
+        */
+       if (!err_enomem) {
+               call_rcu(&obj->head, close_notify_sock_rcu);
+       }
+}
index e8bb9a980ab1cf8f122016405638c8735341cfdb..c6294d0a4626df9d51b8d602cb72ccbeb89d3cb6 100644 (file)
 
 #include <common/compat/uuid.h>
 #include "trace-ust.h"
-
-/* lttng-ust supported version. */
-#define LTTNG_UST_COMM_MAJOR          2        /* comm protocol major version */
-#define UST_APP_MAJOR_VERSION         3 /* Internal UST version supported */
+#include "ust-registry.h"
 
 #define UST_APP_EVENT_LIST_SIZE 32
 
+/* Process name (short). */
+#define UST_APP_PROCNAME_LEN   16
+
 struct lttng_filter_bytecode;
 struct lttng_ust_filter_bytecode;
 
 extern int ust_consumerd64_fd, ust_consumerd32_fd;
 
+/*
+ * Object used to close the notify socket in a call_rcu(). Since the
+ * application might not be found, we need an independant object containing the
+ * notify socket fd.
+ */
+struct ust_app_notify_sock_obj {
+       int fd;
+       struct rcu_head head;
+};
+
 struct ust_app_ht_key {
        const char *name;
        const struct lttng_ust_filter_bytecode *filter;
@@ -44,14 +54,23 @@ struct ust_app_ht_key {
  * Application registration data structure.
  */
 struct ust_register_msg {
+       enum ustctl_socket_type type;
        uint32_t major;
        uint32_t minor;
+       uint32_t abi_major;
+       uint32_t abi_minor;
        pid_t pid;
        pid_t ppid;
        uid_t uid;
        gid_t gid;
        uint32_t bits_per_long;
-       char name[16];
+       uint32_t uint8_t_alignment;
+       uint32_t uint16_t_alignment;
+       uint32_t uint32_t_alignment;
+       uint32_t uint64_t_alignment;
+       uint32_t long_alignment;
+       int byte_order;         /* BIG_ENDIAN or LITTLE_ENDIAN */
+       char name[LTTNG_UST_ABI_PROCNAME_LEN];
 };
 
 /*
@@ -66,6 +85,12 @@ struct lttng_ht *ust_app_ht;
  */
 struct lttng_ht *ust_app_ht_by_sock;
 
+/*
+ * Global applications HT used by the session daemon. This table is indexed by
+ * socket using the notify_sock_n node and notify_sock value of an ust_app.
+ */
+struct lttng_ht *ust_app_ht_by_notify_sock;
+
 /* Stream list containing ust_app_stream. */
 struct ust_app_stream_list {
        unsigned int count;
@@ -97,7 +122,6 @@ struct ust_app_stream {
        struct lttng_ust_object_data *obj;
        /* Using a list of streams to keep order. */
        struct cds_list_head list;
-       struct ustctl_consumer_stream *ustream;
 };
 
 struct ust_app_channel {
@@ -106,26 +130,47 @@ struct ust_app_channel {
        /* Channel and streams were sent to the UST tracer. */
        int is_sent;
        /* Unique key used to identify the channel on the consumer side. */
-       unsigned long key;
+       uint64_t key;
        /* Number of stream that this channel is expected to receive. */
        unsigned int expected_stream_count;
        char name[LTTNG_UST_SYM_NAME_LEN];
        struct lttng_ust_object_data *obj;
        struct ustctl_consumer_channel_attr attr;
-       struct ustctl_consumer_channel *channel;
        struct ust_app_stream_list streams;
+       /* Session pointer that owns this object. */
+       struct ust_app_session *session;
        struct lttng_ht *ctx;
        struct lttng_ht *events;
+       /*
+        * UST event registry. The ONLY writer is the application thread.
+        */
+       struct ust_registry_channel registry;
+       /*
+        * Node indexed by channel name in the channels' hash table of a session.
+        */
        struct lttng_ht_node_str node;
+       /*
+        * Node indexed by UST channel object descriptor (handle). Stored in the
+        * ust_objd hash table in the ust_app object.
+        */
+       struct lttng_ht_node_ulong ust_objd_node;
 };
 
 struct ust_app_session {
+       /*
+        * Lock protecting this session's ust app interaction. Held
+        * across command send/recv to/from app. Never nests within the
+        * session registry lock.
+        */
+       pthread_mutex_t lock;
+
        int enabled;
        /* started: has the session been in started state at any time ? */
        int started;  /* allows detection of start vs restart. */
        int handle;   /* used has unique identifier for app session */
        int id;       /* session unique identifier */
        struct ust_app_channel *metadata;
+       struct ust_registry_session registry;
        struct lttng_ht *channels; /* Registered channels */
        struct lttng_ht_node_ulong node;
        char path[PATH_MAX];
@@ -133,8 +178,11 @@ struct ust_app_session {
        uid_t uid;
        gid_t gid;
        struct cds_list_head teardown_node;
-       /* Universal unique identifier used by the tracer. */
-       unsigned char uuid[UUID_STR_LEN];
+       /*
+        * Once at least *one* session is created onto the application, the
+        * corresponding consumer is set so we can use it on unregistration.
+        */
+       struct consumer_output *consumer;
 };
 
 /*
@@ -143,21 +191,33 @@ struct ust_app_session {
  */
 struct ust_app {
        int sock;
+       int notify_sock;
        pid_t pid;
        pid_t ppid;
        uid_t uid;           /* User ID that owns the apps */
        gid_t gid;           /* Group ID that owns the apps */
-       int bits_per_long;
+
+       /* App ABI */
+       uint32_t bits_per_long;
+       uint32_t uint8_t_alignment;
+       uint32_t uint16_t_alignment;
+       uint32_t uint32_t_alignment;
+       uint32_t uint64_t_alignment;
+       uint32_t long_alignment;
+       int byte_order;         /* BIG_ENDIAN or LITTLE_ENDIAN */
+
        int compatible; /* If the lttng-ust tracer version does not match the
                                           supported version of the session daemon, this flag is
                                           set to 0 (NOT compatible) else 1. */
        struct lttng_ust_tracer_version version;
-       uint32_t v_major;    /* Verion major number */
-       uint32_t v_minor;    /* Verion minor number */
-       char name[17];       /* Process name (short) */
+       uint32_t v_major;    /* Version major number */
+       uint32_t v_minor;    /* Version minor number */
+       /* Extra for the NULL byte. */
+       char name[UST_APP_PROCNAME_LEN + 1];
        struct lttng_ht *sessions;
        struct lttng_ht_node_ulong pid_n;
        struct lttng_ht_node_ulong sock_n;
+       struct lttng_ht_node_ulong notify_sock_n;
        /*
         * This is a list of ust app session that, once the app is going into
         * teardown mode, in the RCU call, each node in this list is removed and
@@ -168,6 +228,10 @@ struct ust_app {
         * when a session is destroyed.
         */
        struct cds_list_head teardown_head;
+       /*
+        * Hash table containing ust_app_channel indexed by channel objd.
+        */
+       struct lttng_ht *ust_objd;
 };
 
 #ifdef HAVE_LIBLTTNG_UST_CTL
@@ -178,6 +242,7 @@ int ust_app_register_done(int sock)
 {
        return ustctl_register_done(sock);
 }
+int ust_app_version(struct ust_app *app);
 void ust_app_unregister(int sock);
 unsigned long ust_app_list_count(void);
 int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app);
@@ -217,9 +282,13 @@ void ust_app_clean_list(void);
 void ust_app_ht_alloc(void);
 struct lttng_ht *ust_app_get_ht(void);
 struct ust_app *ust_app_find_by_pid(pid_t pid);
-int ust_app_validate_version(int sock);
 int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate);
 struct ust_app_stream *ust_app_alloc_stream(void);
+int ust_app_recv_registration(int sock, struct ust_register_msg *msg);
+int ust_app_recv_notify(int sock);
+void ust_app_add(struct ust_app *app);
+struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock);
+void ust_app_notify_sock_unregister(int sock);
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
@@ -264,6 +333,11 @@ int ust_app_register_done(int sock)
        return -ENOSYS;
 }
 static inline
+int ust_app_version(struct ust_app *app)
+{
+       return -ENOSYS;
+}
+static inline
 void ust_app_unregister(int sock)
 {
 }
@@ -374,15 +448,33 @@ int ust_app_disable_event_pid(struct ltt_ust_session *usess,
        return 0;
 }
 static inline
-int ust_app_validate_version(int sock)
+int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate)
 {
        return 0;
 }
 static inline
-int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate)
+int ust_app_recv_registration(int sock, struct ust_register_msg *msg)
 {
        return 0;
 }
+static inline
+int ust_app_recv_notify(int sock)
+{
+       return 0;
+}
+static inline
+struct ust_app *ust_app_create(struct ust_register_msg *msg, int sock)
+{
+       return NULL;
+}
+static inline
+void ust_app_add(struct ust_app *app)
+{
+}
+static inline
+void ust_app_notify_sock_unregister(int sock)
+{
+}
 
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
diff --git a/src/bin/lttng-sessiond/ust-clock.h b/src/bin/lttng-sessiond/ust-clock.h
new file mode 100644 (file)
index 0000000..7d9c99a
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2010  Pierre-Marc Fournier
+ * Copyright (C) 2011  Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; version 2.1 of
+ * the License.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+ */
+
+#ifndef _UST_CLOCK_H
+#define _UST_CLOCK_H
+
+#include <time.h>
+#include <sys/time.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <errno.h>
+
+#include <common/compat/uuid.h>
+
+/* TRACE CLOCK */
+
+/*
+ * Currently using the kernel MONOTONIC clock, waiting for kernel-side
+ * LTTng to implement mmap'd trace clock.
+ */
+
+/* Choosing correct trace clock */
+
+static __inline__
+uint64_t trace_clock_read64(void)
+{
+       struct timespec ts;
+
+       clock_gettime(CLOCK_MONOTONIC, &ts);
+       return ((uint64_t) ts.tv_sec * 1000000000ULL) + ts.tv_nsec;
+}
+
+static __inline__
+uint64_t trace_clock_freq(void)
+{
+       return 1000000000ULL;
+}
+
+static __inline__
+int trace_clock_uuid(char *uuid)
+{
+       int ret = 0;
+       size_t len;
+       FILE *fp;
+
+       /*
+        * boot_id needs to be read once before being used concurrently
+        * to deal with a Linux kernel race. A fix is proposed for
+        * upstream, but the work-around is needed for older kernels.
+        */
+       fp = fopen("/proc/sys/kernel/random/boot_id", "r");
+       if (!fp) {
+               return -ENOENT;
+       }
+       len = fread(uuid, 1, UUID_STR_LEN - 1, fp);
+       if (len < UUID_STR_LEN - 1) {
+               ret = -EINVAL;
+               goto end;
+       }
+       uuid[UUID_STR_LEN - 1] = '\0';
+end:
+       fclose(fp);
+       return ret;
+}
+
+#endif /* _UST_CLOCK_H */
index 7c1ae402c8ded33c75532d76530ba5ddf195001d..33cb6ff376c16c92db8bf605963054b364b59555 100644 (file)
@@ -21,6 +21,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <inttypes.h>
 
 #include <common/common.h>
 #include <common/consumer.h>
@@ -99,7 +100,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
                struct consumer_socket *socket)
 {
        int ret;
-       unsigned long key;
+       uint64_t key;
        char *pathname = NULL;
        struct lttcomm_consumer_msg msg;
 
@@ -132,7 +133,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
                        ua_sess->gid,
                        consumer->net_seq_index,
                        ua_chan->key,
-                       ua_sess->uuid);
+                       ua_sess->registry.uuid);
 
        health_code_update();
 
@@ -151,7 +152,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
        /* We need at least one where 1 stream for 1 cpu. */
        assert(ua_chan->expected_stream_count > 0);
 
-       DBG2("UST ask channel %lu successfully done with %u stream(s)", key,
+       DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key,
                        ua_chan->expected_stream_count);
 
 error:
@@ -228,7 +229,6 @@ int ust_consumer_get_channel(struct consumer_socket *socket,
                }
                goto error;
        }
-       ua_chan->handle = ua_chan->obj->handle;
 
        /* Next, get all streams. */
        while (1) {
@@ -345,6 +345,7 @@ int ust_consumer_send_stream_to_ust(struct ust_app *app,
                }
                goto error;
        }
+       channel->handle = channel->obj->handle;
 
 error:
        return ret;
@@ -383,3 +384,131 @@ int ust_consumer_send_channel_to_ust(struct ust_app *app,
 error:
        return ret;
 }
+
+/*
+ * Send metadata string to consumer.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_push_metadata(struct consumer_socket *socket,
+               struct ust_app_session *ua_sess, char *metadata_str,
+               size_t len, size_t target_offset)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(socket);
+       assert(socket->fd >= 0);
+       assert(ua_sess);
+       assert(ua_sess->metadata);
+
+       DBG2("UST consumer push metadata to consumer socket %d", socket->fd);
+
+       msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
+       msg.u.push_metadata.key = ua_sess->metadata->key;
+       msg.u.push_metadata.target_offset = target_offset;
+       msg.u.push_metadata.len = len;
+
+       /*
+        * TODO: reenable these locks when the consumerd gets the ability to
+        * reorder the metadata it receives. This fits with locking in
+        * src/bin/lttng-sessiond/ust-app.c:push_metadata()
+        *
+        * pthread_mutex_lock(socket->lock);
+        */
+
+       health_code_update();
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+       DBG3("UST consumer push metadata on sock %d of len %lu", socket->fd, len);
+
+       ret = lttcomm_send_unix_sock(socket->fd, metadata_str, len);
+       if (ret < 0) {
+               fprintf(stderr, "send error: %d\n", ret);
+               goto error;
+       }
+
+       health_code_update();
+       ret = consumer_recv_status_reply(socket);
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       health_code_update();
+       /*
+        * pthread_mutex_unlock(socket->lock);
+        */
+       return ret;
+}
+
+/*
+ * Send a close metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_close_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(ua_chan);
+       assert(socket);
+       assert(socket->fd >= 0);
+
+       DBG2("UST consumer close metadata channel key %lu", ua_chan->key);
+
+       msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
+       msg.u.close_metadata.key = ua_chan->key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
+
+/*
+ * Send a setup metdata command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int ust_consumer_setup_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(ua_chan);
+       assert(socket);
+       assert(socket->fd >= 0);
+
+       DBG2("UST consumer setup metadata channel key %lu", ua_chan->key);
+
+       msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
+       msg.u.setup_metadata.key = ua_chan->key;
+
+       pthread_mutex_lock(socket->lock);
+       health_code_update();
+
+       ret = consumer_send_msg(socket, &msg);
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       health_code_update();
+       pthread_mutex_unlock(socket->lock);
+       return ret;
+}
index f48ea42f27bfdfaff0e34cee692f3364ad590817..8739af5520c2c8d96838930c62e6332e2dbffbc4 100644 (file)
@@ -37,4 +37,14 @@ int ust_consumer_send_stream_to_ust(struct ust_app *app,
 int ust_consumer_send_channel_to_ust(struct ust_app *app,
                struct ust_app_session *ua_sess, struct ust_app_channel *channel);
 
+int ust_consumer_push_metadata(struct consumer_socket *socket,
+               struct ust_app_session *ua_sess, char *metadata_str,
+               size_t len, size_t target_offset);
+
+int ust_consumer_close_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan);
+
+int ust_consumer_setup_metadata(struct consumer_socket *socket,
+               struct ust_app_channel *ua_chan);
+
 #endif /* _UST_CONSUMER_H */
diff --git a/src/bin/lttng-sessiond/ust-metadata.c b/src/bin/lttng-sessiond/ust-metadata.c
new file mode 100644 (file)
index 0000000..45512ac
--- /dev/null
@@ -0,0 +1,674 @@
+/*
+ * ust-metadata.c
+ *
+ * LTTng-UST metadata generation
+ *
+ * Copyright (C) 2010-2013 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#include <stdint.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <limits.h>
+#include <unistd.h>
+#include <inttypes.h>
+#include <common/common.h>
+
+#include "ust-registry.h"
+#include "ust-clock.h"
+#include "ust-app.h"
+
+#ifndef max_t
+#define max_t(type, a, b)      ((type) ((a) > (b) ? (a) : (b)))
+#endif
+
+static inline
+int fls(unsigned int x)
+{
+       int r = 32;
+
+       if (!x)
+               return 0;
+       if (!(x & 0xFFFF0000U)) {
+               x <<= 16;
+               r -= 16;
+       }
+       if (!(x & 0xFF000000U)) {
+               x <<= 8;
+               r -= 8;
+       }
+       if (!(x & 0xF0000000U)) {
+               x <<= 4;
+               r -= 4;
+       }
+       if (!(x & 0xC0000000U)) {
+               x <<= 2;
+               r -= 2;
+       }
+       if (!(x & 0x80000000U)) {
+               x <<= 1;
+               r -= 1;
+       }
+       return r;
+}
+
+static inline
+int get_count_order(unsigned int count)
+{
+       int order;
+
+       order = fls(count) - 1;
+       if (count & (count - 1))
+               order++;
+       return order;
+}
+
+/*
+ * Returns offset where to write in metadata array, or negative error value on error.
+ */
+static
+ssize_t metadata_reserve(struct ust_registry_session *session, size_t len)
+{
+       size_t new_len = session->metadata_len + len;
+       size_t new_alloc_len = new_len;
+       size_t old_alloc_len = session->metadata_alloc_len;
+       ssize_t ret;
+
+       if (new_alloc_len > (UINT32_MAX >> 1))
+               return -EINVAL;
+       if ((old_alloc_len << 1) > (UINT32_MAX >> 1))
+               return -EINVAL;
+
+       if (new_alloc_len > old_alloc_len) {
+               char *newptr;
+
+               new_alloc_len =
+                       max_t(size_t, 1U << get_count_order(new_alloc_len), old_alloc_len << 1);
+               newptr = realloc(session->metadata, new_alloc_len);
+               if (!newptr)
+                       return -ENOMEM;
+               session->metadata = newptr;
+               /* We zero directly the memory from start of allocation. */
+               memset(&session->metadata[old_alloc_len], 0, new_alloc_len - old_alloc_len);
+               session->metadata_alloc_len = new_alloc_len;
+       }
+       ret = session->metadata_len;
+       session->metadata_len += len;
+       return ret;
+}
+
+/*
+ * We have exclusive access to our metadata buffer (protected by the
+ * ust_lock), so we can do racy operations such as looking for
+ * remaining space left in packet and write, since mutual exclusion
+ * protects us from concurrent writes.
+ */
+static
+int lttng_metadata_printf(struct ust_registry_session *session,
+               const char *fmt, ...)
+{
+       char *str = NULL;
+       size_t len;
+       va_list ap;
+       ssize_t offset;
+       int ret;
+
+       va_start(ap, fmt);
+       ret = vasprintf(&str, fmt, ap);
+       va_end(ap);
+       if (ret < 0)
+               return -ENOMEM;
+
+       len = strlen(str);
+       offset = metadata_reserve(session, len);
+       if (offset < 0) {
+               ret = offset;
+               goto end;
+       }
+       memcpy(&session->metadata[offset], str, len);
+       DBG3("Append to metadata: \"%s\"", str);
+       ret = 0;
+
+end:
+       free(str);
+       return ret;
+}
+
+static
+int _lttng_field_statedump(struct ust_registry_session *session,
+               const struct ustctl_field *field)
+{
+       int ret = 0;
+       const char *bo_be = " byte_order = be;";
+       const char *bo_le = " byte_order = le;";
+       const char *bo_native = "";
+       const char *bo_reverse;
+
+       if (session->byte_order == BIG_ENDIAN)
+               bo_reverse = bo_le;
+       else
+               bo_reverse = bo_be;
+
+       switch (field->type.atype) {
+       case ustctl_atype_integer:
+               ret = lttng_metadata_printf(session,
+                       "               integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } _%s;\n",
+                       field->type.u.basic.integer.size,
+                       field->type.u.basic.integer.alignment,
+                       field->type.u.basic.integer.signedness,
+                       (field->type.u.basic.integer.encoding == ustctl_encode_none)
+                               ? "none"
+                               : (field->type.u.basic.integer.encoding == ustctl_encode_UTF8)
+                                       ? "UTF8"
+                                       : "ASCII",
+                       field->type.u.basic.integer.base,
+                       field->type.u.basic.integer.reverse_byte_order ? bo_reverse : bo_native,
+                       field->name);
+               break;
+       case ustctl_atype_float:
+               ret = lttng_metadata_printf(session,
+                       "               floating_point { exp_dig = %u; mant_dig = %u; align = %u;%s } _%s;\n",
+                       field->type.u.basic._float.exp_dig,
+                       field->type.u.basic._float.mant_dig,
+                       field->type.u.basic._float.alignment,
+                       field->type.u.basic.integer.reverse_byte_order ? bo_reverse : bo_native,
+                       field->name);
+               break;
+       case ustctl_atype_enum:
+               return -EINVAL;
+       case ustctl_atype_array:
+       {
+               const struct ustctl_basic_type *elem_type;
+
+               elem_type = &field->type.u.array.elem_type;
+               ret = lttng_metadata_printf(session,
+                       "               integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } _%s[%u];\n",
+                       elem_type->u.basic.integer.size,
+                       elem_type->u.basic.integer.alignment,
+                       elem_type->u.basic.integer.signedness,
+                       (elem_type->u.basic.integer.encoding == ustctl_encode_none)
+                               ? "none"
+                               : (elem_type->u.basic.integer.encoding == ustctl_encode_UTF8)
+                                       ? "UTF8"
+                                       : "ASCII",
+                       elem_type->u.basic.integer.base,
+                       elem_type->u.basic.integer.reverse_byte_order ? bo_reverse : bo_native,
+                       field->name, field->type.u.array.length);
+               break;
+       }
+       case ustctl_atype_sequence:
+       {
+               const struct ustctl_basic_type *elem_type;
+               const struct ustctl_basic_type *length_type;
+
+               elem_type = &field->type.u.sequence.elem_type;
+               length_type = &field->type.u.sequence.length_type;
+               ret = lttng_metadata_printf(session,
+                       "               integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } __%s_length;\n",
+                       length_type->u.basic.integer.size,
+                       (unsigned int) length_type->u.basic.integer.alignment,
+                       length_type->u.basic.integer.signedness,
+                       (length_type->u.basic.integer.encoding == ustctl_encode_none)
+                               ? "none"
+                               : ((length_type->u.basic.integer.encoding == ustctl_encode_UTF8)
+                                       ? "UTF8"
+                                       : "ASCII"),
+                       length_type->u.basic.integer.base,
+                       length_type->u.basic.integer.reverse_byte_order ? bo_reverse : bo_native,
+                       field->name);
+               if (ret)
+                       return ret;
+
+               ret = lttng_metadata_printf(session,
+                       "               integer { size = %u; align = %u; signed = %u; encoding = %s; base = %u;%s } _%s[ __%s_length ];\n",
+                       elem_type->u.basic.integer.size,
+                       (unsigned int) elem_type->u.basic.integer.alignment,
+                       elem_type->u.basic.integer.signedness,
+                       (elem_type->u.basic.integer.encoding == ustctl_encode_none)
+                               ? "none"
+                               : ((elem_type->u.basic.integer.encoding == ustctl_encode_UTF8)
+                                       ? "UTF8"
+                                       : "ASCII"),
+                       elem_type->u.basic.integer.base,
+                       elem_type->u.basic.integer.reverse_byte_order ? bo_reverse : bo_native,
+                       field->name,
+                       field->name);
+               break;
+       }
+
+       case ustctl_atype_string:
+               /* Default encoding is UTF8 */
+               ret = lttng_metadata_printf(session,
+                       "               string%s _%s;\n",
+                       field->type.u.basic.string.encoding == ustctl_encode_ASCII ?
+                               " { encoding = ASCII; }" : "",
+                       field->name);
+               break;
+       default:
+               return -EINVAL;
+       }
+       return ret;
+}
+
+static
+int _lttng_context_metadata_statedump(struct ust_registry_session *session,
+               size_t nr_ctx_fields,
+               struct ustctl_field *ctx)
+{
+       int ret = 0;
+       int i;
+
+       if (!ctx)
+               return 0;
+       for (i = 0; i < nr_ctx_fields; i++) {
+               const struct ustctl_field *field = &ctx[i];
+
+               ret = _lttng_field_statedump(session, field);
+               if (ret)
+                       return ret;
+       }
+       return ret;
+}
+
+static
+int _lttng_fields_metadata_statedump(struct ust_registry_session *session,
+               struct ust_registry_event *event)
+{
+       int ret = 0;
+       int i;
+
+       for (i = 0; i < event->nr_fields; i++) {
+               const struct ustctl_field *field = &event->fields[i];
+
+               ret = _lttng_field_statedump(session, field);
+               if (ret)
+                       return ret;
+       }
+       return ret;
+}
+
+/*
+ * Should be called with session registry mutex held.
+ */
+int ust_metadata_event_statedump(struct ust_registry_session *session,
+               struct ust_registry_channel *chan,
+               struct ust_registry_event *event)
+{
+       int ret = 0;
+
+       /* Don't dump metadata events */
+       if (chan->chan_id == -1U)
+               return 0;
+
+       ret = lttng_metadata_printf(session,
+               "event {\n"
+               "       name = \"%s\";\n"
+               "       id = %u;\n"
+               "       stream_id = %u;\n",
+               event->name,
+               event->id,
+               chan->chan_id);
+       if (ret)
+               goto end;
+
+       ret = lttng_metadata_printf(session,
+               "       loglevel = %d;\n",
+               event->loglevel);
+       if (ret)
+               goto end;
+
+       if (event->model_emf_uri) {
+               ret = lttng_metadata_printf(session,
+                       "       model.emf.uri = \"%s\";\n",
+                       event->model_emf_uri);
+               if (ret)
+                       goto end;
+       }
+
+#if 0 /* context for events not supported */
+       if (event->ctx) {
+               ret = lttng_metadata_printf(session,
+                       "       context := struct {\n");
+               if (ret)
+                       goto end;
+       }
+       ret = _lttng_context_metadata_statedump(session, event->ctx);
+       if (ret)
+               goto end;
+       if (event->ctx) {
+               ret = lttng_metadata_printf(session,
+                       "       };\n");
+               if (ret)
+                       goto end;
+       }
+#endif
+       ret = lttng_metadata_printf(session,
+               "       fields := struct {\n"
+               );
+       if (ret)
+               goto end;
+
+       ret = _lttng_fields_metadata_statedump(session, event);
+       if (ret)
+               goto end;
+
+       ret = lttng_metadata_printf(session,
+               "       };\n"
+               "};\n\n");
+       if (ret)
+               goto end;
+
+end:
+       return ret;
+}
+
+/*
+ * Should be called with session registry mutex held.
+ */
+int ust_metadata_channel_statedump(struct ust_registry_session *session,
+               struct ust_registry_channel *chan)
+{
+       int ret = 0;
+
+       /* Don't dump metadata events */
+       if (chan->chan_id == -1U)
+               return 0;
+
+       if (!chan->header_type)
+               return -EINVAL;
+
+       ret = lttng_metadata_printf(session,
+               "stream {\n"
+               "       id = %u;\n"
+               "       event.header := %s;\n"
+               "       packet.context := struct packet_context;\n",
+               chan->chan_id,
+               chan->header_type == USTCTL_CHANNEL_HEADER_COMPACT ?
+                       "struct event_header_compact" :
+                       "struct event_header_large");
+       if (ret)
+               goto end;
+
+       if (chan->ctx_fields) {
+               ret = lttng_metadata_printf(session,
+                       "       event.context := struct {\n");
+               if (ret)
+                       goto end;
+       }
+       ret = _lttng_context_metadata_statedump(session,
+               chan->nr_ctx_fields,
+               chan->ctx_fields);
+       if (ret)
+               goto end;
+       if (chan->ctx_fields) {
+               ret = lttng_metadata_printf(session,
+                       "       };\n");
+               if (ret)
+                       goto end;
+       }
+
+       ret = lttng_metadata_printf(session,
+               "};\n\n");
+
+end:
+       return ret;
+}
+
+static
+int _lttng_stream_packet_context_declare(struct ust_registry_session *session)
+{
+       return lttng_metadata_printf(session,
+               "struct packet_context {\n"
+               "       uint64_clock_monotonic_t timestamp_begin;\n"
+               "       uint64_clock_monotonic_t timestamp_end;\n"
+               "       uint64_t content_size;\n"
+               "       uint64_t packet_size;\n"
+               "       unsigned long events_discarded;\n"
+               "       uint32_t cpu_id;\n"
+               "};\n\n"
+               );
+}
+
+/*
+ * Compact header:
+ * id: range: 0 - 30.
+ * id 31 is reserved to indicate an extended header.
+ *
+ * Large header:
+ * id: range: 0 - 65534.
+ * id 65535 is reserved to indicate an extended header.
+ */
+static
+int _lttng_event_header_declare(struct ust_registry_session *session)
+{
+       return lttng_metadata_printf(session,
+       "struct event_header_compact {\n"
+       "       enum : uint5_t { compact = 0 ... 30, extended = 31 } id;\n"
+       "       variant <id> {\n"
+       "               struct {\n"
+       "                       uint27_clock_monotonic_t timestamp;\n"
+       "               } compact;\n"
+       "               struct {\n"
+       "                       uint32_t id;\n"
+       "                       uint64_clock_monotonic_t timestamp;\n"
+       "               } extended;\n"
+       "       } v;\n"
+       "} align(%u);\n"
+       "\n"
+       "struct event_header_large {\n"
+       "       enum : uint16_t { compact = 0 ... 65534, extended = 65535 } id;\n"
+       "       variant <id> {\n"
+       "               struct {\n"
+       "                       uint32_clock_monotonic_t timestamp;\n"
+       "               } compact;\n"
+       "               struct {\n"
+       "                       uint32_t id;\n"
+       "                       uint64_clock_monotonic_t timestamp;\n"
+       "               } extended;\n"
+       "       } v;\n"
+       "} align(%u);\n\n",
+       session->uint32_t_alignment,
+       session->uint16_t_alignment
+       );
+}
+
+/*
+ * Approximation of NTP time of day to clock monotonic correlation,
+ * taken at start of trace.
+ * Yes, this is only an approximation. Yes, we can (and will) do better
+ * in future versions.
+ */
+static
+uint64_t measure_clock_offset(void)
+{
+       uint64_t offset, monotonic[2], realtime;
+       struct timespec rts = { 0, 0 };
+       int ret;
+
+       monotonic[0] = trace_clock_read64();
+       ret = clock_gettime(CLOCK_REALTIME, &rts);
+       if (ret < 0)
+               return 0;
+       monotonic[1] = trace_clock_read64();
+       offset = (monotonic[0] + monotonic[1]) >> 1;
+       realtime = (uint64_t) rts.tv_sec * 1000000000ULL;
+       realtime += rts.tv_nsec;
+       offset = realtime - offset;
+       return offset;
+}
+
+
+/*
+ * Should be called with session registry mutex held.
+ */
+int ust_metadata_session_statedump(struct ust_registry_session *session,
+               struct ust_app *app)
+{
+       unsigned char *uuid_c;
+       char uuid_s[UUID_STR_LEN],
+               clock_uuid_s[UUID_STR_LEN];
+       int ret = 0;
+       char hostname[HOST_NAME_MAX];
+
+       uuid_c = session->uuid;
+
+       snprintf(uuid_s, sizeof(uuid_s),
+               "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x",
+               uuid_c[0], uuid_c[1], uuid_c[2], uuid_c[3],
+               uuid_c[4], uuid_c[5], uuid_c[6], uuid_c[7],
+               uuid_c[8], uuid_c[9], uuid_c[10], uuid_c[11],
+               uuid_c[12], uuid_c[13], uuid_c[14], uuid_c[15]);
+
+       ret = lttng_metadata_printf(session,
+               "typealias integer { size = 8; align = %u; signed = false; } := uint8_t;\n"
+               "typealias integer { size = 16; align = %u; signed = false; } := uint16_t;\n"
+               "typealias integer { size = 32; align = %u; signed = false; } := uint32_t;\n"
+               "typealias integer { size = 64; align = %u; signed = false; } := uint64_t;\n"
+               "typealias integer { size = %u; align = %u; signed = false; } := unsigned long;\n"
+               "typealias integer { size = 5; align = 1; signed = false; } := uint5_t;\n"
+               "typealias integer { size = 27; align = 1; signed = false; } := uint27_t;\n"
+               "\n"
+               "trace {\n"
+               "       major = %u;\n"
+               "       minor = %u;\n"
+               "       uuid = \"%s\";\n"
+               "       byte_order = %s;\n"
+               "       packet.header := struct {\n"
+               "               uint32_t magic;\n"
+               "               uint8_t  uuid[16];\n"
+               "               uint32_t stream_id;\n"
+               "       };\n"
+               "};\n\n",
+               session->uint8_t_alignment,
+               session->uint16_t_alignment,
+               session->uint32_t_alignment,
+               session->uint64_t_alignment,
+               session->bits_per_long,
+               session->long_alignment,
+               CTF_SPEC_MAJOR,
+               CTF_SPEC_MINOR,
+               uuid_s,
+               session->byte_order == BIG_ENDIAN ? "be" : "le"
+               );
+       if (ret)
+               goto end;
+
+       /* ignore error, just use empty string if error. */
+       hostname[0] = '\0';
+       ret = gethostname(hostname, sizeof(hostname));
+       if (ret && errno == ENAMETOOLONG)
+               hostname[HOST_NAME_MAX - 1] = '\0';
+       ret = lttng_metadata_printf(session,
+               "env {\n"
+               "       hostname = \"%s\";\n"
+               "       domain = \"ust\";\n"
+               "       tracer_name = \"lttng-ust\";\n"
+               "       tracer_major = %u;\n"
+               "       tracer_minor = %u;\n"
+               "       tracer_patchlevel = %u;\n",
+               hostname,
+               app->version.major,
+               app->version.minor,
+               app->version.patchlevel
+               );
+       if (ret)
+               goto end;
+
+       /*
+        * If per-application registry, we can output extra information
+        * about the application.
+        */
+       if (app) {
+               ret = lttng_metadata_printf(session,
+                       "       vpid = %d;\n"
+                       "       procname = \"%s\";\n",
+                       (int) app->pid,
+                       app->name
+                       );
+               if (ret)
+                       goto end;
+       }
+
+       ret = lttng_metadata_printf(session,
+               "};\n\n"
+               );
+       if (ret)
+               goto end;
+
+
+       ret = lttng_metadata_printf(session,
+               "clock {\n"
+               "       name = %s;\n",
+               "monotonic"
+               );
+       if (ret)
+               goto end;
+
+       if (!trace_clock_uuid(clock_uuid_s)) {
+               ret = lttng_metadata_printf(session,
+                       "       uuid = \"%s\";\n",
+                       clock_uuid_s
+                       );
+               if (ret)
+                       goto end;
+       }
+
+       ret = lttng_metadata_printf(session,
+               "       description = \"Monotonic Clock\";\n"
+               "       freq = %" PRIu64 "; /* Frequency, in Hz */\n"
+               "       /* clock value offset from Epoch is: offset * (1/freq) */\n"
+               "       offset = %" PRIu64 ";\n"
+               "};\n\n",
+               trace_clock_freq(),
+               measure_clock_offset()
+               );
+       if (ret)
+               goto end;
+
+       ret = lttng_metadata_printf(session,
+               "typealias integer {\n"
+               "       size = 27; align = 1; signed = false;\n"
+               "       map = clock.monotonic.value;\n"
+               "} := uint27_clock_monotonic_t;\n"
+               "\n"
+               "typealias integer {\n"
+               "       size = 32; align = %u; signed = false;\n"
+               "       map = clock.monotonic.value;\n"
+               "} := uint32_clock_monotonic_t;\n"
+               "\n"
+               "typealias integer {\n"
+               "       size = 64; align = %u; signed = false;\n"
+               "       map = clock.monotonic.value;\n"
+               "} := uint64_clock_monotonic_t;\n\n",
+               session->uint32_t_alignment,
+               session->uint64_t_alignment
+               );
+       if (ret)
+               goto end;
+
+       ret = _lttng_stream_packet_context_declare(session);
+       if (ret)
+               goto end;
+
+       ret = _lttng_event_header_declare(session);
+       if (ret)
+               goto end;
+
+end:
+       return ret;
+}
diff --git a/src/bin/lttng-sessiond/ust-registry.c b/src/bin/lttng-sessiond/ust-registry.c
new file mode 100644 (file)
index 0000000..31d33df
--- /dev/null
@@ -0,0 +1,372 @@
+/*
+ * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#define _GNU_SOURCE
+#include <assert.h>
+
+#include <common/common.h>
+#include "ust-registry.h"
+
+/*
+ * Hash table match function for event in the registry.
+ */
+static int ht_match_event(struct cds_lfht_node *node, const void *_key)
+{
+       struct ust_registry_event *event;
+       const struct ust_registry_event *key;
+
+       assert(node);
+       assert(_key);
+
+       event = caa_container_of(node, struct ust_registry_event, node.node);
+       assert(event);
+       key = _key;
+
+       /* It has to be a perfect match. */
+       if (strncmp(event->name, key->name, sizeof(event->name)) != 0) {
+               goto no_match;
+       }
+
+       /* It has to be a perfect match. */
+       if (strncmp(event->signature, key->signature,
+                               strlen(event->signature) != 0)) {
+               goto no_match;
+       }
+
+       /* Match */
+       return 1;
+
+no_match:
+       return 0;
+}
+
+/*
+ * Allocate event and initialize it. This does NOT set a valid event id from a
+ * registry.
+ */
+static struct ust_registry_event *alloc_event(int session_objd,
+               int channel_objd, char *name, char *sig, size_t nr_fields,
+               struct ustctl_field *fields, int loglevel, char *model_emf_uri)
+{
+       struct ust_registry_event *event = NULL;
+
+       event = zmalloc(sizeof(*event));
+       if (!event) {
+               PERROR("zmalloc ust registry event");
+               goto error;
+       }
+
+       event->session_objd = session_objd;
+       event->channel_objd = channel_objd;
+       /* Allocated by ustctl. */
+       event->signature = sig;
+       event->nr_fields = nr_fields;
+       event->fields = fields;
+       event->loglevel = loglevel;
+       event->model_emf_uri = model_emf_uri;
+       if (name) {
+               /* Copy event name and force NULL byte. */
+               strncpy(event->name, name, sizeof(event->name));
+               event->name[sizeof(event->name) - 1] = '\0';
+       }
+       lttng_ht_node_init_str(&event->node, event->name);
+
+error:
+       return event;
+}
+
+/*
+ * Free event data structure. This does NOT delete it from any hash table. It's
+ * safe to pass a NULL pointer. This shoudl be called inside a call RCU if the
+ * event is previously deleted from a rcu hash table.
+ */
+static void destroy_event(struct ust_registry_event *event)
+{
+       if (!event) {
+               return;
+       }
+
+       free(event->fields);
+       free(event->model_emf_uri);
+       free(event->signature);
+       free(event);
+}
+
+/*
+ * Destroy event function call of the call RCU.
+ */
+static void destroy_event_rcu(struct rcu_head *head)
+{
+       struct lttng_ht_node_str *node =
+               caa_container_of(head, struct lttng_ht_node_str, head);
+       struct ust_registry_event *event =
+               caa_container_of(node, struct ust_registry_event, node);
+
+       destroy_event(event);
+}
+
+/*
+ * Find an event using the name and signature in the given registry. RCU read
+ * side lock MUST be acquired before calling this function and as long as the
+ * event reference is kept by the caller.
+ *
+ * On success, the event pointer is returned else NULL.
+ */
+struct ust_registry_event *ust_registry_find_event(
+               struct ust_registry_channel *chan, char *name, char *sig)
+{
+       struct lttng_ht_node_str *node;
+       struct lttng_ht_iter iter;
+       struct ust_registry_event *event = NULL;
+       struct ust_registry_event key;
+
+       assert(chan);
+       assert(name);
+       assert(sig);
+
+       /* Setup key for the match function. */
+       strncpy(key.name, name, sizeof(key.name));
+       key.name[sizeof(key.name) - 1] = '\0';
+       key.signature = sig;
+
+       cds_lfht_lookup(chan->ht->ht, chan->ht->hash_fct(name, lttng_ht_seed),
+                       chan->ht->match_fct, &key, &iter.iter);
+       node = lttng_ht_iter_get_node_str(&iter);
+       if (!node) {
+               goto end;
+       }
+       event = caa_container_of(node, struct ust_registry_event, node);
+
+end:
+       return event;
+}
+
+/*
+ * Create a ust_registry_event from the given parameters and add it to the
+ * registry hash table. If event_id is valid, it is set with the newly created
+ * event id.
+ *
+ * On success, return 0 else a negative value. The created event MUST be unique
+ * so on duplicate entry -EINVAL is returned. On error, event_id is untouched.
+ *
+ * Should be called with session registry mutex held.
+ */
+int ust_registry_create_event(struct ust_registry_session *session,
+               struct ust_registry_channel *chan,
+               int session_objd, int channel_objd, char *name, char *sig,
+               size_t nr_fields, struct ustctl_field *fields, int loglevel,
+               char *model_emf_uri, uint32_t *event_id)
+{
+       int ret;
+       struct cds_lfht_node *nptr;
+       struct ust_registry_event *event = NULL;
+
+       assert(session);
+       assert(chan);
+       assert(name);
+       assert(sig);
+
+       /*
+        * This should not happen but since it comes from the UST tracer, an
+        * external party, don't assert and simply validate values.
+        */
+       if (session_objd < 0 || channel_objd < 0) {
+               ret = -EINVAL;
+               goto error;
+       }
+
+       /* Check if we've reached the maximum possible id. */
+       if (ust_registry_is_max_id(chan->used_event_id)) {
+               ret = -ENOENT;
+               goto error;
+       }
+
+       event = alloc_event(session_objd, channel_objd, name, sig, nr_fields,
+                       fields, loglevel, model_emf_uri);
+       if (!event) {
+               ret = -ENOMEM;
+               goto error;
+       }
+
+       event->id = ust_registry_get_next_event_id(chan);
+
+       DBG3("UST registry creating event with event: %s, sig: %s, id: %u, "
+                       "chan_objd: %u, sess_objd: %u", event->name, event->signature,
+                       event->id, event->channel_objd, event->session_objd);
+
+       rcu_read_lock();
+       /*
+        * This is an add unique with a custom match function for event. The node
+        * are matched using the event name and signature.
+        */
+       nptr = cds_lfht_add_unique(chan->ht->ht, chan->ht->hash_fct(event->node.key,
+                               lttng_ht_seed), chan->ht->match_fct, event, &event->node.node);
+       if (nptr != &event->node.node) {
+               ERR("UST registry create event add unique failed for event: %s, "
+                               "sig: %s, id: %u, chan_objd: %u, sess_objd: %u", event->name,
+                               event->signature, event->id, event->channel_objd,
+                               event->session_objd);
+               ret = -EINVAL;
+               goto error_unlock;
+       }
+
+       /* Set event id if user wants it. */
+       if (event_id) {
+               *event_id = event->id;
+       }
+       rcu_read_unlock();
+
+       /* Append to metadata */
+       ret = ust_metadata_event_statedump(session, chan, event);
+       if (ret) {
+               ERR("Error appending event metadata (errno = %d)", ret);
+               return ret;
+       }
+
+       return 0;
+
+error_unlock:
+       rcu_read_unlock();
+error:
+       destroy_event(event);
+       return ret;
+}
+
+/*
+ * For a given event in a registry, delete the entry and destroy the event.
+ * This MUST be called within a RCU read side lock section.
+ */
+void ust_registry_destroy_event(struct ust_registry_channel *chan,
+               struct ust_registry_event *event)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+
+       assert(chan);
+       assert(event);
+
+       /* Delete the node first. */
+       iter.iter.node = &event->node.node;
+       ret = lttng_ht_del(chan->ht, &iter);
+       assert(!ret);
+
+       call_rcu(&event->node.head, destroy_event_rcu);
+
+       return;
+}
+
+/*
+ * Initialize registry with default values.
+ */
+void ust_registry_channel_init(struct ust_registry_session *session,
+               struct ust_registry_channel *chan)
+{
+       assert(chan);
+
+       memset(chan, 0, sizeof(struct ust_registry_channel));
+
+       chan->ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
+       assert(chan->ht);
+
+       /* Set custom match function. */
+       chan->ht->match_fct = ht_match_event;
+}
+
+/*
+ * Destroy every element of the registry and free the memory. This does NOT
+ * free the registry pointer since it might not have been allocated before so
+ * it's the caller responsability.
+ *
+ * This MUST be called within a RCU read side lock section.
+ */
+void ust_registry_channel_destroy(struct ust_registry_session *session,
+               struct ust_registry_channel *chan)
+{
+       struct lttng_ht_iter iter;
+       struct ust_registry_event *event;
+
+       assert(chan);
+
+       /* Destroy all event associated with this registry. */
+       cds_lfht_for_each_entry(chan->ht->ht, &iter.iter, event, node.node) {
+               /* Delete the node from the ht and free it. */
+               ust_registry_destroy_event(chan, event);
+       }
+       lttng_ht_destroy(chan->ht);
+}
+
+/*
+ * Initialize registry with default values.
+ */
+int ust_registry_session_init(struct ust_registry_session *session,
+               struct ust_app *app,
+               uint32_t bits_per_long,
+               uint32_t uint8_t_alignment,
+               uint32_t uint16_t_alignment,
+               uint32_t uint32_t_alignment,
+               uint32_t uint64_t_alignment,
+               uint32_t long_alignment,
+               int byte_order)
+{
+       int ret;
+
+       assert(session);
+
+       memset(session, 0, sizeof(struct ust_registry_session));
+
+       pthread_mutex_init(&session->lock, NULL);
+       session->bits_per_long = bits_per_long;
+       session->uint8_t_alignment = uint8_t_alignment;
+       session->uint16_t_alignment = uint16_t_alignment;
+       session->uint32_t_alignment = uint32_t_alignment;
+       session->uint64_t_alignment = uint64_t_alignment;
+       session->long_alignment = long_alignment;
+       session->byte_order = byte_order;
+
+       ret = lttng_uuid_generate(session->uuid);
+       if (ret) {
+               ERR("Failed to generate UST uuid (errno = %d)", ret);
+               goto error;
+       }
+
+       pthread_mutex_lock(&session->lock);
+       ret = ust_metadata_session_statedump(session, app);
+       pthread_mutex_unlock(&session->lock);
+       if (ret) {
+               ERR("Failed to generate session metadata (errno = %d)", ret);
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return -1;
+}
+
+/*
+ * Destroy session registry. This does NOT free the given pointer since it
+ * might get passed as a reference. The registry lock should NOT be acquired.
+ */
+void ust_registry_session_destroy(struct ust_registry_session *reg)
+{
+       int ret;
+
+       /* On error, EBUSY can be returned if lock. Code flow error. */
+       ret = pthread_mutex_destroy(&reg->lock);
+       assert(!ret);
+
+       free(reg->metadata);
+}
diff --git a/src/bin/lttng-sessiond/ust-registry.h b/src/bin/lttng-sessiond/ust-registry.h
new file mode 100644 (file)
index 0000000..5efa082
--- /dev/null
@@ -0,0 +1,207 @@
+/*
+ * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef LTTNG_UST_REGISTRY_H
+#define LTTNG_UST_REGISTRY_H
+
+#include <pthread.h>
+#include <stdint.h>
+#include <lttng/ust-ctl.h>
+
+#include <common/hashtable/hashtable.h>
+#include <common/compat/uuid.h>
+
+#define CTF_SPEC_MAJOR 1
+#define CTF_SPEC_MINOR 8
+
+struct ust_app;
+
+struct ust_registry_session {
+       /*
+        * With multiple writers and readers, use this lock to access
+        * the registry. Use defined macros above to lock it.
+        * Can nest within the ust app session lock.
+        */
+       pthread_mutex_t lock;
+       /* Next channel ID available for a newly registered channel. */
+       uint32_t next_channel_id;
+       /* Once this value reaches UINT32_MAX, no more id can be allocated. */
+       uint32_t used_channel_id;
+       /* Universal unique identifier used by the tracer. */
+       unsigned char uuid[UUID_LEN];
+
+       /* session ABI description */
+
+       /* Size of long, in bits */
+       unsigned int bits_per_long;
+       /* Alignment, in bits */
+       unsigned int uint8_t_alignment,
+               uint16_t_alignment,
+               uint32_t_alignment,
+               uint64_t_alignment,
+               long_alignment;
+       /* endianness */
+       int byte_order; /* BIG_ENDIAN or LITTLE_ENDIAN */
+
+       /* Generated metadata. */
+       char *metadata;         /* NOT null-terminated ! Use memcpy. */
+       size_t metadata_len, metadata_alloc_len;
+       /* Length of bytes sent to the consumer. */
+       size_t metadata_len_sent;
+};
+
+struct ust_registry_channel {
+       /* Id set when replying to a register channel. */
+       uint32_t chan_id;
+       enum ustctl_channel_header header_type;
+
+       /*
+        * Hash table containing events sent by the UST tracer. MUST be accessed
+        * with a RCU read side lock acquired.
+        */
+       struct lttng_ht *ht;
+       /* Next event ID available for a newly registered event. */
+       uint32_t next_event_id;
+       /* Once this value reaches UINT32_MAX, no more id can be allocated. */
+       uint32_t used_event_id;
+       /*
+        * Context fields of the registry. Context are per channel. Allocated by a
+        * register channel notification from the UST tracer.
+        */
+       size_t nr_ctx_fields;
+       struct ustctl_field *ctx_fields;
+};
+
+/*
+ * Event registered from a UST tracer sent to the session daemon. This is
+ * indexed and matched by <event_name/signature>.
+ */
+struct ust_registry_event {
+       int id;
+       /* Both objd are set by the tracer. */
+       int session_objd;
+       int channel_objd;
+       /* Name of the event returned by the tracer. */
+       char name[LTTNG_UST_SYM_NAME_LEN];
+       char *signature;
+       int loglevel;
+       size_t nr_fields;
+       struct ustctl_field *fields;
+       char *model_emf_uri;
+       /*
+        * Node in the ust-registry hash table. The event name is used to
+        * initialize the node and the event_name/signature for the match function.
+        */
+       struct lttng_ht_node_str node;
+};
+
+/*
+ * Validate that the id has reached the maximum allowed or not.
+ *
+ * Return 0 if NOT else 1.
+ */
+static inline int ust_registry_is_max_id(uint32_t id)
+{
+       return (id == UINT32_MAX) ? 1 : 0;
+}
+
+/*
+ * Return next available event id and increment the used counter. The
+ * ust_registry_is_max_id function MUST be called before in order to validate
+ * if the maximum number of IDs have been reached. If not, it is safe to call
+ * this function.
+ *
+ * Return a unique channel ID. If max is reached, the used_event_id counter is
+ * returned.
+ */
+static inline uint32_t ust_registry_get_next_event_id(
+               struct ust_registry_channel *r)
+{
+       if (ust_registry_is_max_id(r->used_event_id)) {
+               return r->used_event_id;
+       }
+
+       r->used_event_id++;
+       return r->next_event_id++;
+}
+
+/*
+ * Return next available channel id and increment the used counter. The
+ * ust_registry_is_max_id function MUST be called before in order to validate
+ * if the maximum number of IDs have been reached. If not, it is safe to call
+ * this function.
+ *
+ * Return a unique channel ID. If max is reached, the used_channel_id counter
+ * is returned.
+ */
+static inline uint32_t ust_registry_get_next_chan_id(
+               struct ust_registry_session *r)
+{
+       if (ust_registry_is_max_id(r->used_channel_id)) {
+               return r->used_channel_id;
+       }
+
+       r->used_channel_id++;
+       return r->next_channel_id++;
+}
+
+/*
+ * Return registry event count. This is read atomically.
+ */
+static inline uint32_t ust_registry_get_event_count(
+               struct ust_registry_channel *r)
+{
+       return (uint32_t) uatomic_read(&r->used_event_id);
+}
+
+void ust_registry_channel_init(struct ust_registry_session *session,
+               struct ust_registry_channel *chan);
+void ust_registry_channel_destroy(struct ust_registry_session *session,
+               struct ust_registry_channel *chan);
+
+int ust_registry_session_init(struct ust_registry_session *session,
+               struct ust_app *app,
+               uint32_t bits_per_long,
+               uint32_t uint8_t_alignment,
+               uint32_t uint16_t_alignment,
+               uint32_t uint32_t_alignment,
+               uint32_t uint64_t_alignment,
+               uint32_t long_alignment,
+               int byte_order);
+
+void ust_registry_session_destroy(struct ust_registry_session *session);
+
+int ust_registry_create_event(struct ust_registry_session *session,
+               struct ust_registry_channel *channel,
+               int session_objd, int channel_objd, char *name, char *sig,
+               size_t nr_fields, struct ustctl_field *fields, int loglevel,
+               char *model_emf_uri, uint32_t *event_id);
+struct ust_registry_event *ust_registry_find_event(
+               struct ust_registry_channel *chan, char *name, char *sig);
+void ust_registry_destroy_event(struct ust_registry_channel *chan,
+               struct ust_registry_event *event);
+
+/* app can be NULL for registry shared across applications. */
+int ust_metadata_session_statedump(struct ust_registry_session *session,
+               struct ust_app *app);
+int ust_metadata_channel_statedump(struct ust_registry_session *session,
+               struct ust_registry_channel *chan);
+int ust_metadata_event_statedump(struct ust_registry_session *session,
+               struct ust_registry_channel *chan,
+               struct ust_registry_event *event);
+
+#endif /* LTTNG_UST_REGISTRY_H */
diff --git a/src/bin/lttng-sessiond/ust-thread.c b/src/bin/lttng-sessiond/ust-thread.c
new file mode 100644 (file)
index 0000000..552b7dd
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#define _GNU_SOURCE
+#include <assert.h>
+
+#include <common/common.h>
+#include <common/utils.h>
+
+#include "fd-limit.h"
+#include "lttng-sessiond.h"
+#include "ust-thread.h"
+
+/*
+ * This thread manage application notify communication.
+ */
+void *ust_thread_manage_notify(void *data)
+{
+       int i, ret, pollfd;
+       uint32_t revents, nb_fd;
+       struct lttng_poll_event events;
+
+       DBG("[ust-thread] Manage application notify command");
+
+       rcu_register_thread();
+       rcu_thread_online();
+
+       ret = sessiond_set_thread_pollset(&events, 2);
+       if (ret < 0) {
+               goto error_poll_create;
+       }
+
+       /* Add notify pipe to the pollset. */
+       ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0], LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               goto error;
+       }
+
+       while (1) {
+               DBG3("[ust-thread] Manage notify polling on %d fds",
+                               LTTNG_POLL_GETNB(&events));
+
+               /* Inifinite blocking call, waiting for transmission */
+restart:
+               ret = lttng_poll_wait(&events, -1);
+               if (ret < 0) {
+                       /*
+                        * Restart interrupted system call.
+                        */
+                       if (errno == EINTR) {
+                               goto restart;
+                       }
+                       goto error;
+               }
+
+               nb_fd = ret;
+
+               for (i = 0; i < nb_fd; i++) {
+                       /* Fetch once the poll data */
+                       revents = LTTNG_POLL_GETEV(&events, i);
+                       pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       /* Thread quit pipe has been closed. Killing thread. */
+                       ret = sessiond_check_thread_quit_pipe(pollfd, revents);
+                       if (ret) {
+                               goto exit;
+                       }
+
+                       /* Inspect the apps cmd pipe */
+                       if (pollfd == apps_cmd_notify_pipe[0]) {
+                               int sock;
+
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Apps notify command pipe error");
+                                       goto error;
+                               } else if (!(revents & LPOLLIN)) {
+                                       /* No POLLIN and not a catched error, stop the thread. */
+                                       ERR("Notify command pipe failed. revent: %u", revents);
+                                       goto error;
+                               }
+
+                               do {
+                                       /* Get socket from dispatch thread. */
+                                       ret = read(apps_cmd_notify_pipe[0], &sock, sizeof(sock));
+                               } while (ret < 0 && errno == EINTR);
+                               if (ret < 0 || ret < sizeof(sock)) {
+                                       PERROR("read apps notify pipe");
+                                       goto error;
+                               }
+
+                               ret = lttng_poll_add(&events, sock,
+                                               LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
+                               if (ret < 0) {
+                                       /*
+                                        * It's possible we've reached the max poll fd allowed.
+                                        * Let's close the socket but continue normal execution.
+                                        */
+                                       ret = close(sock);
+                                       if (ret) {
+                                               PERROR("close notify socket %d", sock);
+                                       }
+                                       lttng_fd_put(LTTNG_FD_APPS, 1);
+                                       continue;
+                               }
+                               DBG3("UST thread notify added sock %d to pollset", sock);
+                       } else {
+                               /*
+                                * At this point, we know that a registered application
+                                * triggered the event.
+                                */
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       /* Removing from the poll set */
+                                       ret = lttng_poll_del(&events, pollfd);
+                                       if (ret < 0) {
+                                               goto error;
+                                       }
+
+                                       /* The socket is closed after a grace period here. */
+                                       ust_app_notify_sock_unregister(pollfd);
+                               } else if (revents & (LPOLLIN | LPOLLPRI)) {
+                                       ret = ust_app_recv_notify(pollfd);
+                                       if (ret < 0) {
+                                               /*
+                                                * If the notification failed either the application is
+                                                * dead or an internal error happened. In both cases,
+                                                * we can only continue here. If the application is
+                                                * dead, an unregistration will follow or else the
+                                                * application will notice that we are not responding
+                                                * on that socket and will close it.
+                                                */
+                                               continue;
+                                       }
+                               } else {
+                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
+                                       continue;
+                               }
+                       }
+               }
+       }
+
+exit:
+error:
+       lttng_poll_clean(&events);
+error_poll_create:
+       utils_close_pipe(apps_cmd_notify_pipe);
+       apps_cmd_notify_pipe[0] = apps_cmd_notify_pipe[1] = -1;
+       DBG("Application notify communication apps thread cleanup complete");
+       rcu_thread_offline();
+       rcu_unregister_thread();
+       return NULL;
+}
diff --git a/src/bin/lttng-sessiond/ust-thread.h b/src/bin/lttng-sessiond/ust-thread.h
new file mode 100644 (file)
index 0000000..0292df9
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef UST_THREAD_H
+#define UST_THREAD_H
+
+#ifdef HAVE_LIBLTTNG_UST_CTL
+
+void *ust_thread_manage_notify(void *data);
+
+#else /* HAVE_LIBLTTNG_UST_CTL */
+
+void *ust_thread_manage_notify(void *data)
+{
+       return NULL;
+}
+
+#endif /* HAVE_LIBLTTNG_UST_CTL */
+
+#endif /* UST_THREAD_H */
index 5bf8beb406f0b018dd40bb25959c7b8018f610c6..35faf538875053ccc240ceb2253230d39cdf695e 100644 (file)
  * Includes final \0.
  */
 #define UUID_STR_LEN           37
+#define UUID_LEN               16
 
 #ifdef LTTNG_HAVE_LIBUUID
 #include <uuid/uuid.h>
 
+/*
+ * uuid_out is of len UUID_LEN.
+ */
 static inline
 int lttng_uuid_generate(unsigned char *uuid_out)
 {
@@ -43,6 +47,9 @@ int lttng_uuid_generate(unsigned char *uuid_out)
 #include <uuid.h>
 #include <stdint.h>
 
+/*
+ * uuid_out is of len UUID_LEN.
+ */
 static inline
 int lttng_uuid_generate(unsigned char *uuid_out)
 {
index 09b3bee330457d46483a85d1f2f5402e25241b9e..300fd2a2fc896108ac60c43ce920a5f633be111b 100644 (file)
@@ -47,6 +47,16 @@ struct lttng_consumer_global_data consumer_data = {
        .type = LTTNG_CONSUMER_UNKNOWN,
 };
 
+enum consumer_channel_action {
+       CONSUMER_CHANNEL_ADD,
+       CONSUMER_CHANNEL_QUIT,
+};
+
+struct consumer_channel_msg {
+       enum consumer_channel_action action;
+       struct lttng_consumer_channel *chan;
+};
+
 /*
  * Flag to inform the polling thread to quit when all fd hung up. Updated by
  * the consumer_thread_receive_fds when it notices that all fds has hung up.
@@ -78,28 +88,59 @@ static void notify_thread_pipe(int wpipe)
        } while (ret < 0 && errno == EINTR);
 }
 
+static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_channel *chan,
+               enum consumer_channel_action action)
+{
+       struct consumer_channel_msg msg;
+       int ret;
+
+       msg.action = action;
+       msg.chan = chan;
+       do {
+               ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
+       } while (ret < 0 && errno == EINTR);
+}
+
+static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_channel **chan,
+               enum consumer_channel_action *action)
+{
+       struct consumer_channel_msg msg;
+       int ret;
+
+       do {
+               ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
+       } while (ret < 0 && errno == EINTR);
+       if (ret > 0) {
+               *action = msg.action;
+               *chan = msg.chan;
+       }
+       return ret;
+}
+
 /*
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
  */
-static struct lttng_consumer_stream *find_stream(int key,
+static struct lttng_consumer_stream *find_stream(uint64_t key,
                struct lttng_ht *ht)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_consumer_stream *stream = NULL;
 
        assert(ht);
 
-       /* Negative keys are lookup failures */
-       if (key < 0) {
+       /* -1ULL keys are lookup failures */
+       if (key == (uint64_t) -1ULL) {
                return NULL;
        }
 
        rcu_read_lock();
 
-       lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       lttng_ht_lookup(ht, &key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                stream = caa_container_of(node, struct lttng_consumer_stream, node);
        }
@@ -116,13 +157,13 @@ static void steal_stream_key(int key, struct lttng_ht *ht)
        rcu_read_lock();
        stream = find_stream(key, ht);
        if (stream) {
-               stream->key = -1;
+               stream->key = -1ULL;
                /*
                 * We don't want the lookup to match, but we still need
                 * to iterate on this stream when iterating over the hash table. Just
                 * change the node key.
                 */
-               stream->node.key = -1;
+               stream->node.key = -1ULL;
        }
        rcu_read_unlock();
 }
@@ -133,19 +174,19 @@ static void steal_stream_key(int key, struct lttng_ht *ht)
  * RCU read side lock MUST be acquired before calling this function and
  * protects the channel ptr.
  */
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_consumer_channel *channel = NULL;
 
-       /* Negative keys are lookup failures */
-       if (key < 0) {
+       /* -1ULL keys are lookup failures */
+       if (key == (uint64_t) -1ULL) {
                return NULL;
        }
 
-       lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       lttng_ht_lookup(consumer_data.channel_ht, &key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                channel = caa_container_of(node, struct lttng_consumer_channel, node);
        }
@@ -155,8 +196,8 @@ struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
 
 static void free_stream_rcu(struct rcu_head *head)
 {
-       struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_ht_node_u64 *node =
+               caa_container_of(head, struct lttng_ht_node_u64, head);
        struct lttng_consumer_stream *stream =
                caa_container_of(node, struct lttng_consumer_stream, node);
 
@@ -165,8 +206,8 @@ static void free_stream_rcu(struct rcu_head *head)
 
 static void free_channel_rcu(struct rcu_head *head)
 {
-       struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_ht_node_u64 *node =
+               caa_container_of(head, struct lttng_ht_node_u64, head);
        struct lttng_consumer_channel *channel =
                caa_container_of(node, struct lttng_consumer_channel, node);
 
@@ -178,8 +219,8 @@ static void free_channel_rcu(struct rcu_head *head)
  */
 static void free_relayd_rcu(struct rcu_head *head)
 {
-       struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_ht_node_u64 *node =
+               caa_container_of(head, struct lttng_ht_node_u64, head);
        struct consumer_relayd_sock_pair *relayd =
                caa_container_of(node, struct consumer_relayd_sock_pair, node);
 
@@ -233,7 +274,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        int ret;
        struct lttng_ht_iter iter;
 
-       DBG("Consumer delete channel key %d", channel->key);
+       DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
 
@@ -425,7 +466,10 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        ret = lttng_ht_del(ht, &iter);
        assert(!ret);
 
-       /* Remove node session id from the consumer_data stream ht */
+       iter.iter.node = &stream->node_channel_id.node;
+       ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
+       assert(!ret);
+
        iter.iter.node = &stream->node_session_id.node;
        ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
        assert(!ret);
@@ -490,8 +534,8 @@ free_stream_rcu:
        call_rcu(&stream->node.head, free_stream_rcu);
 }
 
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
-               int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+               uint64_t stream_key,
                enum lttng_consumer_stream_state state,
                const char *channel_name,
                uid_t uid,
@@ -540,13 +584,16 @@ struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
        }
 
        /* Key is always the wait_fd for streams. */
-       lttng_ht_node_init_ulong(&stream->node, stream->key);
+       lttng_ht_node_init_u64(&stream->node, stream->key);
+
+       /* Init node per channel id key */
+       lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
 
        /* Init session id node with the stream session id */
-       lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+       lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
 
-       DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64,
-                       stream->name, stream->key, stream->net_seq_idx, stream->session_id);
+       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64,
+                       stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id);
 
        rcu_read_unlock();
        return stream;
@@ -573,7 +620,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
        assert(stream);
        assert(ht);
 
-       DBG3("Adding consumer stream %d", stream->key);
+       DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->lock);
@@ -582,14 +629,17 @@ static int add_stream(struct lttng_consumer_stream *stream,
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
 
-       lttng_ht_add_unique_ulong(ht, &stream->node);
+       lttng_ht_add_unique_u64(ht, &stream->node);
+
+       lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
+                       &stream->node_channel_id);
 
        /*
         * Add stream to the stream_list_ht of the consumer data. No need to steal
         * the key since the HT does not use it and we allow to add redundant keys
         * into this table.
         */
-       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+       lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
        /* Check and cleanup relayd */
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -629,18 +679,18 @@ static int add_stream(struct lttng_consumer_stream *stream,
 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret = 0;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
        assert(relayd);
 
        lttng_ht_lookup(consumer_data.relayd_ht,
-                       (void *)((unsigned long) relayd->net_seq_idx), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+                       &relayd->net_seq_idx, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                goto end;
        }
-       lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
+       lttng_ht_add_unique_u64(consumer_data.relayd_ht, &relayd->node);
 
 end:
        return ret;
@@ -668,7 +718,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
        obj->net_seq_idx = net_seq_idx;
        obj->refcount = 0;
        obj->destroy_flag = 0;
-       lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
+       lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
        pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
 
 error:
@@ -682,20 +732,20 @@ error:
  * RCU read-side lock must be held across this call and while using the
  * returned object.
  */
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
 {
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        /* Negative keys are lookup failures */
-       if (key < 0) {
+       if (key == (uint64_t) -1ULL) {
                goto error;
        }
 
-       lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
+       lttng_ht_lookup(consumer_data.relayd_ht, &key,
                        &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
        }
@@ -801,10 +851,13 @@ struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
        strncpy(channel->name, name, sizeof(channel->name));
        channel->name[sizeof(channel->name) - 1] = '\0';
 
-       lttng_ht_node_init_ulong(&channel->node, channel->key);
+       lttng_ht_node_init_u64(&channel->node, channel->key);
+
+       channel->wait_fd = -1;
+
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
-       DBG("Allocated channel (key %d)", channel->key)
+       DBG("Allocated channel (key %" PRIu64 ")", channel->key)
 
 end:
        return channel;
@@ -813,31 +866,37 @@ end:
 /*
  * Add a channel to the global list protected by a mutex.
  */
-int consumer_add_channel(struct lttng_consumer_channel *channel)
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+               struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht,
-                       (void *)((unsigned long) channel->key), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+                       &channel->key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                /* Channel already exist. Ignore the insertion */
-               ERR("Consumer add channel key %d already exists!", channel->key);
+               ERR("Consumer add channel key %" PRIu64 " already exists!",
+                       channel->key);
                ret = -1;
                goto end;
        }
 
-       lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+       lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
 
 end:
        rcu_read_unlock();
        pthread_mutex_unlock(&consumer_data.lock);
 
+       if (!ret && channel->wait_fd != -1 &&
+                       channel->metadata_stream == NULL) {
+               notify_channel_pipe(ctx, channel, CONSUMER_CHANNEL_ADD);
+       }
        return ret;
 }
 
@@ -978,6 +1037,8 @@ void lttng_consumer_cleanup(void)
 
        cleanup_relayd_ht();
 
+       lttng_ht_destroy(consumer_data.stream_per_chan_id_ht);
+
        /*
         * This HT contains streams that are freed by either the metadata thread or
         * the data thread so we do *nothing* on the hash table and simply destroy
@@ -1062,7 +1123,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(int stream_key, uint32_t state))
 {
-       int ret, i;
+       int ret;
        struct lttng_consumer_local_data *ctx;
 
        assert(consumer_data.type == LTTNG_CONSUMER_UNKNOWN ||
@@ -1114,6 +1175,12 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_thread_pipe;
        }
 
+       ret = pipe(ctx->consumer_channel_pipe);
+       if (ret < 0) {
+               PERROR("Error creating channel pipe");
+               goto error_channel_pipe;
+       }
+
        ret = utils_create_pipe(ctx->consumer_metadata_pipe);
        if (ret < 0) {
                goto error_metadata_pipe;
@@ -1129,26 +1196,14 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 error_splice_pipe:
        utils_close_pipe(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
+       utils_close_pipe(ctx->consumer_channel_pipe);
+error_channel_pipe:
        utils_close_pipe(ctx->consumer_thread_pipe);
 error_thread_pipe:
-       for (i = 0; i < 2; i++) {
-               int err;
-
-               err = close(ctx->consumer_should_quit[i]);
-               if (err) {
-                       PERROR("close");
-               }
-       }
+       utils_close_pipe(ctx->consumer_should_quit);
 error_poll_fcntl:
 error_quit_pipe:
-       for (i = 0; i < 2; i++) {
-               int err;
-
-               err = close(ctx->consumer_data_pipe[i]);
-               if (err) {
-                       PERROR("close");
-               }
-       }
+       utils_close_pipe(ctx->consumer_data_pipe);
 error_poll_pipe:
        free(ctx);
 error:
@@ -1168,30 +1223,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        if (ret) {
                PERROR("close");
        }
-       ret = close(ctx->consumer_thread_pipe[0]);
-       if (ret) {
-               PERROR("close");
-       }
-       ret = close(ctx->consumer_thread_pipe[1]);
-       if (ret) {
-               PERROR("close");
-       }
-       ret = close(ctx->consumer_data_pipe[0]);
-       if (ret) {
-               PERROR("close");
-       }
-       ret = close(ctx->consumer_data_pipe[1]);
-       if (ret) {
-               PERROR("close");
-       }
-       ret = close(ctx->consumer_should_quit[0]);
-       if (ret) {
-               PERROR("close");
-       }
-       ret = close(ctx->consumer_should_quit[1]);
-       if (ret) {
-               PERROR("close");
-       }
+       utils_close_pipe(ctx->consumer_thread_pipe);
+       utils_close_pipe(ctx->consumer_channel_pipe);
+       utils_close_pipe(ctx->consumer_data_pipe);
+       utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
        unlink(ctx->consumer_command_sock_path);
@@ -1634,7 +1669,6 @@ int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
                assert(0);
                return -ENOSYS;
        }
-
 }
 
 /*
@@ -1728,6 +1762,32 @@ static void destroy_stream_ht(struct lttng_ht *ht)
        lttng_ht_destroy(ht);
 }
 
+void lttng_consumer_close_metadata(void)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               /*
+                * The Kernel consumer has a different metadata scheme so we don't
+                * close anything because the stream will be closed by the session
+                * daemon.
+                */
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               /*
+                * Close all metadata streams. The metadata hash table is passed and
+                * this call iterates over it by closing all wakeup fd. This is safe
+                * because at this point we are sure that the metadata producer is
+                * either dead or blocked.
+                */
+               lttng_ustconsumer_close_metadata(metadata_ht);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+       }
+}
+
 /*
  * Clean up a metadata stream and free its memory.
  */
@@ -1780,7 +1840,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        ret = lttng_ht_del(ht, &iter);
        assert(!ret);
 
-       /* Remove node session id from the consumer_data stream ht */
+       iter.iter.node = &stream->node_channel_id.node;
+       ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
+       assert(!ret);
+
        iter.iter.node = &stream->node_session_id.node;
        ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
        assert(!ret);
@@ -1852,12 +1915,12 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
 
        assert(stream);
        assert(ht);
 
-       DBG3("Adding metadata stream %d to hash table", stream->key);
+       DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->lock);
@@ -1873,8 +1936,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
         * Lookup the stream just to make sure it does not exist in our internal
         * state. This should NEVER happen.
         */
-       lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
+       lttng_ht_lookup(ht, &stream->key, &iter);
+       node = lttng_ht_iter_get_node_u64(&iter);
        assert(!node);
 
        /* Find relayd and, if one is found, increment refcount. */
@@ -1897,14 +1960,17 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_dec(&stream->chan->nb_init_stream_left);
        }
 
-       lttng_ht_add_unique_ulong(ht, &stream->node);
+       lttng_ht_add_unique_u64(ht, &stream->node);
+
+       lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+               &stream->node_channel_id);
 
        /*
         * Add stream to the stream_list_ht of the consumer data. No need to steal
         * the key since the HT does not use it and we allow to add redundant keys
         * into this table.
         */
-       lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+       lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
        rcu_read_unlock();
 
@@ -1976,17 +2042,17 @@ void *consumer_thread_metadata_poll(void *data)
        uint32_t revents, nb_fd;
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
-       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_node_u64 *node;
        struct lttng_poll_event events;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
        rcu_register_thread();
 
-       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
-               goto error;
+               goto end_ht;
        }
 
        DBG("Thread metadata poll started");
@@ -1995,7 +2061,7 @@ void *consumer_thread_metadata_poll(void *data)
        ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
        if (ret < 0) {
                ERR("Poll set creation failed");
-               goto end;
+               goto end_poll;
        }
 
        ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
@@ -2092,9 +2158,12 @@ restart:
                        }
 
                        rcu_read_lock();
-                       lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
-                                       &iter);
-                       node = lttng_ht_iter_get_node_ulong(&iter);
+                       {
+                               uint64_t tmp_id = (uint64_t) pollfd;
+
+                               lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
+                       }
+                       node = lttng_ht_iter_get_node_u64(&iter);
                        assert(node);
 
                        stream = caa_container_of(node, struct lttng_consumer_stream,
@@ -2151,10 +2220,11 @@ restart:
 error:
 end:
        DBG("Metadata poll thread exiting");
-       lttng_poll_clean(&events);
 
+       lttng_poll_clean(&events);
+end_poll:
        destroy_stream_ht(metadata_ht);
-
+end_ht:
        rcu_unregister_thread();
        return NULL;
 }
@@ -2176,7 +2246,7 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
-       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
                goto end;
@@ -2285,7 +2355,7 @@ void *consumer_thread_data_poll(void *data)
 
                        ret = add_stream(new_stream, data_ht);
                        if (ret) {
-                               ERR("Consumer add stream %d failed. Continuing",
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
                                                new_stream->key);
                                /*
                                 * At this point, if the add_stream fails, it is not in the
@@ -2417,6 +2487,222 @@ end:
        return NULL;
 }
 
+/*
+ * Close wake-up end of each stream belonging to the channel. This will
+ * allow the poll() on the stream read-side to detect when the
+ * write-side (application) finally closes them.
+ */
+static
+void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
+{
+       struct lttng_ht *ht;
+       struct lttng_consumer_stream *stream;
+       struct lttng_ht_iter iter;
+
+       ht = consumer_data.stream_per_chan_id_ht;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&channel->key, lttng_ht_seed),
+                       ht->match_fct, &channel->key,
+                       &iter.iter, stream, node_channel_id.node) {
+               switch (consumer_data.type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Note: a mutex is taken internally within
+                        * liblttng-ust-ctl to protect timer wakeup_fd
+                        * use from concurrent close.
+                        */
+                       lttng_ustconsumer_close_stream_wakeup(stream);
+                       break;
+               default:
+                       ERR("Unknown consumer_data type");
+                       assert(0);
+               }
+       }
+       rcu_read_unlock();
+}
+
+static void destroy_channel_ht(struct lttng_ht *ht)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_channel *channel;
+       int ret;
+
+       if (ht == NULL) {
+               return;
+       }
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
+               ret = lttng_ht_del(ht, &iter);
+               assert(ret != 0);
+       }
+       rcu_read_unlock();
+
+       lttng_ht_destroy(ht);
+}
+
+/*
+ * This thread polls the channel fds to detect when they are being
+ * closed. It closes all related streams if the channel is detected as
+ * closed. It is currently only used as a shim layer for UST because the
+ * consumerd needs to keep the per-stream wakeup end of pipes open for
+ * periodical flush.
+ */
+void *consumer_thread_channel_poll(void *data)
+{
+       int ret, i, pollfd;
+       uint32_t revents, nb_fd;
+       struct lttng_consumer_channel *chan = NULL;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_u64 *node;
+       struct lttng_poll_event events;
+       struct lttng_consumer_local_data *ctx = data;
+       struct lttng_ht *channel_ht;
+
+       rcu_register_thread();
+
+       channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!channel_ht) {
+               /* ENOMEM at this point. Better to bail out. */
+               goto end_ht;
+       }
+
+       DBG("Thread channel poll started");
+
+       /* Size is set to 1 for the consumer_channel pipe */
+       ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
+       if (ret < 0) {
+               ERR("Poll set creation failed");
+               goto end_poll;
+       }
+
+       ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
+       if (ret < 0) {
+               goto end;
+       }
+
+       /* Main loop */
+       DBG("Channel main loop started");
+
+       while (1) {
+               /* Only the channel pipe is set */
+               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       goto end;
+               }
+
+restart:
+               DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               ret = lttng_poll_wait(&events, -1);
+               DBG("Channel event catched in thread");
+               if (ret < 0) {
+                       if (errno == EINTR) {
+                               ERR("Poll EINTR catched");
+                               goto restart;
+                       }
+                       goto end;
+               }
+
+               nb_fd = ret;
+
+               /* From here, the event is a channel wait fd */
+               for (i = 0; i < nb_fd; i++) {
+                       revents = LTTNG_POLL_GETEV(&events, i);
+                       pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       /* Just don't waste time if no returned events for the fd */
+                       if (!revents) {
+                               continue;
+                       }
+                       if (pollfd == ctx->consumer_channel_pipe[0]) {
+                               if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Channel thread pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+                                       continue;
+                               } else if (revents & LPOLLIN) {
+                                       enum consumer_channel_action action;
+
+                                       ret = read_channel_pipe(ctx, &chan, &action);
+                                       if (ret <= 0) {
+                                               ERR("Error reading channel pipe");
+                                               continue;
+                                       }
+
+                                       switch (action) {
+                                       case CONSUMER_CHANNEL_ADD:
+                                               DBG("Adding channel %d to poll set",
+                                                       chan->wait_fd);
+
+                                               lttng_ht_node_init_u64(&chan->wait_fd_node,
+                                                       chan->wait_fd);
+                                               lttng_ht_add_unique_u64(channel_ht,
+                                                               &chan->wait_fd_node);
+                                               /* Add channel to the global poll events list */
+                                               lttng_poll_add(&events, chan->wait_fd,
+                                                               LPOLLIN | LPOLLPRI);
+                                               break;
+                                       case CONSUMER_CHANNEL_QUIT:
+                                               /*
+                                                * Remove the pipe from the poll set and continue the loop
+                                                * since their might be data to consume.
+                                                */
+                                               lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+                                               continue;
+                                       default:
+                                               ERR("Unknown action");
+                                               break;
+                                       }
+                               }
+
+                               /* Handle other stream */
+                               continue;
+                       }
+
+                       rcu_read_lock();
+                       {
+                               uint64_t tmp_id = (uint64_t) pollfd;
+
+                               lttng_ht_lookup(channel_ht, &tmp_id, &iter);
+                       }
+                       node = lttng_ht_iter_get_node_u64(&iter);
+                       assert(node);
+
+                       chan = caa_container_of(node, struct lttng_consumer_channel,
+                                       wait_fd_node);
+
+                       /* Check for error event */
+                       if (revents & (LPOLLERR | LPOLLHUP)) {
+                               DBG("Channel fd %d is hup|err.", pollfd);
+
+                               lttng_poll_del(&events, chan->wait_fd);
+                               ret = lttng_ht_del(channel_ht, &iter);
+                               assert(ret == 0);
+                               consumer_close_channel_streams(chan);
+                       }
+
+                       /* Release RCU lock for the channel looked up */
+                       rcu_read_unlock();
+               }
+       }
+
+end:
+       lttng_poll_clean(&events);
+end_poll:
+       destroy_channel_ht(channel_ht);
+end_ht:
+       DBG("Channel poll thread exiting");
+       rcu_unregister_thread();
+       return NULL;
+}
+
 /*
  * This thread listens on the consumerd socket and receives the file
  * descriptors from the session daemon.
@@ -2521,6 +2807,14 @@ void *consumer_thread_sessiond_poll(void *data)
 end:
        DBG("Consumer thread sessiond poll exiting");
 
+       /*
+        * Close metadata streams since the producer is the session daemon which
+        * just died.
+        *
+        * NOTE: for now, this only applies to the UST tracer.
+        */
+       lttng_consumer_close_metadata();
+
        /*
         * when all fds have hung up, the polling thread
         * can exit cleanly
@@ -2533,6 +2827,8 @@ end:
         */
        notify_thread_pipe(ctx->consumer_data_pipe[1]);
 
+       notify_channel_pipe(ctx, NULL, CONSUMER_CHANNEL_QUIT);
+
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
                ret = close(sock);
@@ -2597,9 +2893,10 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
  */
 void lttng_consumer_init(void)
 {
-       consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-       consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-       consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
 }
 
 /*
@@ -2730,7 +3027,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                goto error;
        }
 
-       DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+       DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
 
@@ -2867,8 +3164,8 @@ int consumer_data_pending(uint64_t id)
        }
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
-                       ht->match_fct, (void *)((unsigned long) id),
+                       ht->hash_fct(&id, lttng_ht_seed),
+                       ht->match_fct, &id,
                        &iter.iter, stream, node_session_id.node) {
                /* If this call fails, the stream is being used hence data pending. */
                ret = stream_try_lock(stream);
index 92f9e20957f09351834a366a38a1a708498c8dad..29836e5b09cf6e25220e20baac2d3fd48b5cfa08 100644 (file)
@@ -49,6 +49,9 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_ASK_CHANNEL_CREATION,
        LTTNG_CONSUMER_GET_CHANNEL,
        LTTNG_CONSUMER_DESTROY_CHANNEL,
+       LTTNG_CONSUMER_PUSH_METADATA,
+       LTTNG_CONSUMER_CLOSE_METADATA,
+       LTTNG_CONSUMER_SETUP_METADATA,
 };
 
 /* State of each fd in consumer */
@@ -77,7 +80,7 @@ enum consumer_channel_output {
 
 enum consumer_channel_type {
        CONSUMER_CHANNEL_TYPE_METADATA  = 0,
-       CONSUMER_CHANNEL_TYPE_DATA              = 1,
+       CONSUMER_CHANNEL_TYPE_DATA      = 1,
 };
 
 struct stream_list {
@@ -87,9 +90,9 @@ struct stream_list {
 
 struct lttng_consumer_channel {
        /* HT node used for consumer_data.channel_ht */
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
        /* Indexed key. Incremented value in the consumer. */
-       int key;
+       uint64_t key;
        /* Number of streams referencing this channel */
        int refcount;
        /* Tracing session id on the session daemon side. */
@@ -102,7 +105,7 @@ struct lttng_consumer_channel {
        uid_t uid;
        gid_t gid;
        /* Relayd id of the channel. -1 if it does not apply. */
-       int relayd_id;
+       int64_t relayd_id;
        /*
         * Number of streams NOT initialized yet. This is used in order to not
         * delete this channel if streams are getting initialized.
@@ -122,6 +125,22 @@ struct lttng_consumer_channel {
         * LTTNG_CONSUMER_GET_CHANNEL.
         */
        struct stream_list streams;
+       /*
+        * Set if the channel is metadata. We keep a reference to the stream
+        * because we have to flush data once pushed by the session daemon. For a
+        * regular channel, this is always set to NULL.
+        */
+       struct lttng_consumer_stream *metadata_stream;
+       /*
+        * Metadata written so far. Helps keeping track of
+        * contiguousness and order.
+        */
+       uint64_t contig_metadata_written;
+
+       /* for UST */
+       int wait_fd;
+       /* Node within channel thread ht */
+       struct lttng_ht_node_u64 wait_fd_node;
 };
 
 /*
@@ -130,14 +149,16 @@ struct lttng_consumer_channel {
  */
 struct lttng_consumer_stream {
        /* HT node used by the data_ht and metadata_ht */
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
+       /* stream indexed per channel key node */
+       struct lttng_ht_node_u64 node_channel_id;
        /* HT node used in consumer_data.stream_list_ht */
-       struct lttng_ht_node_ulong node_session_id;
+       struct lttng_ht_node_u64 node_session_id;
        /* Pointer to associated channel. */
        struct lttng_consumer_channel *chan;
 
        /* Key by which the stream is indexed for 'node'. */
-       int key;
+       uint64_t key;
        /*
         * File descriptor of the data output file. This can be either a file or a
         * socket fd for relayd streaming.
@@ -167,7 +188,7 @@ struct lttng_consumer_stream {
        uid_t uid;
        gid_t gid;
        /* Network sequence number. Indicating on which relayd socket it goes. */
-       int net_seq_idx;
+       uint64_t net_seq_idx;
        /* Identify if the stream is the metadata */
        unsigned int metadata_flag;
        /* Used when the stream is set for network streaming */
@@ -214,7 +235,7 @@ struct lttng_consumer_stream {
  */
 struct consumer_relayd_sock_pair {
        /* Network sequence number. */
-       int net_seq_idx;
+       int64_t net_seq_idx;
        /* Number of stream associated with this relayd */
        unsigned int refcount;
 
@@ -245,7 +266,7 @@ struct consumer_relayd_sock_pair {
         * this socket is for now only used in a single thread.
         */
        struct lttcomm_sock data_sock;
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
 
        /* Session id on both sides for the sockets. */
        uint64_t relayd_session_id;
@@ -306,6 +327,7 @@ struct lttng_consumer_local_data {
        char *consumer_command_sock_path;
        /* communication with splice */
        int consumer_thread_pipe[2];
+       int consumer_channel_pipe[2];
        int consumer_splice_metadata_pipe[2];
        /* Data stream poll thread pipe. To transfer data stream to the thread */
        int consumer_data_pipe[2];
@@ -358,6 +380,11 @@ struct lttng_consumer_global_data {
         * This HT uses the "node_session_id" of the consumer stream.
         */
        struct lttng_ht *stream_list_ht;
+
+       /*
+        * This HT uses the "node_channel_id" of the consumer stream.
+        */
+       struct lttng_ht *stream_per_chan_id_ht;
 };
 
 /*
@@ -407,8 +434,8 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
  */
 int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
 
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
-               int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+               uint64_t stream_key,
                enum lttng_consumer_stream_state state,
                const char *channel_name,
                uid_t uid,
@@ -418,7 +445,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type);
-struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
                const char *pathname,
                const char *name,
@@ -430,14 +457,15 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
-int consumer_add_channel(struct lttng_consumer_channel *channel);
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+               struct lttng_consumer_local_data *ctx);
 void consumer_del_channel(struct lttng_consumer_channel *channel);
 
 /* lttng-relayd consumer command */
 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
                int net_seq_idx);
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key);
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                size_t data_size);
 void consumer_steal_stream_key(int key, struct lttng_ht *ht);
@@ -464,6 +492,7 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
 void *consumer_thread_metadata_poll(void *data);
 void *consumer_thread_data_poll(void *data);
 void *consumer_thread_sessiond_poll(void *data);
+void *consumer_thread_channel_poll(void *data);
 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
index 93eb176a08f1f0f8e40199a8011c327850c7ae1d..fca645870f2500649395f7ed0706bc129043d24e 100644 (file)
@@ -44,9 +44,6 @@
 #define DEFAULT_GLOBAL_APPS_PIPE                DEFAULT_UST_SOCK_DIR "/global"
 #define DEFAULT_TRACE_OUTPUT                    DEFAULT_HOME_DIR "/lttng"
 
-#define DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH       "/lttng-ust-apps-wait"
-#define DEFAULT_HOME_APPS_WAIT_SHM_PATH         "/lttng-ust-apps-wait-%u"
-
 /* Default directory where the trace are written in per domain */
 #define DEFAULT_KERNEL_TRACE_DIR                "/kernel"
 #define DEFAULT_UST_TRACE_DIR                   "/ust"
 
 /* Default unix socket path */
 #define DEFAULT_GLOBAL_CLIENT_UNIX_SOCK         DEFAULT_LTTNG_RUNDIR "/client-lttng-sessiond"
-#define DEFAULT_GLOBAL_APPS_UNIX_SOCK           DEFAULT_LTTNG_RUNDIR "/apps-lttng-sessiond"
-#define DEFAULT_HOME_APPS_UNIX_SOCK             DEFAULT_LTTNG_HOME_RUNDIR "/apps-lttng-sessiond"
 #define DEFAULT_HOME_CLIENT_UNIX_SOCK           DEFAULT_LTTNG_HOME_RUNDIR "/client-lttng-sessiond"
 #define DEFAULT_GLOBAL_HEALTH_UNIX_SOCK         DEFAULT_LTTNG_RUNDIR "/health.sock"
 #define DEFAULT_HOME_HEALTH_UNIX_SOCK           DEFAULT_LTTNG_HOME_RUNDIR "/health.sock"
 
+#ifdef HAVE_LIBLTTNG_UST_CTL
+#define DEFAULT_GLOBAL_APPS_UNIX_SOCK \
+       DEFAULT_LTTNG_RUNDIR "/" LTTNG_UST_SOCK_FILENAME
+#define DEFAULT_HOME_APPS_UNIX_SOCK \
+       DEFAULT_LTTNG_HOME_RUNDIR "/" LTTNG_UST_SOCK_FILENAME
+#define DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH \
+       "/" LTTNG_UST_WAIT_FILENAME
+#define DEFAULT_HOME_APPS_WAIT_SHM_PATH \
+       DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH "-%d"
+
+#else
+#define DEFAULT_GLOBAL_APPS_UNIX_SOCK
+#define DEFAULT_HOME_APPS_UNIX_SOCK
+#endif /* HAVE_LIBLTTNG_UST_CTL */
+
 /*
  * Value taken from the hard limit allowed by the kernel when using setrlimit
  * with RLIMIT_NOFILE on an Intel i7 CPU and Linux 3.0.3.
 /* DEFAULT_CHANNEL_SUBBUF_NUM must always be a power of 2 */
 #define DEFAULT_CHANNEL_SUBBUF_NUM      4
 #define DEFAULT_CHANNEL_SWITCH_TIMER    0       /* usec */
-#define DEFAULT_CHANNEL_READ_TIMER             200     /* usec */
+#define DEFAULT_CHANNEL_READ_TIMER      200000  /* usec */
 #define DEFAULT_CHANNEL_OUTPUT          LTTNG_EVENT_MMAP
 
 #define DEFAULT_METADATA_SUBBUF_SIZE    4096
 /* See lttng-kernel.h enum lttng_kernel_output for channel output */
 #define DEFAULT_KERNEL_CHANNEL_OUTPUT       LTTNG_EVENT_SPLICE
 
+#define DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER    \
+               DEFAULT_CHANNEL_SWITCH_TIMER
+#define DEFAULT_KERNEL_CHANNEL_READ_TIMER      200000  /* usec */
+
 /* User space defaults */
 
 /* Must be a power of 2 */
 /* See lttng-ust.h enum lttng_ust_output */
 #define DEFAULT_UST_CHANNEL_OUTPUT          LTTNG_EVENT_MMAP
 
+#define DEFAULT_UST_CHANNEL_SWITCH_TIMER       \
+               DEFAULT_CHANNEL_SWITCH_TIMER
+#define DEFAULT_UST_CHANNEL_READ_TIMER      0  /* usec */
+
 /*
  * Default timeout value for the sem_timedwait() call. Blocking forever is not
  * wanted so a timeout is used to control the data flow and not freeze the
index 24d579773e0ab26d89cfb5f078f84562ce4431df..263df46839d37af58e2efe66770f5e1833c90f0f 100644 (file)
@@ -54,6 +54,17 @@ static int match_ulong(struct cds_lfht_node *node, const void *key)
        return hash_match_key_ulong((void *) match_node->key, (void *) key);
 }
 
+/*
+ * Match function for u64 node.
+ */
+static int match_u64(struct cds_lfht_node *node, const void *key)
+{
+       struct lttng_ht_node_u64 *match_node =
+               caa_container_of(node, struct lttng_ht_node_u64, node);
+
+       return hash_match_key_u64(&match_node->key, (void *) key);
+}
+
 /*
  * Return an allocated lttng hashtable.
  */
@@ -88,6 +99,10 @@ struct lttng_ht *lttng_ht_new(unsigned long size, int type)
                ht->match_fct = match_ulong;
                ht->hash_fct = hash_key_ulong;
                break;
+       case LTTNG_HT_TYPE_U64:
+               ht->match_fct = match_u64;
+               ht->hash_fct = hash_key_u64;
+               break;
        default:
                ERR("Unknown lttng hashtable type %d", type);
                goto error;
@@ -136,6 +151,18 @@ void lttng_ht_node_init_ulong(struct lttng_ht_node_ulong *node,
        cds_lfht_node_init(&node->node);
 }
 
+/*
+ * Init lttng ht node uint64_t.
+ */
+void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node,
+               uint64_t key)
+{
+       assert(node);
+
+       node->key = key;
+       cds_lfht_node_init(&node->node);
+}
+
 /*
  * Free lttng ht node string.
  */
@@ -154,6 +181,15 @@ void lttng_ht_node_free_ulong(struct lttng_ht_node_ulong *node)
        free(node);
 }
 
+/*
+ * Free lttng ht node uint64_t.
+ */
+void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node)
+{
+       assert(node);
+       free(node);
+}
+
 /*
  * Lookup function in hashtable.
  */
@@ -196,6 +232,20 @@ void lttng_ht_add_ulong(struct lttng_ht *ht, struct lttng_ht_node_ulong *node)
                        &node->node);
 }
 
+/*
+ * Add uint64_t node to hashtable.
+
+ */
+void lttng_ht_add_u64(struct lttng_ht *ht, struct lttng_ht_node_u64 *node)
+{
+       assert(ht);
+       assert(ht->ht);
+       assert(node);
+
+       cds_lfht_add(ht->ht, ht->hash_fct(&node->key, lttng_ht_seed),
+                       &node->node);
+}
+
 /*
  * Add unique unsigned long node to hashtable.
  */
@@ -213,6 +263,23 @@ void lttng_ht_add_unique_ulong(struct lttng_ht *ht,
        assert(node_ptr == &node->node);
 }
 
+/*
+ * Add unique uint64_t node to hashtable.
+ */
+void lttng_ht_add_unique_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node)
+{
+       struct cds_lfht_node *node_ptr;
+       assert(ht);
+       assert(ht->ht);
+       assert(node);
+
+       node_ptr = cds_lfht_add_unique(ht->ht,
+                       ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct,
+                       &node->key, &node->node);
+       assert(node_ptr == &node->node);
+}
+
 /*
  * Add replace unsigned long node to hashtable.
  */
@@ -235,6 +302,28 @@ struct lttng_ht_node_ulong *lttng_ht_add_replace_ulong(struct lttng_ht *ht,
        assert(node_ptr == &node->node);
 }
 
+/*
+ * Add replace unsigned long node to hashtable.
+ */
+struct lttng_ht_node_u64 *lttng_ht_add_replace_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node)
+{
+       struct cds_lfht_node *node_ptr;
+       assert(ht);
+       assert(ht->ht);
+       assert(node);
+
+       node_ptr = cds_lfht_add_replace(ht->ht,
+                       ht->hash_fct(&node->key, lttng_ht_seed), ht->match_fct,
+                       &node->key, &node->node);
+       if (!node_ptr) {
+               return NULL;
+       } else {
+               return caa_container_of(node_ptr, struct lttng_ht_node_u64, node);
+       }
+       assert(node_ptr == &node->node);
+}
+
 /*
  * Delete node from hashtable.
  */
@@ -319,10 +408,26 @@ struct lttng_ht_node_ulong *lttng_ht_iter_get_node_ulong(
        return caa_container_of(node, struct lttng_ht_node_ulong, node);
 }
 
+/*
+ * Return lttng ht unsigned long node from iterator.
+ */
+struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64(
+               struct lttng_ht_iter *iter)
+{
+       struct cds_lfht_node *node;
+
+       assert(iter);
+       node = cds_lfht_iter_get_node(&iter->iter);
+       if (!node) {
+               return NULL;
+       }
+       return caa_container_of(node, struct lttng_ht_node_u64, node);
+}
+
 /*
  * lib constructor
  */
-static void __attribute__((constructor)) _init()
+static void __attribute__((constructor)) _init(void)
 {
        /* Init hash table seed */
        lttng_ht_seed = (unsigned long) time(NULL);
index 4007a9c6c6b29696313415c60473f198cc859a6a..b4c1909b79796fe1a84b4714664939697a5488b2 100644 (file)
@@ -19,6 +19,7 @@
 #define _LTT_HT_H
 
 #include <urcu.h>
+#include <stdint.h>
 
 #include "rculfhash.h"
 #include "rculfhash-internal.h"
@@ -31,6 +32,7 @@ typedef cds_lfht_match_fct hash_match_fct;
 enum lttng_ht_type {
        LTTNG_HT_TYPE_STRING,
        LTTNG_HT_TYPE_ULONG,
+       LTTNG_HT_TYPE_U64,
 };
 
 struct lttng_ht {
@@ -55,6 +57,12 @@ struct lttng_ht_node_ulong {
        struct rcu_head head;
 };
 
+struct lttng_ht_node_u64 {
+       uint64_t key;
+       struct cds_lfht_node node;
+       struct rcu_head head;
+};
+
 /* Hashtable new and destroy */
 extern struct lttng_ht *lttng_ht_new(unsigned long size, int type);
 extern void lttng_ht_destroy(struct lttng_ht *ht);
@@ -63,8 +71,11 @@ extern void lttng_ht_destroy(struct lttng_ht *ht);
 extern void lttng_ht_node_init_str(struct lttng_ht_node_str *node, char *key);
 extern void lttng_ht_node_init_ulong(struct lttng_ht_node_ulong *node,
                unsigned long key);
+extern void lttng_ht_node_init_u64(struct lttng_ht_node_u64 *node,
+               uint64_t key);
 extern void lttng_ht_node_free_str(struct lttng_ht_node_str *node);
 extern void lttng_ht_node_free_ulong(struct lttng_ht_node_ulong *node);
+extern void lttng_ht_node_free_u64(struct lttng_ht_node_ulong *node);
 
 extern void lttng_ht_lookup(struct lttng_ht *ht, void *key,
                struct lttng_ht_iter *iter);
@@ -74,10 +85,16 @@ extern void lttng_ht_add_unique_str(struct lttng_ht *ht,
                struct lttng_ht_node_str *node);
 extern void lttng_ht_add_unique_ulong(struct lttng_ht *ht,
                struct lttng_ht_node_ulong *node);
+extern void lttng_ht_add_unique_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node);
 extern struct lttng_ht_node_ulong *lttng_ht_add_replace_ulong(
                struct lttng_ht *ht, struct lttng_ht_node_ulong *node);
+extern struct lttng_ht_node_u64 *lttng_ht_add_replace_u64(
+               struct lttng_ht *ht, struct lttng_ht_node_u64 *node);
 extern void lttng_ht_add_ulong(struct lttng_ht *ht,
                struct lttng_ht_node_ulong *node);
+extern void lttng_ht_add_u64(struct lttng_ht *ht,
+               struct lttng_ht_node_u64 *node);
 
 extern int lttng_ht_del(struct lttng_ht *ht, struct lttng_ht_iter *iter);
 
@@ -91,5 +108,7 @@ extern struct lttng_ht_node_str *lttng_ht_iter_get_node_str(
                struct lttng_ht_iter *iter);
 extern struct lttng_ht_node_ulong *lttng_ht_iter_get_node_ulong(
                struct lttng_ht_iter *iter);
+extern struct lttng_ht_node_u64 *lttng_ht_iter_get_node_u64(
+               struct lttng_ht_iter *iter);
 
 #endif /* _LTT_HT_H */
index 850f9e5db3d905be2f83da28221fc18ab6e589cb..8d0e515aecafbb94a347c418f23bf341f719f71c 100644 (file)
@@ -446,12 +446,8 @@ static uint32_t __attribute__((unused)) hashlittle(const void *key,
        return c;
 }
 
-#if (CAA_BITS_PER_LONG == 64)
-/*
- * Hash function for number value.
- */
 LTTNG_HIDDEN
-unsigned long hash_key_ulong(void *_key, unsigned long seed)
+unsigned long hash_key_u64(void *_key, unsigned long seed)
 {
        union {
                uint64_t v64;
@@ -463,10 +459,21 @@ unsigned long hash_key_ulong(void *_key, unsigned long seed)
        } key;
 
        v.v64 = (uint64_t) seed;
-       key.v64 = (uint64_t) _key;
+       key.v64 = *(uint64_t *) _key;
        hashword2(key.v32, 2, &v.v32[0], &v.v32[1]);
        return v.v64;
 }
+
+#if (CAA_BITS_PER_LONG == 64)
+/*
+ * Hash function for number value.
+ */
+LTTNG_HIDDEN
+unsigned long hash_key_ulong(void *_key, unsigned long seed)
+{
+       uint64_t __key = (uint64_t) _key;
+       return (unsigned long) hash_key_u64(&__key, seed);
+}
 #else
 /*
  * Hash function for number value.
@@ -502,6 +509,19 @@ int hash_match_key_ulong(void *key1, void *key2)
        return 0;
 }
 
+/*
+ * Hash function compare for number value.
+ */
+LTTNG_HIDDEN
+int hash_match_key_u64(void *key1, void *key2)
+{
+       if (*(uint64_t *) key1 == *(uint64_t *) key2) {
+               return 1;
+       }
+
+       return 0;
+}
+
 /*
  * Hash compare function for string.
  */
index 4f0890892ef4b27bf964e3be5b7765fac77ec13e..38d6121e4ce65cd891e7d1ce6d485515596985dd 100644 (file)
 #include <stdint.h>
 
 unsigned long hash_key_ulong(void *_key, unsigned long seed);
+unsigned long hash_key_u64(void *_key, unsigned long seed);
 unsigned long hash_key_str(void *key, unsigned long seed);
 int hash_match_key_ulong(void *key1, void *key2);
+int hash_match_key_u64(void *key1, void *key2);
 int hash_match_key_str(void *key1, void *key2);
 
 #endif /* _LTT_HT_UTILS_H */
index 9d75d3f09cfc9ada6f30503a6c7ddfe306366809..0de73443e1bee5c51549e608d20b1548cb007257 100644 (file)
@@ -128,7 +128,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               DBG("consumer_add_channel %d", msg.u.channel.channel_key);
+               DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
                                msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
@@ -153,12 +153,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (ctx->on_recv_channel != NULL) {
                        ret = ctx->on_recv_channel(new_channel);
                        if (ret == 0) {
-                               consumer_add_channel(new_channel);
+                               consumer_add_channel(new_channel, ctx);
                        } else if (ret < 0) {
                                goto end_nosignal;
                        }
                } else {
-                       consumer_add_channel(new_channel);
+                       consumer_add_channel(new_channel, ctx);
                }
                goto end_nosignal;
        }
@@ -180,7 +180,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                         * We could not find the channel. Can happen if cpu hotplug
                         * happens while tearing down.
                         */
-                       ERR("Unable to find channel key %d", msg.u.stream.channel_key);
+                       ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
@@ -265,8 +265,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
-               } else if (new_stream->net_seq_idx != -1) {
-                       ERR("Network sequence index %d unknown. Not adding stream.",
+               } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+                       ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
                                        new_stream->net_seq_idx);
                        consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
@@ -464,8 +464,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 * network streaming or the full padding (len) size when we are _not_
                 * streaming.
                 */
-               if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
-                               (ret != len && stream->net_seq_idx == -1)) {
+               if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+                               (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
                        /*
                         * Display the error but continue processing to try to release the
                         * subbuffer
@@ -513,7 +513,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        }
 
        /* Opening the tracefile in write mode */
-       if (stream->net_seq_idx == -1) {
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
                ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
                                S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid);
                if (ret < 0) {
index c390a9f5284a573a49f0c524750f5d7c66eaf25f..60a3eade6171faf18f886078723ed48b2c1cc2f5 100644 (file)
@@ -117,6 +117,8 @@ enum lttcomm_return_code {
        LTTCOMM_CONSUMERD_SPLICE_ENOMEM,            /* ENOMEM from splice(2) */
        LTTCOMM_CONSUMERD_SPLICE_ESPIPE,            /* ESPIPE from splice(2) */
        LTTCOMM_CONSUMERD_ENOMEM,                   /* Consumer is out of memory */
+       LTTCOMM_CONSUMERD_ERROR_METADATA,           /* Error with metadata. */
+       LTTCOMM_CONSUMERD_FATAL,                    /* Fatal error. */
 
        /* MUST be last element */
        LTTCOMM_NR,                                             /* Last element */
@@ -145,7 +147,7 @@ struct lttcomm_sockaddr {
 } LTTNG_PACKED;
 
 struct lttcomm_sock {
-       int fd;
+       int32_t fd;
        enum lttcomm_sock_proto proto;
        struct lttcomm_sockaddr sockaddr;
        const struct lttcomm_proto_ops *ops;
@@ -172,7 +174,7 @@ struct lttcomm_proto_ops {
  * Data structure received from lttng client to session daemon.
  */
 struct lttcomm_session_msg {
-       uint32_t cmd_type;    /* enum lttcomm_sessiond_command */
+       uint32_t cmd_type;      /* enum lttcomm_sessiond_command */
        struct lttng_session session;
        struct lttng_domain domain;
        union {
@@ -233,9 +235,9 @@ struct lttng_filter_bytecode {
  * Data structure for the response from sessiond to the lttng client.
  */
 struct lttcomm_lttng_msg {
-       uint32_t cmd_type;   /* enum lttcomm_sessiond_command */
-       uint32_t ret_code;   /* enum lttcomm_return_code */
-       uint32_t pid;        /* pid_t */
+       uint32_t cmd_type;      /* enum lttcomm_sessiond_command */
+       uint32_t ret_code;      /* enum lttcomm_return_code */
+       uint32_t pid;           /* pid_t */
        uint32_t data_size;
        /* Contains: trace_name + data */
        char payload[];
@@ -259,26 +261,26 @@ struct lttcomm_consumer_msg {
        uint32_t cmd_type;      /* enum consumerd_command */
        union {
                struct {
-                       int channel_key;
+                       uint64_t channel_key;
                        uint64_t session_id;
                        char pathname[PATH_MAX];
-                       uid_t uid;
-                       gid_t gid;
-                       int relayd_id;
+                       uint32_t uid;
+                       uint32_t gid;
+                       uint64_t relayd_id;
                        /* nb_init_streams is the number of streams open initially. */
-                       unsigned int nb_init_streams;
+                       uint32_t nb_init_streams;
                        char name[LTTNG_SYMBOL_NAME_LEN];
                        /* Use splice or mmap to consume this fd */
                        enum lttng_event_output output;
                        int type; /* Per cpu or metadata. */
                } LTTNG_PACKED channel; /* Only used by Kernel. */
                struct {
-                       int stream_key;
-                       int channel_key;
-                       int cpu;        /* On which CPU this stream is assigned. */
+                       uint64_t stream_key;
+                       uint64_t channel_key;
+                       int32_t cpu;    /* On which CPU this stream is assigned. */
                } LTTNG_PACKED stream;  /* Only used by Kernel. */
                struct {
-                       int net_index;
+                       uint64_t net_index;
                        enum lttng_stream_type type;
                        /* Open socket to the relayd */
                        struct lttcomm_sock sock;
@@ -292,28 +294,39 @@ struct lttcomm_consumer_msg {
                        uint64_t session_id;
                } LTTNG_PACKED data_pending;
                struct {
-                       uint64_t subbuf_size;                           /* bytes */
-                       uint64_t num_subbuf;                            /* power of 2 */
-                       int overwrite;                                          /* 1: overwrite, 0: discard */
-                       unsigned int switch_timer_interval;     /* usec */
-                       unsigned int read_timer_interval;       /* usec */
-                       int output;                                                     /* splice, mmap */
-                       int type;                                                       /* metadata or per_cpu */
-                       uint64_t session_id;                            /* Tracing session id */
-                       char pathname[PATH_MAX];                        /* Channel file path. */
+                       uint64_t subbuf_size;                   /* bytes */
+                       uint64_t num_subbuf;                    /* power of 2 */
+                       int32_t overwrite;                      /* 1: overwrite, 0: discard */
+                       uint32_t switch_timer_interval;         /* usec */
+                       uint32_t read_timer_interval;           /* usec */
+                       int32_t output;                         /* splice, mmap */
+                       int32_t type;                           /* metadata or per_cpu */
+                       uint64_t session_id;                    /* Tracing session id */
+                       char pathname[PATH_MAX];                /* Channel file path. */
                        char name[LTTNG_SYMBOL_NAME_LEN];       /* Channel name. */
-                       uid_t uid;                                                      /* User ID of the session */
-                       gid_t gid;                                                      /* Group ID ot the session */
-                       int relayd_id;                                          /* Relayd id if apply. */
-                       unsigned long key;                                      /* Unique channel key. */
+                       uint32_t uid;                           /* User ID of the session */
+                       uint32_t gid;                           /* Group ID ot the session */
+                       uint64_t relayd_id;                     /* Relayd id if apply. */
+                       uint64_t key;                           /* Unique channel key. */
                        unsigned char uuid[UUID_STR_LEN];       /* uuid for ust tracer. */
                } LTTNG_PACKED ask_channel;
                struct {
-                       unsigned long key;
+                       uint64_t key;
                } LTTNG_PACKED get_channel;
                struct {
-                       unsigned long key;
+                       uint64_t key;
                } LTTNG_PACKED destroy_channel;
+               struct {
+                       uint64_t key;   /* Metadata channel key. */
+                       uint64_t target_offset; /* Offset in the consumer */
+                       uint64_t len;   /* Length of metadata to be received. */
+               } LTTNG_PACKED push_metadata;
+               struct {
+                       uint64_t key;   /* Metadata channel key. */
+               } LTTNG_PACKED close_metadata;
+               struct {
+                       uint64_t key;   /* Metadata channel key. */
+               } LTTNG_PACKED setup_metadata;
        } u;
 } LTTNG_PACKED;
 
@@ -326,7 +339,7 @@ struct lttcomm_consumer_status_msg {
 
 struct lttcomm_consumer_status_channel {
        enum lttng_error_code ret_code;
-       unsigned long key;
+       uint64_t key;
        unsigned int stream_count;
 } LTTNG_PACKED;
 
index 442925754c634ec4d067382b5cdd7ed8f35b385e..5a09ff51fb751069a3ad2fdbed17e698a0630dd0 100644 (file)
@@ -88,17 +88,17 @@ static int add_channel(struct lttng_consumer_channel *channel,
        if (ctx->on_recv_channel != NULL) {
                ret = ctx->on_recv_channel(channel);
                if (ret == 0) {
-                       ret = consumer_add_channel(channel);
+                       ret = consumer_add_channel(channel, ctx);
                } else if (ret < 0) {
                        /* Most likely an ENOMEM. */
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto error;
                }
        } else {
-               ret = consumer_add_channel(channel);
+               ret = consumer_add_channel(channel, ctx);
        }
 
-       DBG("UST consumer channel added (key: %u)", channel->key);
+       DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
 
 error:
        return ret;
@@ -109,7 +109,7 @@ error:
  */
 static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
                const char *pathname, const char *name, uid_t uid, gid_t gid,
-               int relayd_id, unsigned long key, enum lttng_event_output output)
+               int relayd_id, uint64_t key, enum lttng_event_output output)
 {
        assert(pathname);
        assert(name);
@@ -223,8 +223,8 @@ static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
                if (ret < 0) {
                        goto error;
                }
-       } else if (stream->net_seq_idx != -1) {
-               ERR("Network sequence index %d unknown. Not adding stream.",
+       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
                                stream->net_seq_idx);
                ret = -1;
                goto error;
@@ -234,6 +234,11 @@ error:
        return ret;
 }
 
+/*
+ * Create streams for the given channel using liblttng-ust-ctl.
+ *
+ * Return 0 on success else a negative value.
+ */
 static int create_ust_streams(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
 {
@@ -251,7 +256,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
        while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
                int wait_fd;
 
-               wait_fd = ustctl_get_wait_fd(ustream);
+               wait_fd = ustctl_stream_get_wait_fd(ustream);
 
                /* Allocate consumer stream object. */
                stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
@@ -288,11 +293,16 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                        }
                }
 
-               DBG("UST consumer add stream %s (key: %d) with relayd id %" PRIu64,
+               DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
                                stream->name, stream->key, stream->relayd_stream_id);
 
                /* Set next CPU stream. */
                channel->streams.count = ++cpu;
+
+               /* Keep stream reference when creating metadata. */
+               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+                       channel->metadata_stream = stream;
+               }
        }
 
        return 0;
@@ -338,6 +348,11 @@ error_create:
        return ret;
 }
 
+/*
+ * Send a single given stream to the session daemon using the sock.
+ *
+ * Return 0 on success else a negative value.
+ */
 static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
 {
        int ret;
@@ -345,7 +360,7 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
        assert(stream);
        assert(sock >= 0);
 
-       DBG2("UST consumer sending stream %d to sessiond", stream->key);
+       DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
 
        /* Send stream to session daemon. */
        ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
@@ -353,11 +368,6 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
                goto error;
        }
 
-       ret = ustctl_stream_close_wakeup_fd(stream->ustream);
-       if (ret < 0) {
-               goto error;
-       }
-
 error:
        return ret;
 }
@@ -365,8 +375,7 @@ error:
 /*
  * Send channel to sessiond.
  *
- * Return 0 on success or else a negative value. On error, the channel is
- * destroy using ustctl.
+ * Return 0 on success or else a negative value.
  */
 static int send_sessiond_channel(int sock,
                struct lttng_consumer_channel *channel,
@@ -387,6 +396,11 @@ static int send_sessiond_channel(int sock,
                goto error;
        }
 
+       ret = ustctl_channel_close_wakeup_fd(channel->uchan);
+       if (ret < 0) {
+               goto error;
+       }
+
        /* The channel was sent successfully to the sessiond at this point. */
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Try to send the stream to the relayd if one is available. */
@@ -462,6 +476,12 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
                goto error;
        }
 
+       channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
+
+       if (ret < 0) {
+               goto error;
+       }
+
        /* Open all streams for this channel. */
        ret = create_ust_streams(channel, ctx);
        if (ret < 0) {
@@ -472,6 +492,155 @@ error:
        return ret;
 }
 
+/*
+ * Send all stream of a channel to the right thread handling it.
+ *
+ * On error, return a negative value else 0 on success.
+ */
+static int send_streams_to_thread(struct lttng_consumer_channel *channel,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+       struct lttng_consumer_stream *stream, *stmp;
+
+       assert(channel);
+       assert(ctx);
+
+       /* Send streams to the corresponding thread. */
+       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                       send_node) {
+               /* Sending the stream to the thread. */
+               ret = send_stream_to_thread(stream, ctx);
+               if (ret < 0) {
+                       /*
+                        * If we are unable to send the stream to the thread, there is
+                        * a big problem so just stop everything.
+                        */
+                       goto error;
+               }
+
+               /* Remove node from the channel stream list. */
+               cds_list_del(&stream->send_node);
+       }
+
+error:
+       return ret;
+}
+
+/*
+ * Write metadata to the given channel using ustctl to convert the string to
+ * the ringbuffer.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int push_metadata(struct lttng_consumer_channel *metadata,
+               const char *metadata_str, uint64_t target_offset, uint64_t len)
+{
+       int ret;
+
+       assert(metadata);
+       assert(metadata_str);
+
+       DBG("UST consumer writing metadata to channel %s", metadata->name);
+
+       assert(target_offset == metadata->contig_metadata_written);
+       ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
+       if (ret < 0) {
+               ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
+               goto error;
+       }
+       metadata->contig_metadata_written += len;
+
+       ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
+
+error:
+       return ret;
+}
+
+/*
+ * Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int close_metadata(uint64_t chan_key)
+{
+       int ret;
+       struct lttng_consumer_channel *channel;
+
+       DBG("UST consumer close metadata key %lu", chan_key);
+
+       channel = consumer_find_channel(chan_key);
+       if (!channel) {
+               ERR("UST consumer close metadata %lu not found", chan_key);
+               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+               goto error;
+       }
+
+       ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+       if (ret < 0) {
+               ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               goto err