ust: first try at blocking support for consumer
authorPierre-Marc Fournier <pierre-marc.fournier@polymtl.ca>
Mon, 2 Mar 2009 19:27:44 +0000 (14:27 -0500)
committerPierre-Marc Fournier <pierre-marc.fournier@polymtl.ca>
Mon, 2 Mar 2009 19:27:44 +0000 (14:27 -0500)
libtracectl/tracectl.c
libtracing/relay.c
libtracing/relay.h
libustcomm/ustcomm.c
share/kernelcompat.h

index c407be97c13f22f4e99f1a36a7a508924eda5ef4..3732bc03714ae585297dd3b933e86a912b3c7a29 100644 (file)
 #include "tracer.h"
 #include "localerr.h"
 #include "ustcomm.h"
+#include "relay.h" /* FIXME: remove */
 
 //#define USE_CLONE
 
-#define UNIX_PATH_MAX 108
-
-#define SOCKETDIR "/tmp/socks"
-#define SOCKETDIRLEN sizeof(SOCKETDIR)
 #define USTSIGNAL SIGIO
 
 #define MAX_MSG_SIZE (100)
@@ -47,9 +44,6 @@ struct trctl_msg {
        char payload[94];
 };
 
-char mysocketfile[UNIX_PATH_MAX] = "";
-//int pfd = -1;
-
 struct consumer_channel {
        int fd;
        struct ltt_channel_struct *chan;
@@ -184,8 +178,6 @@ void notif_cb(void)
        }
 }
 
-#define CONSUMER_DAEMON_SOCK SOCKETDIR "/ustd"
-
 static int inform_consumer_daemon(void)
 {
        ustcomm_request_consumer(getpid(), "metadata");
@@ -536,6 +528,55 @@ int listener_main(void *p)
                        free(channel_name);
                        free(consumed_old_str);
                }
+               else if(nth_token_is(recvbuf, "get_notifications", 0) == 1) {
+                       struct ltt_trace_struct *trace;
+                       char trace_name[] = "auto";
+                       int i;
+                       char *channel_name;
+
+                       DBG("get_notifications");
+
+                       channel_name = strdup_malloc(nth_token(recvbuf, 1));
+                       if(channel_name == NULL) {
+                               ERR("put_subbuf_size: cannot parse channel");
+                               goto next_cmd;
+                       }
+
+                       ltt_lock_traces();
+                       trace = _ltt_trace_find(trace_name);
+                       ltt_unlock_traces();
+
+                       if(trace == NULL) {
+                               CPRINTF("cannot find trace!");
+                               return 1;
+                       }
+
+                       for(i=0; i<trace->nr_channels; i++) {
+                               struct rchan *rchan = trace->channels[i].trans_channel_data;
+                               int fd;
+
+                               if(!strcmp(trace->channels[i].channel_name, channel_name)) {
+                                       struct rchan_buf *rbuf = rchan->buf;
+                                       struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf;
+
+                                       result = fd = ustcomm_app_detach_client(&ustcomm_app, &src);
+                                       if(result == -1) {
+                                               ERR("ustcomm_app_detach_client failed");
+                                               goto next_cmd;
+                                       }
+
+                                       lttbuf->wake_consumer_arg = (void *) fd;
+
+                                       smp_wmb();
+
+                                       lttbuf->call_wake_consumer = 1;
+
+                                       break;
+                               }
+                       }
+
+                       free(channel_name);
+               }
                else {
                        ERR("unable to parse message: %s", recvbuf);
                }
@@ -593,15 +634,15 @@ static int init_socket(void)
 
 static void destroy_socket(void)
 {
-       int result;
-
-       if(mysocketfile[0] == '\0')
-               return;
-
-       result = unlink(mysocketfile);
-       if(result == -1) {
-               PERROR("unlink");
-       }
+//     int result;
+//
+//     if(mysocketfile[0] == '\0')
+//             return;
+//
+//     result = unlink(mysocketfile);
+//     if(result == -1) {
+//             PERROR("unlink");
+//     }
 }
 
 static int init_signal_handler(void)
@@ -646,12 +687,39 @@ static void auto_probe_connect(struct marker *m)
        DBG("just auto connected marker %s %s to probe default", m->channel, m->name);
 }
 
+/* Wake the consumer of a buffer 
+ *
+ * wake_consumer_cb is called in tracing context so it must haste.
+ *
+ * FIXME: don't do a system call here; maybe schedule work to be done
+ * in listener context? Once this is done in listener context, we can
+ * check for the return value of send_message_fd and remove the fd if necessary
+ *
+ * @arg: the buffer
+ * @finished: 0: subbuffer full; 1: buffer being destroyed
+ */
+
+static void wake_consumer_cb(void *arg, int finished)
+{
+       struct ltt_channel_buf_struct *ltt_buf = (struct ltt_channel_buf_struct *) arg;
+       int fd = (int)ACCESS_ONCE(arg);
+
+       if(!finished) {
+               send_message_fd(fd, "consume", NULL);
+       }
+       else {
+               send_message_fd(fd, "destroyed", NULL);
+       }
+}
+
 static void __attribute__((constructor(101))) init0()
 {
        DBG("UST_AUTOPROBE constructor");
        if(getenv("UST_AUTOPROBE")) {
                marker_set_new_marker_cb(auto_probe_connect);
        }
+
+       relay_set_wake_consumer(wake_consumer_cb);
 }
 
 static void fini(void);
index a8775ec77d2554e30bee693a1c6a102c49ec91fd..0133f3e3bb5cc8ae57073fd6b6d541b96c1cfc24 100644 (file)
@@ -873,6 +873,19 @@ static notrace void ltt_buffer_end_callback(struct rchan_buf *buf,
 
 }
 
+void (*wake_consumer)(void *, int) = NULL;
+
+void relay_set_wake_consumer(void (*wake)(void *, int))
+{
+       wake_consumer = wake;
+}
+
+void relay_wake_consumer(void *arg, int finished)
+{
+       if(wake_consumer)
+               wake_consumer(arg, finished);
+}
+
 static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
                void *subbuf)
 {
@@ -880,7 +893,9 @@ static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
                (struct ltt_channel_struct *)buf->chan->private_data;
        struct ltt_channel_buf_struct *ltt_buf = channel->buf;
 
-       atomic_set(&ltt_buf->wakeup_readers, 1);
+       if(ltt_buf->call_wake_consumer)
+               relay_wake_consumer(ACCESS_ONCE(ltt_buf->wake_consumer_arg), 0);
+//ust//        atomic_set(&ltt_buf->wakeup_readers, 1);
 }
 
 static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf)
@@ -1084,14 +1099,14 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_
        consumed_old = consumed_old | uconsumed_old;
        consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
 
-       spin_lock(&ltt_buf->full_lock);
+//ust//        spin_lock(&ltt_buf->full_lock);
        if (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
                                consumed_new)
            != consumed_old) {
                /* We have been pushed by the writer : the last
                 * buffer read _is_ corrupted! It can also
                 * happen if this is a buffer we never got. */
-               spin_unlock(&ltt_buf->full_lock);
+//ust//                spin_unlock(&ltt_buf->full_lock);
                return -EIO;
        } else {
                /* tell the client that buffer is now unfull */
@@ -1100,7 +1115,7 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_
                index = SUBBUF_INDEX(consumed_old, buf->chan);
                data = BUFFER_OFFSET(consumed_old, buf->chan);
                ltt_buf_unfull(buf, index, data);
-               spin_unlock(&ltt_buf->full_lock);
+//ust//                spin_unlock(&ltt_buf->full_lock);
        }
        return 0;
 }
@@ -1455,8 +1470,8 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
        for (j = 0; j < n_subbufs; j++)
                local_set(&ltt_buf->commit_count[j], 0);
 //ust//        init_waitqueue_head(&ltt_buf->write_wait);
-       atomic_set(&ltt_buf->wakeup_readers, 0);
-       spin_lock_init(&ltt_buf->full_lock);
+//ust//        atomic_set(&ltt_buf->wakeup_readers, 0);
+//ust//        spin_lock_init(&ltt_buf->full_lock);
 
        ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
 
@@ -1465,6 +1480,9 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
        local_set(&ltt_buf->events_lost, 0);
        local_set(&ltt_buf->corrupted_subbuffers, 0);
 
+       ltt_buf->call_wake_consumer = 0;
+       ltt_buf->wake_consumer_arg = NULL;
+
        return 0;
 }
 
@@ -1572,8 +1590,14 @@ static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
  */
 static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
 {
+       struct ltt_channel_struct *channel =
+               (struct ltt_channel_struct *)buf->chan->private_data;
+       struct ltt_channel_buf_struct *ltt_buf = channel->buf;
+
        buf->finalized = 1;
        ltt_force_switch(buf, FORCE_FLUSH);
+
+       relay_wake_consumer(ltt_buf, 1);
 }
 
 static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
index 1689418342fa5fde75d3a40f409dcd0fbb6c0cfe..6cfcb077578ac20c42fc07a2a3eeb524402e22f9 100644 (file)
@@ -59,7 +59,7 @@ struct rchan_buf {
 //ust//        unsigned int page_count;        /* number of current buffer pages */ 
        unsigned int finalized;         /* buffer has been finalized */ 
 //ust//        unsigned int cpu;               /* this buf's cpu */ 
-       int shmid;
+       int shmid;                      /* the shmid of the buffer data pages */
 } ____cacheline_aligned; 
 
 /*
@@ -335,16 +335,20 @@ struct ltt_channel_buf_struct {
                                         */
        local_t events_lost;
        local_t corrupted_subbuffers;
-       spinlock_t full_lock;           /*
-                                        * buffer full condition spinlock, only
-                                        * for userspace tracing blocking mode
-                                        * synchronization with reader.
-                                        */
+//ust//        spinlock_t full_lock;           /*
+//ust//                                         * buffer full condition spinlock, only
+//ust//                                         * for userspace tracing blocking mode
+//ust//                                         * synchronization with reader.
+//ust//                                         */
 //ust//        wait_queue_head_t write_wait;   /*
 //ust//                                         * Wait queue for blocking user space
 //ust//                                         * writers
 //ust//                                         */
-       atomic_t wakeup_readers;        /* Boolean : wakeup readers waiting ? */
+//ust//        atomic_t wakeup_readers;        /* Boolean : wakeup readers waiting ? */
+       /* whether or not wake_consumer must be called; must be accessed atomically */
+       int call_wake_consumer;
+       /* the arg to pass to wake_consumer; must be accessed atomically */
+       void *wake_consumer_arg;
 } ____cacheline_aligned;
 
 int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old);
index 6047b6012bc5f5abb98c45c2e28a4038b6b438a6..2b08b310b826b0de71c3a1b42d8bd6ac289e747b 100644 (file)
@@ -316,6 +316,26 @@ int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm
        return ustcomm_recv_message(&app->server, msg, src, timeout);
 }
 
+/* This removes src from the list of active connections of app.
+ */
+
+int ustcomm_app_detach_client(struct ustcomm_app *app, struct ustcomm_source *src)
+{
+       struct ustcomm_server *server = (struct ustcomm_server *)app;
+       struct ustcomm_connection *conn;
+
+       list_for_each_entry(conn, &server->connections, list) {
+               if(conn->fd == src->fd) {
+                       list_del(&conn->list);
+                       goto found;
+               }
+       }
+
+       return -1;
+found:
+       return src->fd;
+}
+
 static int init_named_socket(char *name, char **path_out)
 {
        int result;
index 052f7688bb9f36d804529eca4ca65176fc04c428..b7c119bfcdddea0586e6faa01ab8b2525eb32eef 100644 (file)
@@ -71,14 +71,6 @@ static inline long IS_ERR(const void *ptr)
 
 #define mutex_unlock(m) pthread_mutex_unlock(m)
 
-/* SPINLOCKS */
-
-typedef int spinlock_t;
-
-#define spin_lock(a) /* nothing */
-#define spin_unlock(a) /* nothing */
-#define spin_lock_init(a) /* nothing */
-
 
 /* MALLOCATION */
 
This page took 0.029695 seconds and 4 git commands to generate.