From 46ef48cdf8b64608a4f679500bc34293b9f0b649 Mon Sep 17 00:00:00 2001 From: Pierre-Marc Fournier Date: Mon, 2 Mar 2009 14:27:44 -0500 Subject: [PATCH] ust: first try at blocking support for consumer --- libtracectl/tracectl.c | 104 ++++++++++++++++++++++++++++++++++------- libtracing/relay.c | 36 +++++++++++--- libtracing/relay.h | 18 ++++--- libustcomm/ustcomm.c | 20 ++++++++ share/kernelcompat.h | 8 ---- 5 files changed, 147 insertions(+), 39 deletions(-) diff --git a/libtracectl/tracectl.c b/libtracectl/tracectl.c index c407be9..3732bc0 100644 --- a/libtracectl/tracectl.c +++ b/libtracectl/tracectl.c @@ -11,13 +11,10 @@ #include "tracer.h" #include "localerr.h" #include "ustcomm.h" +#include "relay.h" /* FIXME: remove */ //#define USE_CLONE -#define UNIX_PATH_MAX 108 - -#define SOCKETDIR "/tmp/socks" -#define SOCKETDIRLEN sizeof(SOCKETDIR) #define USTSIGNAL SIGIO #define MAX_MSG_SIZE (100) @@ -47,9 +44,6 @@ struct trctl_msg { char payload[94]; }; -char mysocketfile[UNIX_PATH_MAX] = ""; -//int pfd = -1; - struct consumer_channel { int fd; struct ltt_channel_struct *chan; @@ -184,8 +178,6 @@ void notif_cb(void) } } -#define CONSUMER_DAEMON_SOCK SOCKETDIR "/ustd" - static int inform_consumer_daemon(void) { ustcomm_request_consumer(getpid(), "metadata"); @@ -536,6 +528,55 @@ int listener_main(void *p) free(channel_name); free(consumed_old_str); } + else if(nth_token_is(recvbuf, "get_notifications", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_notifications"); + + channel_name = strdup_malloc(nth_token(recvbuf, 1)); + if(channel_name == NULL) { + ERR("put_subbuf_size: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + int fd; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; + + result = fd = ustcomm_app_detach_client(&ustcomm_app, &src); + if(result == -1) { + ERR("ustcomm_app_detach_client failed"); + goto next_cmd; + } + + lttbuf->wake_consumer_arg = (void *) fd; + + smp_wmb(); + + lttbuf->call_wake_consumer = 1; + + break; + } + } + + free(channel_name); + } else { ERR("unable to parse message: %s", recvbuf); } @@ -593,15 +634,15 @@ static int init_socket(void) static void destroy_socket(void) { - int result; - - if(mysocketfile[0] == '\0') - return; - - result = unlink(mysocketfile); - if(result == -1) { - PERROR("unlink"); - } +// int result; +// +// if(mysocketfile[0] == '\0') +// return; +// +// result = unlink(mysocketfile); +// if(result == -1) { +// PERROR("unlink"); +// } } static int init_signal_handler(void) @@ -646,12 +687,39 @@ static void auto_probe_connect(struct marker *m) DBG("just auto connected marker %s %s to probe default", m->channel, m->name); } +/* Wake the consumer of a buffer + * + * wake_consumer_cb is called in tracing context so it must haste. + * + * FIXME: don't do a system call here; maybe schedule work to be done + * in listener context? Once this is done in listener context, we can + * check for the return value of send_message_fd and remove the fd if necessary + * + * @arg: the buffer + * @finished: 0: subbuffer full; 1: buffer being destroyed + */ + +static void wake_consumer_cb(void *arg, int finished) +{ + struct ltt_channel_buf_struct *ltt_buf = (struct ltt_channel_buf_struct *) arg; + int fd = (int)ACCESS_ONCE(arg); + + if(!finished) { + send_message_fd(fd, "consume", NULL); + } + else { + send_message_fd(fd, "destroyed", NULL); + } +} + static void __attribute__((constructor(101))) init0() { DBG("UST_AUTOPROBE constructor"); if(getenv("UST_AUTOPROBE")) { marker_set_new_marker_cb(auto_probe_connect); } + + relay_set_wake_consumer(wake_consumer_cb); } static void fini(void); diff --git a/libtracing/relay.c b/libtracing/relay.c index a8775ec..0133f3e 100644 --- a/libtracing/relay.c +++ b/libtracing/relay.c @@ -873,6 +873,19 @@ static notrace void ltt_buffer_end_callback(struct rchan_buf *buf, } +void (*wake_consumer)(void *, int) = NULL; + +void relay_set_wake_consumer(void (*wake)(void *, int)) +{ + wake_consumer = wake; +} + +void relay_wake_consumer(void *arg, int finished) +{ + if(wake_consumer) + wake_consumer(arg, finished); +} + static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx, void *subbuf) { @@ -880,7 +893,9 @@ static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx, (struct ltt_channel_struct *)buf->chan->private_data; struct ltt_channel_buf_struct *ltt_buf = channel->buf; - atomic_set(<t_buf->wakeup_readers, 1); + if(ltt_buf->call_wake_consumer) + relay_wake_consumer(ACCESS_ONCE(ltt_buf->wake_consumer_arg), 0); +//ust// atomic_set(<t_buf->wakeup_readers, 1); } static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf) @@ -1084,14 +1099,14 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_ consumed_old = consumed_old | uconsumed_old; consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan); - spin_lock(<t_buf->full_lock); +//ust// spin_lock(<t_buf->full_lock); if (atomic_long_cmpxchg(<t_buf->consumed, consumed_old, consumed_new) != consumed_old) { /* We have been pushed by the writer : the last * buffer read _is_ corrupted! It can also * happen if this is a buffer we never got. */ - spin_unlock(<t_buf->full_lock); +//ust// spin_unlock(<t_buf->full_lock); return -EIO; } else { /* tell the client that buffer is now unfull */ @@ -1100,7 +1115,7 @@ int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_ index = SUBBUF_INDEX(consumed_old, buf->chan); data = BUFFER_OFFSET(consumed_old, buf->chan); ltt_buf_unfull(buf, index, data); - spin_unlock(<t_buf->full_lock); +//ust// spin_unlock(<t_buf->full_lock); } return 0; } @@ -1455,8 +1470,8 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, for (j = 0; j < n_subbufs; j++) local_set(<t_buf->commit_count[j], 0); //ust// init_waitqueue_head(<t_buf->write_wait); - atomic_set(<t_buf->wakeup_readers, 0); - spin_lock_init(<t_buf->full_lock); +//ust// atomic_set(<t_buf->wakeup_readers, 0); +//ust// spin_lock_init(<t_buf->full_lock); ltt_buffer_begin_callback(buf, trace->start_tsc, 0); @@ -1465,6 +1480,9 @@ static int ltt_relay_create_buffer(struct ltt_trace_struct *trace, local_set(<t_buf->events_lost, 0); local_set(<t_buf->corrupted_subbuffers, 0); + ltt_buf->call_wake_consumer = 0; + ltt_buf->wake_consumer_arg = NULL; + return 0; } @@ -1572,8 +1590,14 @@ static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace) */ static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf) { + struct ltt_channel_struct *channel = + (struct ltt_channel_struct *)buf->chan->private_data; + struct ltt_channel_buf_struct *ltt_buf = channel->buf; + buf->finalized = 1; ltt_force_switch(buf, FORCE_FLUSH); + + relay_wake_consumer(ltt_buf, 1); } static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel) diff --git a/libtracing/relay.h b/libtracing/relay.h index 1689418..6cfcb07 100644 --- a/libtracing/relay.h +++ b/libtracing/relay.h @@ -59,7 +59,7 @@ struct rchan_buf { //ust// unsigned int page_count; /* number of current buffer pages */ unsigned int finalized; /* buffer has been finalized */ //ust// unsigned int cpu; /* this buf's cpu */ - int shmid; + int shmid; /* the shmid of the buffer data pages */ } ____cacheline_aligned; /* @@ -335,16 +335,20 @@ struct ltt_channel_buf_struct { */ local_t events_lost; local_t corrupted_subbuffers; - spinlock_t full_lock; /* - * buffer full condition spinlock, only - * for userspace tracing blocking mode - * synchronization with reader. - */ +//ust// spinlock_t full_lock; /* +//ust// * buffer full condition spinlock, only +//ust// * for userspace tracing blocking mode +//ust// * synchronization with reader. +//ust// */ //ust// wait_queue_head_t write_wait; /* //ust// * Wait queue for blocking user space //ust// * writers //ust// */ - atomic_t wakeup_readers; /* Boolean : wakeup readers waiting ? */ +//ust// atomic_t wakeup_readers; /* Boolean : wakeup readers waiting ? */ + /* whether or not wake_consumer must be called; must be accessed atomically */ + int call_wake_consumer; + /* the arg to pass to wake_consumer; must be accessed atomically */ + void *wake_consumer_arg; } ____cacheline_aligned; int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old); diff --git a/libustcomm/ustcomm.c b/libustcomm/ustcomm.c index 6047b60..2b08b31 100644 --- a/libustcomm/ustcomm.c +++ b/libustcomm/ustcomm.c @@ -316,6 +316,26 @@ int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm return ustcomm_recv_message(&app->server, msg, src, timeout); } +/* This removes src from the list of active connections of app. + */ + +int ustcomm_app_detach_client(struct ustcomm_app *app, struct ustcomm_source *src) +{ + struct ustcomm_server *server = (struct ustcomm_server *)app; + struct ustcomm_connection *conn; + + list_for_each_entry(conn, &server->connections, list) { + if(conn->fd == src->fd) { + list_del(&conn->list); + goto found; + } + } + + return -1; +found: + return src->fd; +} + static int init_named_socket(char *name, char **path_out) { int result; diff --git a/share/kernelcompat.h b/share/kernelcompat.h index 052f768..b7c119b 100644 --- a/share/kernelcompat.h +++ b/share/kernelcompat.h @@ -71,14 +71,6 @@ static inline long IS_ERR(const void *ptr) #define mutex_unlock(m) pthread_mutex_unlock(m) -/* SPINLOCKS */ - -typedef int spinlock_t; - -#define spin_lock(a) /* nothing */ -#define spin_unlock(a) /* nothing */ -#define spin_lock_init(a) /* nothing */ - /* MALLOCATION */ -- 2.34.1