Update version to 0.16
[ust.git] / libust / tracectl.c
index 848b272f77c5167de00828e88b67652472ada7f7..410b1c72e75cb52a2fd24a2898c590f31c011455 100644 (file)
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
  */
 
+/* This file contains the implementation of the UST listener thread, which
+ * receives trace control commands. It also coordinates the initialization of
+ * libust.
+ */
+
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <stdio.h>
+#include <stdlib.h>
 #include <stdint.h>
+#include <pthread.h>
 #include <signal.h>
+#include <sys/epoll.h>
+#include <sys/time.h>
 #include <sys/types.h>
 #include <sys/socket.h>
-#include <sys/un.h>
-#include <sched.h>
 #include <fcntl.h>
 #include <poll.h>
 #include <regex.h>
-
-#include <urcu-bp.h>
+#include <urcu/uatomic.h>
+#include <urcu/list.h>
 
 #include <ust/marker.h>
+#include <ust/tracepoint.h>
+#include <ust/tracepoint-internal.h>
 #include <ust/tracectl.h>
+#include <ust/clock.h>
 #include "tracer.h"
-#include "usterr.h"
+#include "usterr_signal_safe.h"
 #include "ustcomm.h"
 #include "buffers.h"
 #include "marker-control.h"
 
-//#define USE_CLONE
-
-#define USTSIGNAL SIGIO
-
-#define MAX_MSG_SIZE (100)
-#define MSG_NOTIF 1
-#define MSG_REGISTER_NOTIF 2
-
-char consumer_stack[10000];
-
 /* This should only be accessed by the constructor, before the creation
  * of the listener, and then only by the listener.
  */
 s64 pidunique = -1LL;
 
-struct list_head blocked_consumers = LIST_HEAD_INIT(blocked_consumers);
+/* The process pid is used to detect a non-traceable fork
+ * and allow the non-traceable fork to be ignored
+ * by destructor sequences in libust
+ */
+static pid_t processpid = 0;
 
-static struct ustcomm_app ustcomm_app;
+static struct ustcomm_header _receive_header;
+static struct ustcomm_header *receive_header = &_receive_header;
+static char receive_buffer[USTCOMM_BUFFER_SIZE];
+static char send_buffer[USTCOMM_BUFFER_SIZE];
 
-struct tracecmd { /* no padding */
-       uint32_t size;
-       uint16_t command;
-};
+static int epoll_fd;
 
-/* volatile because shared between the listener and the main thread */
-volatile sig_atomic_t buffers_to_export = 0;
-
-struct trctl_msg {
-       /* size: the size of all the fields except size itself */
-       uint32_t size;
-       uint16_t type;
-       /* Only the necessary part of the payload is transferred. It
-         * may even be none of it.
-         */
-       char payload[94];
-};
+/*
+ * Listener thread data vs fork() protection mechanism. Ensures that no listener
+ * thread mutexes and data structures are being concurrently modified or held by
+ * other threads when fork() is executed.
+ */
+static pthread_mutex_t listener_thread_data_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-struct consumer_channel {
-       int fd;
-       struct ltt_channel_struct *chan;
-};
+/* Mutex protecting listen_sock. Nests inside listener_thread_data_mutex. */
+static pthread_mutex_t listen_sock_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct ustcomm_sock *listen_sock;
 
-struct blocked_consumer {
-       int fd_consumer;
-       int fd_producer;
-       int tmp_poll_idx;
+extern struct chan_info_struct chan_infos[];
 
-       /* args to ustcomm_send_reply */
-       struct ustcomm_server server;
-       struct ustcomm_source src;
+static struct cds_list_head ust_socks = CDS_LIST_HEAD_INIT(ust_socks);
 
-       /* args to ust_buffers_get_subbuf */
-       struct ust_buffer *buf;
+/* volatile because shared between the listener and the main thread */
+int buffers_to_export = 0;
 
-       struct list_head list;
-};
+int ust_clock_source;
 
 static long long make_pidunique(void)
 {
        s64 retval;
        struct timeval tv;
-       
+
        gettimeofday(&tv, NULL);
 
        retval = tv.tv_sec;
@@ -108,52 +100,99 @@ static long long make_pidunique(void)
        return retval;
 }
 
-static void print_markers(FILE *fp)
+static void print_ust_marker(FILE *fp)
 {
-       struct marker_iter iter;
-
-       lock_markers();
-       marker_iter_reset(&iter);
-       marker_iter_start(&iter);
-
-       while(iter.marker) {
-               fprintf(fp, "marker: %s/%s %d \"%s\" %p\n", iter.marker->channel, iter.marker->name, (int)imv_read(iter.marker->state), iter.marker->format, iter.marker->location);
-               marker_iter_next(&iter);
+       struct ust_marker_iter iter;
+
+       ust_marker_iter_reset(&iter);
+       ust_marker_iter_start(&iter);
+
+       while (iter.ust_marker) {
+               fprintf(fp, "ust_marker: %s/%s %d \"%s\" %p\n",
+                       (*iter.ust_marker)->channel,
+                       (*iter.ust_marker)->name,
+                       (int)(*iter.ust_marker)->state,
+                       (*iter.ust_marker)->format,
+                       NULL);  /*
+                                * location is null for now, will be added
+                                * to a different table.
+                                */
+               ust_marker_iter_next(&iter);
        }
-       unlock_markers();
+       ust_marker_iter_stop(&iter);
 }
 
-static int init_socket(void);
+static void print_trace_events(FILE *fp)
+{
+       struct trace_event_iter iter;
 
-/* This needs to be called whenever a new thread is created. It notifies
- * liburcu of the new thread.
- */
+       trace_event_iter_reset(&iter);
+       trace_event_iter_start(&iter);
 
-void ust_register_thread(void)
-{
-       rcu_register_thread();
+       while (iter.trace_event) {
+               fprintf(fp, "trace_event: %s\n", (*iter.trace_event)->name);
+               trace_event_iter_next(&iter);
+       }
+       trace_event_iter_stop(&iter);
 }
 
-int fd_notif = -1;
-void notif_cb(void)
+static int connect_ustconsumer(void)
 {
-       int result;
-       struct trctl_msg msg;
+       int result, fd;
+       char default_daemon_path[] = SOCK_DIR "/ustconsumer";
+       char *explicit_daemon_path, *daemon_path;
+
+       explicit_daemon_path = getenv("UST_DAEMON_SOCKET");
+       if (explicit_daemon_path) {
+               daemon_path = explicit_daemon_path;
+       } else {
+               daemon_path = default_daemon_path;
+       }
 
-       /* FIXME: fd_notif should probably be protected by a spinlock */
+       DBG("Connecting to daemon_path %s", daemon_path);
 
-       if(fd_notif == -1)
-               return;
+       result = ustcomm_connect_path(daemon_path, &fd);
+       if (result < 0) {
+               WARN("connect_ustconsumer failed, daemon_path: %s",
+                    daemon_path);
+               return result;
+       }
+
+       return fd;
+}
 
-       msg.type = MSG_NOTIF;
-       msg.size = sizeof(msg.type);
 
-       /* FIXME: don't block here */
-       result = write(fd_notif, &msg, msg.size+sizeof(msg.size));
-       if(result == -1) {
-               PERROR("write");
+static void request_buffer_consumer(int sock,
+                                   const char *trace,
+                                   const char *channel,
+                                   int cpu)
+{
+       struct ustcomm_header send_header, recv_header;
+       struct ustcomm_buffer_info buf_inf;
+       int result = 0;
+
+       result = ustcomm_pack_buffer_info(&send_header,
+                                         &buf_inf,
+                                         trace,
+                                         channel,
+                                         cpu);
+
+       if (result < 0) {
+               ERR("failed to pack buffer info message %s_%d",
+                   channel, cpu);
                return;
        }
+
+       buf_inf.pid = getpid();
+       send_header.command = CONSUME_BUFFER;
+
+       result = ustcomm_req(sock, &send_header, (char *) &buf_inf,
+                            &recv_header, NULL);
+       if (result <= 0) {
+               PERROR("request for buffer consumer failed, is the daemon online?");
+       }
+
+       return;
 }
 
 /* Ask the daemon to collect a trace called trace_name and being
@@ -165,982 +204,985 @@ void notif_cb(void)
 
 static void inform_consumer_daemon(const char *trace_name)
 {
-       int i,j;
+       int sock, i,j;
        struct ust_trace *trace;
-       pid_t pid = getpid();
-       int result;
-
-       ltt_lock_traces();
-
-       trace = _ltt_trace_find(trace_name);
-       if(trace == NULL) {
-               WARN("inform_consumer_daemon: could not find trace \"%s\"; it is probably already destroyed", trace_name);
-               goto finish;
-       }
-
-       for(i=0; i < trace->nr_channels; i++) {
-               /* iterate on all cpus */
-               for(j=0; j<trace->channels[i].n_cpus; j++) {
-                       char *buf;
-                       asprintf(&buf, "%s_%d", trace->channels[i].channel_name, j);
-                       result = ustcomm_request_consumer(pid, buf);
-                       if(result == -1) {
-                               WARN("Failed to request collection for channel %s. Is the daemon available?", trace->channels[i].channel_name);
-                               /* continue even if fail */
-                       }
-                       free(buf);
-                       buffers_to_export++;
-               }
-       }
-
-       finish:
-       ltt_unlock_traces();
-}
-
-void process_blocked_consumers(void)
-{
-       int n_fds = 0;
-       struct pollfd *fds;
-       struct blocked_consumer *bc;
-       int idx = 0;
-       char inbuf;
-       int result;
-
-       list_for_each_entry(bc, &blocked_consumers, list) {
-               n_fds++;
-       }
-
-       fds = (struct pollfd *) malloc(n_fds * sizeof(struct pollfd));
-       if(fds == NULL) {
-               ERR("malloc returned NULL");
-               return;
-       }
-
-       list_for_each_entry(bc, &blocked_consumers, list) {
-               fds[idx].fd = bc->fd_producer;
-               fds[idx].events = POLLIN;
-               bc->tmp_poll_idx = idx;
-               idx++;
-       }
+       const char *ch_name;
 
-       while((result = poll(fds, n_fds, 0)) == -1 && errno == EINTR)
-               /* nothing */;
-       if(result == -1) {
-               PERROR("poll");
+       sock = connect_ustconsumer();
+       if (sock < 0) {
                return;
        }
 
-       list_for_each_entry(bc, &blocked_consumers, list) {
-               if(fds[bc->tmp_poll_idx].revents) {
-                       long consumed_old = 0;
-                       char *reply;
+       DBG("Connected to ustconsumer");
 
-                       result = read(bc->fd_producer, &inbuf, 1);
-                       if(result == -1) {
-                               PERROR("read");
-                               continue;
-                       }
-                       if(result == 0) {
-                               DBG("PRODUCER END");
-
-                               close(bc->fd_producer);
-
-                               list_del(&bc->list);
-
-                               result = ustcomm_send_reply(&bc->server, "END", &bc->src);
-                               if(result < 0) {
-                                       ERR("ustcomm_send_reply failed");
-                                       continue;
-                               }
+       ltt_lock_traces();
 
-                               continue;
-                       }
+       trace = _ltt_trace_find(trace_name);
+       if (trace == NULL) {
+               WARN("inform_consumer_daemon: could not find trace \"%s\"; it is probably already destroyed", trace_name);
+               goto unlock_traces;
+       }
 
-                       result = ust_buffers_get_subbuf(bc->buf, &consumed_old);
-                       if(result == -EAGAIN) {
-                               WARN("missed buffer?");
-                               continue;
-                       }
-                       else if(result < 0) {
-                               DBG("ust_buffers_get_subbuf: error: %s", strerror(-result));
+       for (i=0; i < trace->nr_channels; i++) {
+               if (trace->channels[i].request_collection) {
+                       /* iterate on all cpus */
+                       for (j=0; j<trace->channels[i].n_cpus; j++) {
+                               ch_name = trace->channels[i].channel_name;
+                               request_buffer_consumer(sock, trace_name,
+                                                       ch_name, j);
+                               CMM_STORE_SHARED(buffers_to_export,
+                                            CMM_LOAD_SHARED(buffers_to_export)+1);
                        }
-                       asprintf(&reply, "%s %ld", "OK", consumed_old);
-                       result = ustcomm_send_reply(&bc->server, reply, &bc->src);
-                       if(result < 0) {
-                               ERR("ustcomm_send_reply failed");
-                               free(reply);
-                               continue;
-                       }
-                       free(reply);
-
-                       list_del(&bc->list);
                }
        }
 
-}
-
-void seperate_channel_cpu(const char *channel_and_cpu, char **channel, int *cpu)
-{
-       const char *sep;
-
-       sep = rindex(channel_and_cpu, '_');
-       if(sep == NULL) {
-               *cpu = -1;
-               sep = channel_and_cpu + strlen(channel_and_cpu);
-       }
-       else {
-               *cpu = atoi(sep+1);
-       }
+unlock_traces:
+       ltt_unlock_traces();
 
-       asprintf(channel, "%.*s", (int)(sep-channel_and_cpu), channel_and_cpu);
+       close(sock);
 }
 
-static int do_cmd_get_shmid(const char *recvbuf, struct ustcomm_source *src)
+static struct ust_channel *find_channel(const char *ch_name,
+                                       struct ust_trace *trace)
 {
-       int retval = 0;
-       struct ust_trace *trace;
-       char trace_name[] = "auto";
        int i;
-       char *channel_and_cpu;
-       int found = 0;
-       int result;
-       char *ch_name;
-       int ch_cpu;
-
-       DBG("get_shmid");
-
-       channel_and_cpu = nth_token(recvbuf, 1);
-       if(channel_and_cpu == NULL) {
-               ERR("cannot parse channel");
-               goto end;
-       }
-
-       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
-       if(ch_cpu == -1) {
-               ERR("problem parsing channel name");
-               goto free_short_chan_name;
-       }
-
-       ltt_lock_traces();
-       trace = _ltt_trace_find(trace_name);
-       ltt_unlock_traces();
-
-       if(trace == NULL) {
-               ERR("cannot find trace!");
-               retval = -1;
-               goto free_short_chan_name;
-       }
-
-       for(i=0; i<trace->nr_channels; i++) {
-               struct ust_channel *channel = &trace->channels[i];
-               struct ust_buffer *buf = channel->buf[ch_cpu];
 
-               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
-                       char *reply;
-
-//                     DBG("the shmid for the requested channel is %d", buf->shmid);
-//                     DBG("the shmid for its buffer structure is %d", channel->buf_struct_shmids);
-                       asprintf(&reply, "%d %d", buf->shmid, channel->buf_struct_shmids[ch_cpu]);
-
-                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
-                       if(result) {
-                               ERR("ustcomm_send_reply failed");
-                               free(reply);
-                               retval = -1;
-                               goto free_short_chan_name;
-                       }
-
-                       free(reply);
-
-                       found = 1;
-                       break;
+       for (i=0; i<trace->nr_channels; i++) {
+               if (!strcmp(trace->channels[i].channel_name, ch_name)) {
+                       return &trace->channels[i];
                }
        }
 
-       if(!found) {
-               ERR("channel not found (%s)", channel_and_cpu);
-       }
-
-       free_short_chan_name:
-       free(ch_name);
-
-       end:
-       return retval;
+       return NULL;
 }
 
-static int do_cmd_get_n_subbufs(const char *recvbuf, struct ustcomm_source *src)
+static int get_buffer_shmid_pipe_fd(const char *trace_name, const char *ch_name,
+                                   int ch_cpu,
+                                   int *buf_shmid,
+                                   int *buf_struct_shmid,
+                                   int *buf_pipe_fd)
 {
-       int retval = 0;
        struct ust_trace *trace;
-       char trace_name[] = "auto";
-       int i;
-       char *channel_and_cpu;
-       int found = 0;
-       int result;
-       char *ch_name;
-       int ch_cpu;
-
-       DBG("get_n_subbufs");
-
-       channel_and_cpu = nth_token(recvbuf, 1);
-       if(channel_and_cpu == NULL) {
-               ERR("cannot parse channel");
-               goto end;
-       }
+       struct ust_channel *channel;
+       struct ust_buffer *buf;
 
-       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
-       if(ch_cpu == -1) {
-               ERR("problem parsing channel name");
-               goto free_short_chan_name;
-       }
+       DBG("get_buffer_shmid_pipe_fd");
 
        ltt_lock_traces();
        trace = _ltt_trace_find(trace_name);
        ltt_unlock_traces();
 
-       if(trace == NULL) {
+       if (trace == NULL) {
                ERR("cannot find trace!");
-               retval = -1;
-               goto free_short_chan_name;
+               return -ENODATA;
        }
 
-       for(i=0; i<trace->nr_channels; i++) {
-               struct ust_channel *channel = &trace->channels[i];
-
-               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
-                       char *reply;
-
-                       DBG("the n_subbufs for the requested channel is %d", channel->subbuf_cnt);
-                       asprintf(&reply, "%d", channel->subbuf_cnt);
-
-                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
-                       if(result) {
-                               ERR("ustcomm_send_reply failed");
-                               free(reply);
-                               retval = -1;
-                               goto free_short_chan_name;
-                       }
-
-                       free(reply);
-                       found = 1;
-                       break;
-               }
-       }
-       if(found == 0) {
-               ERR("unable to find channel");
+       channel = find_channel(ch_name, trace);
+       if (!channel) {
+               ERR("cannot find channel %s!", ch_name);
+               return -ENODATA;
        }
 
-       free_short_chan_name:
-       free(ch_name);
+       buf = channel->buf[ch_cpu];
 
-       end:
-       return retval;
+       *buf_shmid = buf->shmid;
+       *buf_struct_shmid = channel->buf_struct_shmids[ch_cpu];
+       *buf_pipe_fd = buf->data_ready_fd_read;
+
+       return 0;
 }
 
-static int do_cmd_get_subbuf_size(const char *recvbuf, struct ustcomm_source *src)
+static int get_subbuf_num_size(const char *trace_name, const char *ch_name,
+                              int *num, int *size)
 {
-       int retval = 0;
        struct ust_trace *trace;
-       char trace_name[] = "auto";
-       int i;
-       char *channel_and_cpu;
-       int found = 0;
-       int result;
-       char *ch_name;
-       int ch_cpu;
+       struct ust_channel *channel;
 
        DBG("get_subbuf_size");
 
-       channel_and_cpu = nth_token(recvbuf, 1);
-       if(channel_and_cpu == NULL) {
-               ERR("cannot parse channel");
-               goto end;
-       }
-
-       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
-       if(ch_cpu == -1) {
-               ERR("problem parsing channel name");
-               goto free_short_chan_name;
-       }
-
        ltt_lock_traces();
        trace = _ltt_trace_find(trace_name);
        ltt_unlock_traces();
 
-       if(trace == NULL) {
+       if (!trace) {
                ERR("cannot find trace!");
-               retval = -1;
-               goto free_short_chan_name;
+               return -ENODATA;
        }
 
-       for(i=0; i<trace->nr_channels; i++) {
-               struct ust_channel *channel = &trace->channels[i];
-
-               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
-                       char *reply;
-
-                       DBG("the subbuf_size for the requested channel is %zd", channel->subbuf_size);
-                       asprintf(&reply, "%zd", channel->subbuf_size);
-
-                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
-                       if(result) {
-                               ERR("ustcomm_send_reply failed");
-                               free(reply);
-                               retval = -1;
-                               goto free_short_chan_name;
-                       }
-
-                       free(reply);
-                       found = 1;
-                       break;
-               }
-       }
-       if(found == 0) {
+       channel = find_channel(ch_name, trace);
+       if (!channel) {
                ERR("unable to find channel");
+               return -ENODATA;
        }
 
-       free_short_chan_name:
-       free(ch_name);
+       *num = channel->subbuf_cnt;
+       *size = channel->subbuf_size;
 
-       end:
-       return retval;
+       return 0;
 }
 
-static unsigned int poweroftwo(unsigned int x)
-{
-    unsigned int power2 = 1;
-    unsigned int hardcoded = 2147483648; /* FIX max 2^31 */
+/* Return the power of two which is equal or higher to v */
 
-    if (x < 2)
-        return 2;
-
-    while (power2 < x && power2 < hardcoded)
-        power2 *= 2;
+static unsigned int pow2_higher_or_eq(unsigned int v)
+{
+       int hb = fls(v);
+       int retval = 1<<(hb-1);
 
-    return power2;
+       if (v-retval == 0)
+               return retval;
+       else
+               return retval<<1;
 }
 
-static int do_cmd_set_subbuf_size(const char *recvbuf, struct ustcomm_source *src)
+static int set_subbuf_size(const char *trace_name, const char *ch_name,
+                          unsigned int size)
 {
-       char *channel_slash_size;
-       char ch_name[256]="";
-       unsigned int size, power;
+       unsigned int power;
        int retval = 0;
        struct ust_trace *trace;
-       char trace_name[] = "auto";
-       int i;
-       int found = 0;
+       struct ust_channel *channel;
 
        DBG("set_subbuf_size");
 
-       channel_slash_size = nth_token(recvbuf, 1);
-       sscanf(channel_slash_size, "%255[^/]/%u", ch_name, &size);
-
-       if(ch_name == NULL) {
-               ERR("cannot parse channel");
-               goto end;
+       power = pow2_higher_or_eq(size);
+       power = max_t(unsigned int, 2u, power);
+       if (power != size) {
+               WARN("using the next power of two for buffer size = %u\n", power);
        }
 
-       power = poweroftwo(size);
-       if (power != size)
-               WARN("using the next 2^n = %u\n", power);
-
        ltt_lock_traces();
        trace = _ltt_trace_find_setup(trace_name);
-       if(trace == NULL) {
+       if (trace == NULL) {
                ERR("cannot find trace!");
-               retval = -1;
-               goto end;
+               retval = -ENODATA;
+               goto unlock_traces;
        }
 
-       for(i = 0; i < trace->nr_channels; i++) {
-               struct ust_channel *channel = &trace->channels[i];
-
-               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
-
-                       channel->subbuf_size = power;
-                       DBG("the set_subbuf_size for the requested channel is %zd", channel->subbuf_size);
-
-                       found = 1;
-                       break;
-               }
-       }
-       if(found == 0) {
+       channel = find_channel(ch_name, trace);
+       if (!channel) {
                ERR("unable to find channel");
+               retval = -ENODATA;
+               goto unlock_traces;
        }
 
-       end:
+       channel->subbuf_size = power;
+       DBG("the set_subbuf_size for the requested channel is %zu", channel->subbuf_size);
+
+unlock_traces:
        ltt_unlock_traces();
+
        return retval;
 }
 
-static int do_cmd_set_subbuf_num(const char *recvbuf, struct ustcomm_source *src)
+static int set_subbuf_num(const char *trace_name, const char *ch_name,
+                                unsigned int num)
 {
-       char *channel_slash_num;
-       char ch_name[256]="";
-       unsigned int num;
-       int retval = 0;
        struct ust_trace *trace;
-       char trace_name[] = "auto";
-       int i;
-       int found = 0;
+       struct ust_channel *channel;
+       int retval = 0;
 
        DBG("set_subbuf_num");
 
-       channel_slash_num = nth_token(recvbuf, 1);
-       sscanf(channel_slash_num, "%255[^/]/%u", ch_name, &num);
-
-       if(ch_name == NULL) {
-               ERR("cannot parse channel");
-               goto end;
-       }
        if (num < 2) {
                ERR("subbuffer count should be greater than 2");
-               goto end;
+               return -EINVAL;
        }
 
        ltt_lock_traces();
        trace = _ltt_trace_find_setup(trace_name);
-       if(trace == NULL) {
+       if (trace == NULL) {
                ERR("cannot find trace!");
-               retval = -1;
-               goto end;
+               retval = -ENODATA;
+               goto unlock_traces;
        }
 
-       for(i = 0; i < trace->nr_channels; i++) {
-               struct ust_channel *channel = &trace->channels[i];
-
-               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
-
-                       channel->subbuf_cnt = num;
-                       DBG("the set_subbuf_cnt for the requested channel is %zd", channel->subbuf_cnt);
-
-                       found = 1;
-                       break;
-               }
-       }
-       if(found == 0) {
+       channel = find_channel(ch_name, trace);
+       if (!channel) {
                ERR("unable to find channel");
+               retval = -ENODATA;
+               goto unlock_traces;
        }
 
-       end:
+       channel->subbuf_cnt = num;
+       DBG("the set_subbuf_cnt for the requested channel is %u", channel->subbuf_cnt);
+
+unlock_traces:
        ltt_unlock_traces();
        return retval;
 }
 
-static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
+static int get_subbuffer(const char *trace_name, const char *ch_name,
+                        int ch_cpu, long *consumed_old)
 {
        int retval = 0;
        struct ust_trace *trace;
-       char trace_name[] = "auto";
-       int i;
-       char *channel_and_cpu;
-       int found = 0;
-       char *ch_name;
-       int ch_cpu;
+       struct ust_channel *channel;
+       struct ust_buffer *buf;
 
        DBG("get_subbuf");
 
-       channel_and_cpu = nth_token(recvbuf, 1);
-       if(channel_and_cpu == NULL) {
-               ERR("cannot parse channel");
-               goto end;
-       }
-
-       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
-       if(ch_cpu == -1) {
-               ERR("problem parsing channel name");
-               goto free_short_chan_name;
-       }
+       *consumed_old = 0;
 
        ltt_lock_traces();
        trace = _ltt_trace_find(trace_name);
 
-       if(trace == NULL) {
-               int result;
-
-               WARN("Cannot find trace. It was likely destroyed by the user.");
-               result = ustcomm_send_reply(&ustcomm_app.server, "NOTFOUND", src);
-               if(result) {
-                       ERR("ustcomm_send_reply failed");
-                       retval = -1;
-                       goto unlock_traces;
-               }
+       if (!trace) {
+               DBG("Cannot find trace. It was likely destroyed by the user.");
+               retval = -ENODATA;
+               goto unlock_traces;
+       }
 
+       channel = find_channel(ch_name, trace);
+       if (!channel) {
+               ERR("unable to find channel");
+               retval = -ENODATA;
                goto unlock_traces;
        }
 
-       for(i=0; i<trace->nr_channels; i++) {
-               struct ust_channel *channel = &trace->channels[i];
+       buf = channel->buf[ch_cpu];
 
-               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
-                       struct ust_buffer *buf = channel->buf[ch_cpu];
-                       struct blocked_consumer *bc;
+       retval = ust_buffers_get_subbuf(buf, consumed_old);
+       if (retval < 0) {
+               WARN("missed buffer?");
+       }
 
-                       found = 1;
+unlock_traces:
+       ltt_unlock_traces();
 
-                       bc = (struct blocked_consumer *) malloc(sizeof(struct blocked_consumer));
-                       if(bc == NULL) {
-                               ERR("malloc returned NULL");
-                               goto unlock_traces;
-                       }
-                       bc->fd_consumer = src->fd;
-                       bc->fd_producer = buf->data_ready_fd_read;
-                       bc->buf = buf;
-                       bc->src = *src;
-                       bc->server = ustcomm_app.server;
-
-                       list_add(&bc->list, &blocked_consumers);
-
-                       /* Being here is the proof the daemon has mapped the buffer in its
-                        * memory. We may now decrement buffers_to_export.
-                        */
-                       if(atomic_long_read(&buf->consumed) == 0) {
-                               DBG("decrementing buffers_to_export");
-                               buffers_to_export--;
-                       }
+       return retval;
+}
 
-                       break;
-               }
+
+static int notify_buffer_mapped(const char *trace_name,
+                               const char *ch_name,
+                               int ch_cpu)
+{
+       int retval = 0;
+       struct ust_trace *trace;
+       struct ust_channel *channel;
+       struct ust_buffer *buf;
+
+       DBG("get_buffer_fd");
+
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
+
+       if (!trace) {
+               retval = -ENODATA;
+               DBG("Cannot find trace. It was likely destroyed by the user.");
+               goto unlock_traces;
        }
-       if(found == 0) {
+
+       channel = find_channel(ch_name, trace);
+       if (!channel) {
+               retval = -ENODATA;
                ERR("unable to find channel");
+               goto unlock_traces;
        }
 
-       unlock_traces:
-       ltt_unlock_traces();
+       buf = channel->buf[ch_cpu];
 
-       free_short_chan_name:
-       free(ch_name);
+       /* Being here is the proof the daemon has mapped the buffer in its
+        * memory. We may now decrement buffers_to_export.
+        */
+       if (uatomic_read(&buf->consumed) == 0) {
+               DBG("decrementing buffers_to_export");
+               CMM_STORE_SHARED(buffers_to_export, CMM_LOAD_SHARED(buffers_to_export)-1);
+       }
+
+unlock_traces:
+       ltt_unlock_traces();
 
-       end:
        return retval;
 }
 
-static int do_cmd_put_subbuffer(const char *recvbuf, struct ustcomm_source *src)
+static int put_subbuffer(const char *trace_name, const char *ch_name,
+                        int ch_cpu, long consumed_old)
 {
        int retval = 0;
        struct ust_trace *trace;
-       char trace_name[] = "auto";
-       int i;
-       char *channel_and_cpu;
-       int found = 0;
-       int result;
-       char *ch_name;
-       int ch_cpu;
-       long consumed_old;
-       char *consumed_old_str;
-       char *endptr;
-       char *reply = NULL;
+       struct ust_channel *channel;
+       struct ust_buffer *buf;
 
        DBG("put_subbuf");
 
-       channel_and_cpu = strdup_malloc(nth_token(recvbuf, 1));
-       if(channel_and_cpu == NULL) {
-               ERR("cannot parse channel");
-               retval = -1;
-               goto end;
-       }
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
 
-       consumed_old_str = strdup_malloc(nth_token(recvbuf, 2));
-       if(consumed_old_str == NULL) {
-               ERR("cannot parse consumed_old");
-               retval = -1;
-               goto free_channel_and_cpu;
-       }
-       consumed_old = strtol(consumed_old_str, &endptr, 10);
-       if(*endptr != '\0') {
-               ERR("invalid value for consumed_old");
-               retval = -1;
-               goto free_consumed_old_str;
+       if (!trace) {
+               retval = -ENODATA;
+               DBG("Cannot find trace. It was likely destroyed by the user.");
+               goto unlock_traces;
        }
 
-       seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
-       if(ch_cpu == -1) {
-               ERR("problem parsing channel name");
-               retval = -1;
-               goto free_short_chan_name;
+       channel = find_channel(ch_name, trace);
+       if (!channel) {
+               retval = -ENODATA;
+               ERR("unable to find channel");
+               goto unlock_traces;
        }
 
-       ltt_lock_traces();
-       trace = _ltt_trace_find(trace_name);
-
-       if(trace == NULL) {
-               WARN("Cannot find trace. It was likely destroyed by the user.");
-               result = ustcomm_send_reply(&ustcomm_app.server, "NOTFOUND", src);
-               if(result) {
-                       ERR("ustcomm_send_reply failed");
-                       retval = -1;
-                       goto unlock_traces;
-               }
+       buf = channel->buf[ch_cpu];
 
-               goto unlock_traces;
+       retval = ust_buffers_put_subbuf(buf, consumed_old);
+       if (retval < 0) {
+               WARN("ust_buffers_put_subbuf: error (subbuf=%s_%d)",
+                    ch_name, ch_cpu);
+       } else {
+               DBG("ust_buffers_put_subbuf: success (subbuf=%s_%d)",
+                   ch_name, ch_cpu);
        }
 
-       for(i=0; i<trace->nr_channels; i++) {
-               struct ust_channel *channel = &trace->channels[i];
+unlock_traces:
+       ltt_unlock_traces();
 
-               if(!strcmp(trace->channels[i].channel_name, ch_name)) {
-                       struct ust_buffer *buf = channel->buf[ch_cpu];
+       return retval;
+}
 
-                       found = 1;
+static void release_listener_mutex(void *ptr)
+{
+       pthread_mutex_unlock(&listener_thread_data_mutex);
+}
 
-                       result = ust_buffers_put_subbuf(buf, consumed_old);
-                       if(result < 0) {
-                               WARN("ust_buffers_put_subbuf: error (subbuf=%s)", channel_and_cpu);
-                               asprintf(&reply, "%s", "ERROR");
-                       }
-                       else {
-                               DBG("ust_buffers_put_subbuf: success (subbuf=%s)", channel_and_cpu);
-                               asprintf(&reply, "%s", "OK");
-                       }
+static void listener_cleanup(void *ptr)
+{
+       pthread_mutex_lock(&listen_sock_mutex);
+       if (listen_sock) {
+               ustcomm_del_named_sock(listen_sock, 0);
+               listen_sock = NULL;
+       }
+       pthread_mutex_unlock(&listen_sock_mutex);
+}
 
-                       result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
-                       if(result) {
-                               ERR("ustcomm_send_reply failed");
-                               free(reply);
-                               retval = -1;
-                               goto unlock_traces;
-                       }
+static int force_subbuf_switch(const char *trace_name)
+{
+       struct ust_trace *trace;
+       int i, j, retval = 0;
 
-                       free(reply);
-                       break;
+       ltt_lock_traces();
+       trace = _ltt_trace_find(trace_name);
+       if (!trace) {
+                retval = -ENODATA;
+                DBG("Cannot find trace. It was likely destroyed by the user.");
+                goto unlock_traces;
+        }
+
+       for (i = 0; i < trace->nr_channels; i++) {
+               for (j = 0; j < trace->channels[i].n_cpus; j++) {
+                       ltt_force_switch(trace->channels[i].buf[j],
+                                        FORCE_FLUSH);
                }
        }
-       if(found == 0) {
-               ERR("unable to find channel");
-       }
 
-       unlock_traces:
+unlock_traces:
        ltt_unlock_traces();
-       free_short_chan_name:
-       free(ch_name);
-       free_consumed_old_str:
-       free(consumed_old_str);
-       free_channel_and_cpu:
-       free(channel_and_cpu);
-
-       end:
+
        return retval;
 }
 
-void *listener_main(void *p)
+static int process_trace_cmd(int command, char *trace_name)
 {
        int result;
+       char trace_type[] = "ustrelay";
 
-       ust_register_thread();
+       switch(command) {
+       case START:
+               /* start is an operation that setups the trace, allocates it and starts it */
+               result = ltt_trace_setup(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_setup failed");
+                       return result;
+               }
 
-       DBG("LISTENER");
+               result = ltt_trace_set_type(trace_name, trace_type);
+               if (result < 0) {
+                       ERR("ltt_trace_set_type failed");
+                       return result;
+               }
 
-       for(;;) {
-               char trace_name[] = "auto";
-               char trace_type[] = "ustrelay";
-               char *recvbuf;
-               int len;
-               struct ustcomm_source src;
+               result = ltt_trace_alloc(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_alloc failed");
+                       return result;
+               }
 
-               process_blocked_consumers();
+               inform_consumer_daemon(trace_name);
 
-               result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src, 5);
-               if(result < 0) {
-                       WARN("error in ustcomm_app_recv_message");
-                       continue;
+               result = ltt_trace_start(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_start failed");
+                       return result;
                }
-               else if(result == 0) {
-                       /* no message */
-                       continue;
+
+               return 0;
+       case SETUP_TRACE:
+               DBG("trace setup");
+
+               result = ltt_trace_setup(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_setup failed");
+                       return result;
+               }
+
+               result = ltt_trace_set_type(trace_name, trace_type);
+               if (result < 0) {
+                       ERR("ltt_trace_set_type failed");
+                       return result;
                }
 
-               DBG("received a message! it's: %s", recvbuf);
-               len = strlen(recvbuf);
+               return 0;
+       case ALLOC_TRACE:
+               DBG("trace alloc");
 
-               if(!strcmp(recvbuf, "print_markers")) {
-                       print_markers(stderr);
+               result = ltt_trace_alloc(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_alloc failed");
+                       return result;
                }
-               else if(!strcmp(recvbuf, "list_markers")) {
-                       char *ptr;
-                       size_t size;
-                       FILE *fp;
+               inform_consumer_daemon(trace_name);
 
-                       fp = open_memstream(&ptr, &size);
-                       print_markers(fp);
-                       fclose(fp);
+               return 0;
 
-                       result = ustcomm_send_reply(&ustcomm_app.server, ptr, &src);
+       case CREATE_TRACE:
+               DBG("trace create");
 
-                       free(ptr);
+               result = ltt_trace_setup(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_setup failed");
+                       return result;
                }
-               else if(!strcmp(recvbuf, "start")) {
-                       /* start is an operation that setups the trace, allocates it and starts it */
-                       result = ltt_trace_setup(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_setup failed");
-                               return (void *)1;
-                       }
 
-                       result = ltt_trace_set_type(trace_name, trace_type);
-                       if(result < 0) {
-                               ERR("ltt_trace_set_type failed");
-                               return (void *)1;
-                       }
+               result = ltt_trace_set_type(trace_name, trace_type);
+               if (result < 0) {
+                       ERR("ltt_trace_set_type failed");
+                       return result;
+               }
 
-                       result = ltt_trace_alloc(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_alloc failed");
-                               return (void *)1;
-                       }
+               return 0;
+       case START_TRACE:
+               DBG("trace start");
 
+               result = ltt_trace_alloc(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_alloc failed");
+                       return result;
+               }
+               if (!result) {
                        inform_consumer_daemon(trace_name);
+               }
 
-                       result = ltt_trace_start(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_start failed");
-                               continue;
-                       }
+               result = ltt_trace_start(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_start failed");
+                       return result;
                }
-               else if(!strcmp(recvbuf, "trace_setup")) {
-                       DBG("trace setup");
 
-                       result = ltt_trace_setup(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_setup failed");
-                               return (void *)1;
-                       }
+               return 0;
+       case STOP_TRACE:
+               DBG("trace stop");
 
-                       result = ltt_trace_set_type(trace_name, trace_type);
-                       if(result < 0) {
-                               ERR("ltt_trace_set_type failed");
-                               return (void *)1;
-                       }
+               result = ltt_trace_stop(trace_name);
+               if (result < 0) {
+                       ERR("ltt_trace_stop failed");
+                       return result;
                }
-               else if(!strcmp(recvbuf, "trace_alloc")) {
-                       DBG("trace alloc");
 
-                       result = ltt_trace_alloc(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_alloc failed");
-                               return (void *)1;
-                       }
-                       inform_consumer_daemon(trace_name);
+               return 0;
+       case DESTROY_TRACE:
+               DBG("trace destroy");
+
+               result = ltt_trace_destroy(trace_name, 0);
+               if (result < 0) {
+                       ERR("ltt_trace_destroy failed");
+                       return result;
                }
-               else if(!strcmp(recvbuf, "trace_create")) {
-                       DBG("trace create");
+               return 0;
+       case FORCE_SUBBUF_SWITCH:
+               DBG("force switch");
+
+               result = force_subbuf_switch(trace_name);
+               if (result < 0) {
+                       ERR("force_subbuf_switch failed");
+                       return result;
+               }
+               return 0;
+       }
 
-                       result = ltt_trace_setup(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_setup failed");
-                               return (void *)1;
-                       }
+       return 0;
+}
 
-                       result = ltt_trace_set_type(trace_name, trace_type);
-                       if(result < 0) {
-                               ERR("ltt_trace_set_type failed");
-                               return (void *)1;
-                       }
+
+static void process_channel_cmd(int sock, int command,
+                               struct ustcomm_channel_info *ch_inf)
+{
+       struct ustcomm_header _reply_header;
+       struct ustcomm_header *reply_header = &_reply_header;
+       struct ustcomm_channel_info *reply_msg =
+               (struct ustcomm_channel_info *)send_buffer;
+       int result, offset = 0, num, size;
+
+       memset(reply_header, 0, sizeof(*reply_header));
+
+       switch (command) {
+       case GET_SUBBUF_NUM_SIZE:
+               result = get_subbuf_num_size(ch_inf->trace,
+                                            ch_inf->channel,
+                                            &num, &size);
+               if (result < 0) {
+                       reply_header->result = result;
+                       break;
                }
-               else if(!strcmp(recvbuf, "trace_start")) {
-                       DBG("trace start");
 
-                       result = ltt_trace_alloc(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_alloc failed");
-                               return (void *)1;
-                       }
-                       if(!result) {
-                               inform_consumer_daemon(trace_name);
-                       }
+               reply_msg->channel = USTCOMM_POISON_PTR;
+               reply_msg->subbuf_num = num;
+               reply_msg->subbuf_size = size;
 
-                       result = ltt_trace_start(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_start failed");
-                               continue;
-                       }
-               }
-               else if(!strcmp(recvbuf, "trace_stop")) {
-                       DBG("trace stop");
 
-                       result = ltt_trace_stop(trace_name);
-                       if(result < 0) {
-                               ERR("ltt_trace_stop failed");
-                               return (void *)1;
-                       }
+               reply_header->size = COMPUTE_MSG_SIZE(reply_msg, offset);
+
+               break;
+       case SET_SUBBUF_NUM:
+               reply_header->result = set_subbuf_num(ch_inf->trace,
+                                                     ch_inf->channel,
+                                                     ch_inf->subbuf_num);
+
+               break;
+       case SET_SUBBUF_SIZE:
+               reply_header->result = set_subbuf_size(ch_inf->trace,
+                                                      ch_inf->channel,
+                                                      ch_inf->subbuf_size);
+
+
+               break;
+       }
+       if (ustcomm_send(sock, reply_header, (char *)reply_msg) < 0) {
+               ERR("ustcomm_send failed");
+       }
+}
+
+static void process_buffer_cmd(int sock, int command,
+                              struct ustcomm_buffer_info *buf_inf)
+{
+       struct ustcomm_header _reply_header;
+       struct ustcomm_header *reply_header = &_reply_header;
+       struct ustcomm_buffer_info *reply_msg =
+               (struct ustcomm_buffer_info *)send_buffer;
+       int result, offset = 0, buf_shmid, buf_struct_shmid, buf_pipe_fd;
+       long consumed_old;
+
+       memset(reply_header, 0, sizeof(*reply_header));
+
+       switch (command) {
+       case GET_BUF_SHMID_PIPE_FD:
+               result = get_buffer_shmid_pipe_fd(buf_inf->trace,
+                                                 buf_inf->channel,
+                                                 buf_inf->ch_cpu,
+                                                 &buf_shmid,
+                                                 &buf_struct_shmid,
+                                                 &buf_pipe_fd);
+               if (result < 0) {
+                       reply_header->result = result;
+                       break;
                }
-               else if(!strcmp(recvbuf, "trace_destroy")) {
 
-                       DBG("trace destroy");
+               reply_msg->channel = USTCOMM_POISON_PTR;
+               reply_msg->buf_shmid = buf_shmid;
+               reply_msg->buf_struct_shmid = buf_struct_shmid;
 
-                       result = ltt_trace_destroy(trace_name, 0);
-                       if(result < 0) {
-                               ERR("ltt_trace_destroy failed");
-                               return (void *)1;
-                       }
+               reply_header->size = COMPUTE_MSG_SIZE(reply_msg, offset);
+               reply_header->fd_included = 1;
+
+               if (ustcomm_send_fd(sock, reply_header, (char *)reply_msg,
+                                   &buf_pipe_fd) < 0) {
+                       ERR("ustcomm_send failed");
                }
-               else if(nth_token_is(recvbuf, "get_shmid", 0) == 1) {
-                       do_cmd_get_shmid(recvbuf, &src);
+               return;
+
+       case NOTIFY_BUF_MAPPED:
+               reply_header->result =
+                       notify_buffer_mapped(buf_inf->trace,
+                                            buf_inf->channel,
+                                            buf_inf->ch_cpu);
+               break;
+       case GET_SUBBUFFER:
+               result = get_subbuffer(buf_inf->trace, buf_inf->channel,
+                                      buf_inf->ch_cpu, &consumed_old);
+               if (result < 0) {
+                       reply_header->result = result;
+                       break;
                }
-               else if(nth_token_is(recvbuf, "get_n_subbufs", 0) == 1) {
-                       do_cmd_get_n_subbufs(recvbuf, &src);
+
+               reply_msg->channel = USTCOMM_POISON_PTR;
+               reply_msg->consumed_old = consumed_old;
+
+               reply_header->size = COMPUTE_MSG_SIZE(reply_msg, offset);
+
+               break;
+       case PUT_SUBBUFFER:
+               result = put_subbuffer(buf_inf->trace, buf_inf->channel,
+                                      buf_inf->ch_cpu,
+                                      buf_inf->consumed_old);
+               reply_header->result = result;
+
+               break;
+       }
+
+       if (ustcomm_send(sock, reply_header, (char *)reply_msg) < 0) {
+               ERR("ustcomm_send failed");
+       }
+
+}
+
+static void process_ust_marker_cmd(int sock, int command,
+                              struct ustcomm_ust_marker_info *ust_marker_inf)
+{
+       struct ustcomm_header _reply_header;
+       struct ustcomm_header *reply_header = &_reply_header;
+       int result = 0;
+
+       memset(reply_header, 0, sizeof(*reply_header));
+
+       switch(command) {
+       case ENABLE_MARKER:
+
+               result = ltt_ust_marker_connect(ust_marker_inf->channel,
+                                           ust_marker_inf->ust_marker,
+                                           "default");
+               if (result < 0) {
+                       WARN("could not enable ust_marker; channel=%s,"
+                            " name=%s",
+                            ust_marker_inf->channel,
+                            ust_marker_inf->ust_marker);
+
                }
-               else if(nth_token_is(recvbuf, "get_subbuf_size", 0) == 1) {
-                       do_cmd_get_subbuf_size(recvbuf, &src);
+               break;
+       case DISABLE_MARKER:
+               result = ltt_ust_marker_disconnect(ust_marker_inf->channel,
+                                              ust_marker_inf->ust_marker,
+                                              "default");
+               if (result < 0) {
+                       WARN("could not disable ust_marker; channel=%s,"
+                            " name=%s",
+                            ust_marker_inf->channel,
+                            ust_marker_inf->ust_marker);
                }
-               else if(nth_token_is(recvbuf, "load_probe_lib", 0) == 1) {
-                       char *libfile;
+               break;
+       }
 
-                       libfile = nth_token(recvbuf, 1);
+       reply_header->result = result;
 
-                       DBG("load_probe_lib loading %s", libfile);
+       if (ustcomm_send(sock, reply_header, NULL) < 0) {
+               ERR("ustcomm_send failed");
+       }
 
-                       free(libfile);
+}
+static void process_client_cmd(struct ustcomm_header *recv_header,
+                              char *recv_buf, int sock)
+{
+       int result;
+       struct ustcomm_header _reply_header;
+       struct ustcomm_header *reply_header = &_reply_header;
+       char *send_buf = send_buffer;
+
+       memset(reply_header, 0, sizeof(*reply_header));
+       memset(send_buf, 0, sizeof(send_buffer));
+
+       switch(recv_header->command) {
+       case GET_SUBBUF_NUM_SIZE:
+       case SET_SUBBUF_NUM:
+       case SET_SUBBUF_SIZE:
+       {
+               struct ustcomm_channel_info *ch_inf;
+               ch_inf = (struct ustcomm_channel_info *)recv_buf;
+               result = ustcomm_unpack_channel_info(ch_inf);
+               if (result < 0) {
+                       ERR("couldn't unpack channel info");
+                       reply_header->result = -EINVAL;
+                       goto send_response;
                }
-               else if(nth_token_is(recvbuf, "get_subbuffer", 0) == 1) {
-                       do_cmd_get_subbuffer(recvbuf, &src);
+               process_channel_cmd(sock, recv_header->command, ch_inf);
+               return;
+       }
+       case GET_BUF_SHMID_PIPE_FD:
+       case NOTIFY_BUF_MAPPED:
+       case GET_SUBBUFFER:
+       case PUT_SUBBUFFER:
+       {
+               struct ustcomm_buffer_info *buf_inf;
+               buf_inf = (struct ustcomm_buffer_info *)recv_buf;
+               result = ustcomm_unpack_buffer_info(buf_inf);
+               if (result < 0) {
+                       ERR("couldn't unpack buffer info");
+                       reply_header->result = -EINVAL;
+                       goto send_response;
                }
-               else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) {
-                       do_cmd_put_subbuffer(recvbuf, &src);
+               process_buffer_cmd(sock, recv_header->command, buf_inf);
+               return;
+       }
+       case ENABLE_MARKER:
+       case DISABLE_MARKER:
+       {
+               struct ustcomm_ust_marker_info *ust_marker_inf;
+               ust_marker_inf = (struct ustcomm_ust_marker_info *)recv_buf;
+               result = ustcomm_unpack_ust_marker_info(ust_marker_inf);
+               if (result < 0) {
+                       ERR("couldn't unpack ust_marker info");
+                       reply_header->result = -EINVAL;
+                       goto send_response;
                }
-               else if(nth_token_is(recvbuf, "set_subbuf_size", 0) == 1) {
-                       do_cmd_set_subbuf_size(recvbuf, &src);
+               process_ust_marker_cmd(sock, recv_header->command, ust_marker_inf);
+               return;
+       }
+       case LIST_MARKERS:
+       {
+               char *ptr;
+               size_t size;
+               FILE *fp;
+
+               fp = open_memstream(&ptr, &size);
+               if (fp == NULL) {
+                       ERR("opening memstream failed");
+                       return;
+               }
+               print_ust_marker(fp);
+               fclose(fp);
+
+               reply_header->size = size + 1;  /* Include final \0 */
+
+               result = ustcomm_send(sock, reply_header, ptr);
+
+               free(ptr);
+
+               if (result < 0) {
+                       PERROR("failed to send ust_marker list");
                }
-               else if(nth_token_is(recvbuf, "set_subbuf_num", 0) == 1) {
-                       do_cmd_set_subbuf_num(recvbuf, &src);
+
+               break;
+       }
+       case LIST_TRACE_EVENTS:
+       {
+               char *ptr;
+               size_t size;
+               FILE *fp;
+
+               fp = open_memstream(&ptr, &size);
+               if (fp == NULL) {
+                       ERR("opening memstream failed");
+                       return;
                }
-               else if(nth_token_is(recvbuf, "enable_marker", 0) == 1) {
-                       char *channel_slash_name = nth_token(recvbuf, 1);
-                       char channel_name[256]="";
-                       char marker_name[256]="";
+               print_trace_events(fp);
+               fclose(fp);
 
-                       result = sscanf(channel_slash_name, "%255[^/]/%255s", channel_name, marker_name);
+               reply_header->size = size + 1;  /* Include final \0 */
 
-                       if(channel_name == NULL || marker_name == NULL) {
-                               WARN("invalid marker name");
-                               goto next_cmd;
-                       }
+               result = ustcomm_send(sock, reply_header, ptr);
 
-                       result = ltt_marker_connect(channel_name, marker_name, "default");
-                       if(result < 0) {
-                               WARN("could not enable marker; channel=%s, name=%s", channel_name, marker_name);
-                       }
+               free(ptr);
+
+               if (result < 0) {
+                       ERR("list_trace_events failed");
+                       return;
                }
-               else if(nth_token_is(recvbuf, "disable_marker", 0) == 1) {
-                       char *channel_slash_name = nth_token(recvbuf, 1);
-                       char *marker_name;
-                       char *channel_name;
 
-                       result = sscanf(channel_slash_name, "%a[^/]/%as", &channel_name, &marker_name);
+               break;
+       }
+       case LOAD_PROBE_LIB:
+       {
+               char *libfile;
 
-                       if(marker_name == NULL) {
-                       }
+               /* FIXME: No functionality at all... */
+               libfile = recv_buf;
 
-                       result = ltt_marker_disconnect(channel_name, marker_name, "default");
-                       if(result < 0) {
-                               WARN("could not disable marker; channel=%s, name=%s", channel_name, marker_name);
-                       }
+               DBG("load_probe_lib loading %s", libfile);
+
+               break;
+       }
+       case GET_PIDUNIQUE:
+       {
+               struct ustcomm_pidunique *pid_msg;
+               pid_msg = (struct ustcomm_pidunique *)send_buf;
+
+               pid_msg->pidunique = pidunique;
+               reply_header->size = sizeof(pid_msg);
+
+               goto send_response;
+
+       }
+       case GET_SOCK_PATH:
+       {
+               struct ustcomm_single_field *sock_msg;
+               char *sock_path_env;
+
+               sock_msg = (struct ustcomm_single_field *)send_buf;
+
+               sock_path_env = getenv("UST_DAEMON_SOCKET");
+
+               if (!sock_path_env) {
+                       result = ustcomm_pack_single_field(reply_header,
+                                                          sock_msg,
+                                                          SOCK_DIR "/ustconsumer");
+
+               } else {
+                       result = ustcomm_pack_single_field(reply_header,
+                                                          sock_msg,
+                                                          sock_path_env);
                }
-               else if(nth_token_is(recvbuf, "get_pidunique", 0) == 1) {
-                       char *reply;
+               reply_header->result = result;
 
-                       asprintf(&reply, "%lld", pidunique);
+               goto send_response;
+       }
+       case SET_SOCK_PATH:
+       {
+               struct ustcomm_single_field *sock_msg;
+               sock_msg = (struct ustcomm_single_field *)recv_buf;
+               result = ustcomm_unpack_single_field(sock_msg);
+               if (result < 0) {
+                       reply_header->result = -EINVAL;
+                       goto send_response;
+               }
 
-                       result = ustcomm_send_reply(&ustcomm_app.server, reply, &src);
-                       if(result) {
-                               ERR("listener: get_pidunique: ustcomm_send_reply failed");
-                               goto next_cmd;
-                       }
+               reply_header->result = setenv("UST_DAEMON_SOCKET",
+                                             sock_msg->field, 1);
 
-                       free(reply);
+               goto send_response;
+       }
+       case START:
+       case SETUP_TRACE:
+       case ALLOC_TRACE:
+       case CREATE_TRACE:
+       case START_TRACE:
+       case STOP_TRACE:
+       case DESTROY_TRACE:
+       case FORCE_SUBBUF_SWITCH:
+       {
+               struct ustcomm_single_field *trace_inf =
+                       (struct ustcomm_single_field *)recv_buf;
+
+               result = ustcomm_unpack_single_field(trace_inf);
+               if (result < 0) {
+                       ERR("couldn't unpack trace info");
+                       reply_header->result = -EINVAL;
+                       goto send_response;
                }
-//             else if(nth_token_is(recvbuf, "get_notifications", 0) == 1) {
-//                     struct ust_trace *trace;
-//                     char trace_name[] = "auto";
-//                     int i;
-//                     char *channel_name;
-//
-//                     DBG("get_notifications");
-//
-//                     channel_name = strdup_malloc(nth_token(recvbuf, 1));
-//                     if(channel_name == NULL) {
-//                             ERR("put_subbuf_size: cannot parse channel");
-//                             goto next_cmd;
-//                     }
-//
-//                     ltt_lock_traces();
-//                     trace = _ltt_trace_find(trace_name);
-//                     ltt_unlock_traces();
-//
-//                     if(trace == NULL) {
-//                             ERR("cannot find trace!");
-//                             return (void *)1;
-//                     }
-//
-//                     for(i=0; i<trace->nr_channels; i++) {
-//                             struct rchan *rchan = trace->channels[i].trans_channel_data;
-//                             int fd;
-//
-//                             if(!strcmp(trace->channels[i].channel_name, channel_name)) {
-//                                     struct rchan_buf *rbuf = rchan->buf;
-//                                     struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf;
-//
-//                                     result = fd = ustcomm_app_detach_client(&ustcomm_app, &src);
-//                                     if(result == -1) {
-//                                             ERR("ustcomm_app_detach_client failed");
-//                                             goto next_cmd;
-//                                     }
-//
-//                                     lttbuf->wake_consumer_arg = (void *) fd;
-//
-//                                     smp_wmb();
-//
-//                                     lttbuf->call_wake_consumer = 1;
-//
-//                                     break;
-//                             }
-//                     }
-//
-//                     free(channel_name);
-//             }
-               else {
-                       ERR("unable to parse message: %s", recvbuf);
+
+               reply_header->result =
+                       process_trace_cmd(recv_header->command,
+                                         trace_inf->field);
+               goto send_response;
+
+       }
+       default:
+               reply_header->result = -EINVAL;
+
+               goto send_response;
+       }
+
+       return;
+
+send_response:
+       ustcomm_send(sock, reply_header, send_buf);
+}
+
+#define MAX_EVENTS 10
+
+void *listener_main(void *p)
+{
+       struct ustcomm_sock *epoll_sock;
+       struct epoll_event events[MAX_EVENTS];
+       struct sockaddr addr;
+       int accept_fd, nfds, result, i, addr_size;
+
+       DBG("LISTENER");
+
+       pthread_cleanup_push(listener_cleanup, NULL);
+
+       for(;;) {
+               nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+               if (nfds == -1) {
+                       PERROR("listener_main: epoll_wait failed");
+                       continue;
                }
 
-       next_cmd:
-               free(recvbuf);
+               for (i = 0; i < nfds; i++) {
+                       pthread_mutex_lock(&listener_thread_data_mutex);
+                       pthread_cleanup_push(release_listener_mutex, NULL);
+                       epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
+                       if (epoll_sock == listen_sock) {
+                               addr_size = sizeof(struct sockaddr);
+                               accept_fd = accept(epoll_sock->fd,
+                                                  &addr,
+                                                  (socklen_t *)&addr_size);
+                               if (accept_fd == -1) {
+                                       PERROR("listener_main: accept failed");
+                                       continue;
+                               }
+                               ustcomm_init_sock(accept_fd, epoll_fd,
+                                                &ust_socks);
+                       } else {
+                               memset(receive_header, 0,
+                                      sizeof(*receive_header));
+                               memset(receive_buffer, 0,
+                                      sizeof(receive_buffer));
+                               result = ustcomm_recv(epoll_sock->fd,
+                                                     receive_header,
+                                                     receive_buffer);
+                               if (result == 0) {
+                                       ustcomm_del_sock(epoll_sock, 0);
+                               } else {
+                                       process_client_cmd(receive_header,
+                                                          receive_buffer,
+                                                          epoll_sock->fd);
+                               }
+                       }
+                       pthread_cleanup_pop(1); /* release listener mutex */
+               }
        }
+
+       pthread_cleanup_pop(1);
 }
 
-volatile sig_atomic_t have_listener = 0;
+/* These should only be accessed in the parent thread,
+ * not the listener.
+ */
+static volatile sig_atomic_t have_listener = 0;
+static pthread_t listener_thread;
 
 void create_listener(void)
 {
-#ifdef USE_CLONE
-       static char listener_stack[16384];
        int result;
-#else
-       pthread_t thread;
-#endif
+       sigset_t sig_all_blocked;
+       sigset_t orig_parent_mask;
 
-       if(have_listener) {
+       if (have_listener) {
                WARN("not creating listener because we already had one");
                return;
        }
 
-#ifdef USE_CLONE
-       result = clone((int (*)(void *)) listener_main, listener_stack+sizeof(listener_stack)-1, CLONE_FS | CLONE_FILES | CLONE_VM | CLONE_SIGHAND | CLONE_THREAD, NULL);
-       if(result == -1) {
-               perror("clone");
-               return;
-       }
-#else
+       /* A new thread created by pthread_create inherits the signal mask
+        * from the parent. To avoid any signal being received by the
+        * listener thread, we block all signals temporarily in the parent,
+        * while we create the listener thread.
+        */
 
-       pthread_create(&thread, NULL, listener_main, NULL);
-#endif
+       sigfillset(&sig_all_blocked);
 
-       have_listener = 1;
-}
+       result = pthread_sigmask(SIG_SETMASK, &sig_all_blocked, &orig_parent_mask);
+       if (result) {
+               PERROR("pthread_sigmask: %s", strerror(result));
+       }
 
-static int init_socket(void)
-{
-       return ustcomm_init_app(getpid(), &ustcomm_app);
+       result = pthread_create(&listener_thread, NULL, listener_main, NULL);
+       if (result == -1) {
+               PERROR("pthread_create");
+       }
+
+       /* Restore original signal mask in parent */
+       result = pthread_sigmask(SIG_SETMASK, &orig_parent_mask, NULL);
+       if (result) {
+               PERROR("pthread_sigmask: %s", strerror(result));
+       } else {
+               have_listener = 1;
+       }
 }
 
 #define AUTOPROBE_DISABLED      0
@@ -1149,20 +1191,19 @@ static int init_socket(void)
 static int autoprobe_method = AUTOPROBE_DISABLED;
 static regex_t autoprobe_regex;
 
-static void auto_probe_connect(struct marker *m)
+static void auto_probe_connect(struct ust_marker *m)
 {
        int result;
 
        char* concat_name = NULL;
        const char *probe_name = "default";
 
-       if(autoprobe_method == AUTOPROBE_DISABLED) {
+       if (autoprobe_method == AUTOPROBE_DISABLED) {
                return;
-       }
-       else if(autoprobe_method == AUTOPROBE_ENABLE_REGEX) {
+       } else if (autoprobe_method == AUTOPROBE_ENABLE_REGEX) {
                result = asprintf(&concat_name, "%s/%s", m->channel, m->name);
-               if(result == -1) {
-                       ERR("auto_probe_connect: asprintf failed (marker %s/%s)",
+               if (result == -1) {
+                       ERR("auto_probe_connect: asprintf failed (ust_marker %s/%s)",
                                m->channel, m->name);
                        return;
                }
@@ -1173,58 +1214,120 @@ static void auto_probe_connect(struct marker *m)
                free(concat_name);
        }
 
-       result = ltt_marker_connect(m->channel, m->name, probe_name);
-       if(result && result != -EEXIST)
-               ERR("ltt_marker_connect (marker = %s/%s, errno = %d)", m->channel, m->name, -result);
+       result = ltt_ust_marker_connect(m->channel, m->name, probe_name);
+       if (result && result != -EEXIST)
+               ERR("ltt_ust_marker_connect (ust_marker = %s/%s, errno = %d)", m->channel, m->name, -result);
+
+       DBG("auto connected ust_marker %s (addr: %p) %s to probe default", m->channel, m, m->name);
+
+}
+
+static struct ustcomm_sock * init_app_socket(int epoll_fd)
+{
+       char *dir_name, *sock_name;
+       int result;
+       struct ustcomm_sock *sock = NULL;
+       time_t mtime;
+
+       dir_name = ustcomm_user_sock_dir();
+       if (!dir_name)
+               return NULL;
+
+       mtime = ustcomm_pid_st_mtime(getpid());
+       if (!mtime) {
+               goto free_dir_name;
+       }
+
+       result = asprintf(&sock_name, "%s/%d.%ld", dir_name,
+                         (int) getpid(), (long) mtime);
+       if (result < 0) {
+               ERR("string overflow allocating socket name, "
+                   "UST thread bailing");
+               goto free_dir_name;
+       }
+
+       result = ensure_dir_exists(dir_name, S_IRWXU);
+       if (result == -1) {
+               ERR("Unable to create socket directory %s, UST thread bailing",
+                   dir_name);
+               goto free_sock_name;
+       }
+
+       sock = ustcomm_init_named_socket(sock_name, epoll_fd);
+       if (!sock) {
+               ERR("Error initializing named socket (%s). Check that directory"
+                   "exists and that it is writable. UST thread bailing", sock_name);
+               goto free_sock_name;
+       }
 
-       DBG("auto connected marker %s (addr: %p) %s to probe default", m->channel, m, m->name);
+free_sock_name:
+       free(sock_name);
+free_dir_name:
+       free(dir_name);
 
+       return sock;
 }
 
 static void __attribute__((constructor)) init()
 {
+       struct timespec ts;
        int result;
        char* autoprobe_val = NULL;
+       char* subbuffer_size_val = NULL;
+       char* subbuffer_count_val = NULL;
+       unsigned int subbuffer_size;
+       unsigned int subbuffer_count;
+       unsigned int power;
 
        /* Assign the pidunique, to be able to differentiate the processes with same
         * pid, (before and after an exec).
         */
        pidunique = make_pidunique();
-
-       /* Initialize RCU in case the constructor order is not good. */
-       rcu_init();
-
-       /* It is important to do this before events start to be generated. */
-       ust_register_thread();
+       processpid = getpid();
 
        DBG("Tracectl constructor");
 
-       /* Must create socket before signal handler to prevent races.
-         */
-       result = init_socket();
-       if(result == -1) {
-               ERR("init_socket error");
+       /* Set up epoll */
+       epoll_fd = epoll_create(MAX_EVENTS);
+       if (epoll_fd == -1) {
+               ERR("epoll_create failed, tracing shutting down");
+               return;
+       }
+
+       /* Create the socket */
+       listen_sock = init_app_socket(epoll_fd);
+       if (!listen_sock) {
+               ERR("failed to create application socket,"
+                   " tracing shutting down");
                return;
        }
 
        create_listener();
 
-       autoprobe_val = getenv("UST_AUTOPROBE");
-       if(autoprobe_val) {
-               struct marker_iter iter;
+       /* Get clock the clock source type */
 
-               DBG("Autoprobe enabled.");
+       /* Default clock source */
+       ust_clock_source = CLOCK_TRACE;
+       if (clock_gettime(ust_clock_source, &ts) != 0) {
+               ust_clock_source = CLOCK_MONOTONIC;
+               DBG("UST traces will not be synchronized with LTTng traces");
+       }
 
-               /* Ensure markers are initialized */
-               //init_markers();
+       if (getenv("UST_TRACE") || getenv("UST_AUTOPROBE")) {
+               /* Ensure ust_marker control is initialized */
+               init_ust_marker_control();
+       }
 
-               /* Ensure marker control is initialized, for the probe */
-               init_marker_control();
+       autoprobe_val = getenv("UST_AUTOPROBE");
+       if (autoprobe_val) {
+               struct ust_marker_iter iter;
+
+               DBG("Autoprobe enabled.");
 
                /* first, set the callback that will connect the
-                * probe on new markers
+                * probe on new ust_marker
                 */
-               if(autoprobe_val[0] == '/') {
+               if (autoprobe_val[0] == '/') {
                        result = regcomp(&autoprobe_regex, autoprobe_val+1, 0);
                        if (result) {
                                char regexerr[150];
@@ -1232,49 +1335,77 @@ static void __attribute__((constructor)) init()
                                regerror(result, &autoprobe_regex, regexerr, sizeof(regexerr));
                                ERR("cannot parse regex %s (%s), will ignore UST_AUTOPROBE", autoprobe_val, regexerr);
                                /* don't crash the application just for this */
-                       }
-                       else {
+                       } else {
                                autoprobe_method = AUTOPROBE_ENABLE_REGEX;
                        }
-               }
-               else {
+               } else {
                        /* just enable all instrumentation */
                        autoprobe_method = AUTOPROBE_ENABLE_ALL;
                }
 
-               marker_set_new_marker_cb(auto_probe_connect);
+               ust_marker_set_new_ust_marker_cb(auto_probe_connect);
 
                /* Now, connect the probes that were already registered. */
-               marker_iter_reset(&iter);
-               marker_iter_start(&iter);
-
-               DBG("now iterating on markers already registered");
-               while(iter.marker) {
-                       DBG("now iterating on marker %s", iter.marker->name);
-                       auto_probe_connect(iter.marker);
-                       marker_iter_next(&iter);
+               ust_marker_iter_reset(&iter);
+               ust_marker_iter_start(&iter);
+
+               DBG("now iterating on ust_marker already registered");
+               while (iter.ust_marker) {
+                       DBG("now iterating on ust_marker %s", (*iter.ust_marker)->name);
+                       auto_probe_connect(*iter.ust_marker);
+                       ust_marker_iter_next(&iter);
                }
+               ust_marker_iter_stop(&iter);
        }
 
-       if(getenv("UST_TRACE")) {
+       if (getenv("UST_OVERWRITE")) {
+               int val = atoi(getenv("UST_OVERWRITE"));
+               if (val == 0 || val == 1) {
+                       CMM_STORE_SHARED(ust_channels_overwrite_by_default, val);
+               } else {
+                       WARN("invalid value for UST_OVERWRITE");
+               }
+       }
+
+       if (getenv("UST_AUTOCOLLECT")) {
+               int val = atoi(getenv("UST_AUTOCOLLECT"));
+               if (val == 0 || val == 1) {
+                       CMM_STORE_SHARED(ust_channels_request_collection_by_default, val);
+               } else {
+                       WARN("invalid value for UST_AUTOCOLLECT");
+               }
+       }
+
+       subbuffer_size_val = getenv("UST_SUBBUF_SIZE");
+       if (subbuffer_size_val) {
+               sscanf(subbuffer_size_val, "%u", &subbuffer_size);
+               power = pow2_higher_or_eq(subbuffer_size);
+               if (power != subbuffer_size)
+                       WARN("using the next power of two for buffer size = %u\n", power);
+               chan_infos[LTT_CHANNEL_UST].def_subbufsize = power;
+       }
+
+       subbuffer_count_val = getenv("UST_SUBBUF_NUM");
+       if (subbuffer_count_val) {
+               sscanf(subbuffer_count_val, "%u", &subbuffer_count);
+               if (subbuffer_count < 2)
+                       subbuffer_count = 2;
+               chan_infos[LTT_CHANNEL_UST].def_subbufcount = subbuffer_count;
+       }
+
+       if (getenv("UST_TRACE")) {
                char trace_name[] = "auto";
                char trace_type[] = "ustrelay";
 
                DBG("starting early tracing");
 
-               /* Ensure marker control is initialized */
-               init_marker_control();
-
-               /* Ensure markers are initialized */
-               init_markers();
-
                /* Ensure buffers are initialized, for the transport to be available.
                 * We are about to set a trace type and it will fail without this.
                 */
                init_ustrelay_transport();
 
                /* FIXME: When starting early tracing (here), depending on the
-                * order of constructors, it is very well possible some marker
+                * order of constructors, it is very well possible some ust_marker
                 * sections are not yet registered. Because of this, some
                 * channels may not be registered. Yet, we are about to ask the
                 * daemon to collect the channels. Channels which are not yet
@@ -1294,25 +1425,25 @@ static void __attribute__((constructor)) init()
                ltt_channels_register("ust");
 
                result = ltt_trace_setup(trace_name);
-               if(result < 0) {
+               if (result < 0) {
                        ERR("ltt_trace_setup failed");
                        return;
                }
 
                result = ltt_trace_set_type(trace_name, trace_type);
-               if(result < 0) {
+               if (result < 0) {
                        ERR("ltt_trace_set_type failed");
                        return;
                }
 
                result = ltt_trace_alloc(trace_name);
-               if(result < 0) {
+               if (result < 0) {
                        ERR("ltt_trace_alloc failed");
                        return;
                }
 
                result = ltt_trace_start(trace_name);
-               if(result < 0) {
+               if (result < 0) {
                        ERR("ltt_trace_start failed");
                        return;
                }
@@ -1322,7 +1453,6 @@ static void __attribute__((constructor)) init()
                inform_consumer_daemon(trace_name);
        }
 
-
        return;
 
        /* should decrementally destroy stuff if error */
@@ -1349,12 +1479,12 @@ static void destroy_traces(void)
        DBG("destructor stopping traces");
 
        result = ltt_trace_stop("auto");
-       if(result == -1) {
+       if (result == -1) {
                ERR("ltt_trace_stop error");
        }
 
        result = ltt_trace_destroy("auto", 0);
-       if(result == -1) {
+       if (result == -1) {
                ERR("ltt_trace_destroy error");
        }
 }
@@ -1366,8 +1496,8 @@ static int trace_recording(void)
 
        ltt_lock_traces();
 
-       list_for_each_entry(trace, &ltt_traces.head, list) {
-               if(trace->active) {
+       cds_list_for_each_entry(trace, &ltt_traces.head, list) {
+               if (trace->active) {
                        retval = 1;
                        break;
                }
@@ -1378,30 +1508,40 @@ static int trace_recording(void)
        return retval;
 }
 
-#if 0
-static int have_consumer(void)
+int restarting_usleep(useconds_t usecs)
 {
-       return !list_empty(&blocked_consumers);
+        struct timespec tv;
+        int result;
+
+        tv.tv_sec = 0;
+        tv.tv_nsec = usecs * 1000;
+
+        do {
+                result = nanosleep(&tv, &tv);
+        } while (result == -1 && errno == EINTR);
+
+       return result;
 }
-#endif
 
-int restarting_usleep(useconds_t usecs)
+static void stop_listener(void)
 {
-        struct timespec tv; 
-        int result; 
-        tv.tv_sec = 0; 
-        tv.tv_nsec = usecs * 1000; 
-        do { 
-                result = nanosleep(&tv, &tv); 
-        } while(result == -1 && errno == EINTR); 
+       int result;
 
-       return result;
+       if (!have_listener)
+               return;
+
+       result = pthread_cancel(listener_thread);
+       if (result != 0) {
+               ERR("pthread_cancel: %s", strerror(result));
+       }
+       result = pthread_join(listener_thread, NULL);
+       if (result != 0) {
+               ERR("pthread_join: %s", strerror(result));
+       }
 }
 
 /* This destructor keeps the process alive for a few seconds in order
- * to leave time to ustd to connect to its buffers. This is necessary
+ * to leave time for ustconsumer to connect to its buffers. This is necessary
  * for programs whose execution is very short. It is also useful in all
  * programs when tracing is started close to the end of the program
  * execution.
@@ -1412,15 +1552,19 @@ int restarting_usleep(useconds_t usecs)
 
 static void __attribute__((destructor)) keepalive()
 {
-       if(trace_recording() && buffers_to_export) {
+       if (processpid != getpid()) {
+               return;
+       }
+
+       if (trace_recording() && CMM_LOAD_SHARED(buffers_to_export)) {
                int total = 0;
                DBG("Keeping process alive for consumer daemon...");
-               while(buffers_to_export) {
+               while (CMM_LOAD_SHARED(buffers_to_export)) {
                        const int interv = 200000;
                        restarting_usleep(interv);
                        total += interv;
 
-                       if(total >= 3000000) {
+                       if (total >= 3000000) {
                                WARN("non-consumed buffers remaining after wait limit; not waiting anymore");
                                break;
                        }
@@ -1430,12 +1574,13 @@ static void __attribute__((destructor)) keepalive()
 
        destroy_traces();
 
-       ustcomm_fini_app(&ustcomm_app);
+       /* Ask the listener to stop and clean up. */
+       stop_listener();
 }
 
 void ust_potential_exec(void)
 {
-       trace_mark(ust, potential_exec, MARK_NOARGS);
+       ust_marker(potential_exec, UST_MARKER_NOARGS);
 
        DBG("test");
 
@@ -1454,39 +1599,66 @@ void ust_potential_exec(void)
 
 static void ust_fork(void)
 {
-       struct blocked_consumer *bc;
-       struct blocked_consumer *deletable_bc = NULL;
+       struct ustcomm_sock *sock, *sock_tmp;
+       struct ust_trace *trace, *trace_tmp;
        int result;
 
        /* FIXME: technically, the locks could have been taken before the fork */
        DBG("ust: forking");
 
-       /* break lock if necessary */
-       ltt_unlock_traces();
+       /* Get the pid of the new process */
+       processpid = getpid();
+
+       /*
+        * FIXME: This could be prettier, we loop over the list twice and
+        * following good locking practice should lock around the loop
+        */
+       cds_list_for_each_entry_safe(trace, trace_tmp, &ltt_traces.head, list) {
+               ltt_trace_stop(trace->trace_name);
+       }
 
-       ltt_trace_stop("auto");
-       ltt_trace_destroy("auto", 1);
-       /* Delete all active connections */
-       ustcomm_close_all_connections(&ustcomm_app.server);
+       /* Delete all active connections, but leave them in the epoll set */
+       cds_list_for_each_entry_safe(sock, sock_tmp, &ust_socks, list) {
+               ustcomm_del_sock(sock, 1);
+       }
 
-       /* Delete all blocked consumers */
-       list_for_each_entry(bc, &blocked_consumers, list) {
-               close(bc->fd_producer);
-               close(bc->fd_consumer);
-               free(deletable_bc);
-               deletable_bc = bc;
-               list_del(&bc->list);
+       /*
+        * FIXME: This could be prettier, we loop over the list twice and
+        * following good locking practice should lock around the loop
+        */
+       cds_list_for_each_entry_safe(trace, trace_tmp, &ltt_traces.head, list) {
+               ltt_trace_destroy(trace->trace_name, 1);
        }
 
-       ustcomm_free_app(&ustcomm_app);
+       /* Clean up the listener socket and epoll, keeping the socket file */
+       if (listen_sock) {
+               ustcomm_del_named_sock(listen_sock, 1);
+               listen_sock = NULL;
+       }
+       close(epoll_fd);
 
-       buffers_to_export = 0;
+       /* Re-start the launch sequence */
+       CMM_STORE_SHARED(buffers_to_export, 0);
        have_listener = 0;
-       init_socket();
+
+       /* Set up epoll */
+       epoll_fd = epoll_create(MAX_EVENTS);
+       if (epoll_fd == -1) {
+               ERR("epoll_create failed, tracing shutting down");
+               return;
+       }
+
+       /* Create the socket */
+       listen_sock = init_app_socket(epoll_fd);
+       if (!listen_sock) {
+               ERR("failed to create application socket,"
+                   " tracing shutting down");
+               return;
+       }
        create_listener();
        ltt_trace_setup("auto");
        result = ltt_trace_set_type("auto", "ustrelay");
-       if(result < 0) {
+       if (result < 0) {
                ERR("ltt_trace_set_type failed");
                return;
        }
@@ -1513,10 +1685,21 @@ void ust_before_fork(ust_fork_info_t *fork_info)
         /* Disable signals */
         sigfillset(&all_sigs);
         result = sigprocmask(SIG_BLOCK, &all_sigs, &fork_info->orig_sigs);
-        if(result == -1) {
+        if (result == -1) {
                 PERROR("sigprocmask");
                 return;
         }
+
+       /*
+        * Take the fork lock to make sure we are not in the middle of
+        * something in the listener thread.
+        */
+       pthread_mutex_lock(&listener_thread_data_mutex);
+       /*
+        * Hold listen_sock_mutex to protect from listen_sock teardown.
+        */
+       pthread_mutex_lock(&listen_sock_mutex);
+       rcu_bp_before_fork();
 }
 
 /* Don't call this function directly in a traced program */
@@ -1524,9 +1707,11 @@ static void ust_after_fork_common(ust_fork_info_t *fork_info)
 {
        int result;
 
+       pthread_mutex_unlock(&listen_sock_mutex);
+       pthread_mutex_unlock(&listener_thread_data_mutex);
         /* Restore signals */
         result = sigprocmask(SIG_SETMASK, &fork_info->orig_sigs, NULL);
-        if(result == -1) {
+        if (result == -1) {
                 PERROR("sigprocmask");
                 return;
         }
@@ -1534,16 +1719,20 @@ static void ust_after_fork_common(ust_fork_info_t *fork_info)
 
 void ust_after_fork_parent(ust_fork_info_t *fork_info)
 {
-       /* Reenable signals */
+       rcu_bp_after_fork_parent();
+       /* Release mutexes and reenable signals */
        ust_after_fork_common(fork_info);
 }
 
 void ust_after_fork_child(ust_fork_info_t *fork_info)
 {
-       /* First sanitize the child */
+       /* Release urcu mutexes */
+       rcu_bp_after_fork_child();
+
+       /* Sanitize the child */
        ust_fork();
 
-       /* Then reenable interrupts */
+       /* Release mutexes and reenable signals */
        ust_after_fork_common(fork_info);
 }
 
This page took 0.052548 seconds and 4 git commands to generate.