From 03d2d2938911e2641038313211b08546338953c3 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 5 Mar 2013 20:03:37 -0500 Subject: [PATCH] Implement ring buffer periodic buffer switch timer The tricky part was the teardown. It must not race against neither pending signals nor in-flight signal handler execution, otherwise the timer handler could access freed channel data. Use a dedicated thread to handle those signals, with a synchronization point between each handler execution. Signed-off-by: Mathieu Desnoyers --- liblttng-ust-ctl/ustctl.c | 1 + liblttng-ust/lttng-ust-abi.c | 1 + libringbuffer/frontend.h | 6 + libringbuffer/frontend_types.h | 13 +- libringbuffer/ring_buffer_frontend.c | 266 ++++++++++++++++++++++++--- 5 files changed, 254 insertions(+), 33 deletions(-) diff --git a/liblttng-ust-ctl/ustctl.c b/liblttng-ust-ctl/ustctl.c index 54456c8e..fb4330e4 100644 --- a/liblttng-ust-ctl/ustctl.c +++ b/liblttng-ust-ctl/ustctl.c @@ -1569,6 +1569,7 @@ void ustctl_init(void) lttng_ring_buffer_metadata_client_init(); lttng_ring_buffer_client_overwrite_init(); lttng_ring_buffer_client_discard_init(); + lib_ringbuffer_signal_init(); } static __attribute__((destructor)) diff --git a/liblttng-ust/lttng-ust-abi.c b/liblttng-ust/lttng-ust-abi.c index 9085b040..94c05968 100644 --- a/liblttng-ust/lttng-ust-abi.c +++ b/liblttng-ust/lttng-ust-abi.c @@ -412,6 +412,7 @@ int lttng_abi_map_channel(int session_objd, chan = shmp(channel_handle, channel_handle->chan); assert(chan); + chan->handle = channel_handle; config = &chan->backend.config; lttng_chan = channel_get_private(chan); if (!lttng_chan) { diff --git a/libringbuffer/frontend.h b/libringbuffer/frontend.h index b59948df..2eda6e94 100644 --- a/libringbuffer/frontend.h +++ b/libringbuffer/frontend.h @@ -109,6 +109,12 @@ extern int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf, extern void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle); +/* + * Initialize signals for ring buffer. Should be called early e.g. by + * main() in the program to affect all threads. + */ +void lib_ringbuffer_signal_init(void); + /* * Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer. */ diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h index 9d3932d8..c23fdd28 100644 --- a/libringbuffer/frontend_types.h +++ b/libringbuffer/frontend_types.h @@ -29,6 +29,7 @@ */ #include +#include /* for timer_t */ #include #include @@ -37,6 +38,7 @@ #include #include "backend_types.h" #include "shm_internal.h" +#include "shm_types.h" #include "vatomic.h" /* @@ -56,12 +58,17 @@ struct channel { * subbuffer index. */ - unsigned long switch_timer_interval; /* Buffer flush (jiffies) */ - unsigned long read_timer_interval; /* Reader wakeup (jiffies) */ + unsigned long switch_timer_interval; /* Buffer flush (us) */ + timer_t switch_timer; + int switch_timer_enabled; + + unsigned long read_timer_interval; /* Reader wakeup (us) */ + //timer_t read_timer; //wait_queue_head_t read_wait; /* reader wait queue */ int finalized; /* Has channel been finalized */ size_t priv_data_offset; unsigned int nr_streams; /* Number of streams */ + struct lttng_ust_shm_handle *handle; char padding[RB_CHANNEL_PADDING]; /* * Associated backend contains a variable-length array. Needs to @@ -118,8 +125,6 @@ struct lttng_ust_lib_ring_buffer { union v_atomic records_overrun; /* Number of overwritten records */ //wait_queue_head_t read_wait; /* reader buffer-level wait queue */ int finalized; /* buffer has been finalized */ - //struct timer_list switch_timer; /* timer for periodical switch */ - //struct timer_list read_timer; /* timer for read poll */ unsigned long get_subbuf_consumed; /* Read-side consumed */ unsigned long prod_snapshot; /* Producer count snapshot */ unsigned long cons_snapshot; /* Consumer count snapshot */ diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 393aa9f0..76f369e9 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -55,7 +55,10 @@ #include #include #include +#include #include +#include +#include #include #include #include @@ -77,6 +80,10 @@ /* Print DBG() messages about events lost only every 1048576 hits */ #define DBG_PRINT_NR_LOST (1UL << 20) +#define LTTNG_UST_RB_SIG SIGRTMIN +#define LTTNG_UST_RB_SIG_TEARDOWN SIGRTMIN + 1 +#define CLOCKID CLOCK_MONOTONIC + /* * Use POSIX SHM: shm_open(3) and shm_unlink(3). * close(2) to close the fd returned by shm_open. @@ -110,6 +117,20 @@ void lib_ring_buffer_print_errors(struct channel *chan, struct lttng_ust_lib_ring_buffer *buf, int cpu, struct lttng_ust_shm_handle *handle); +/* + * Handle timer teardown race wrt memory free of private data by + * ring buffer signals are handled by a single thread, which permits + * a synchronization point between handling of each signal. + * Protected by the ust mutex. + */ +struct timer_signal_data { + pthread_t tid; /* thread id managing signals */ + int setup_done; + int qs_done; +}; + +static struct timer_signal_data timer_signal; + /** * lib_ring_buffer_reset - Reset ring buffer to initial values. * @buf: Ring buffer. @@ -264,37 +285,208 @@ static void switch_buffer_timer(unsigned long data) } #endif //0 -static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf, - struct lttng_ust_shm_handle *handle) +static +void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc) { - struct channel *chan = shmp(handle, buf->backend.chan); - //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config; + struct lttng_ust_shm_handle *handle; + struct channel *chan; + int cpu; + + assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self()); + + chan = si->si_value.sival_ptr; + handle = chan->handle; + config = &chan->backend.config; - if (!chan->switch_timer_interval || buf->switch_timer_enabled) + DBG("Timer for channel %p\n", chan); + + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + for_each_possible_cpu(cpu) { + struct lttng_ust_lib_ring_buffer *buf = + shmp(handle, chan->backend.buf[cpu].shmp); + + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, + chan->handle); + } + } else { + struct lttng_ust_lib_ring_buffer *buf = + shmp(handle, chan->backend.buf[0].shmp); + + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, + chan->handle); + } + return; +} + +static +void rb_setmask(sigset_t *mask) +{ + int ret; + + ret = sigemptyset(mask); + if (ret) { + PERROR("sigemptyset"); + } + ret = sigaddset(mask, LTTNG_UST_RB_SIG); + if (ret) { + PERROR("sigaddset"); + } + ret = sigaddset(mask, LTTNG_UST_RB_SIG_TEARDOWN); + if (ret) { + PERROR("sigaddset"); + } +} + +static +void *sig_thread(void *arg) +{ + sigset_t mask; + siginfo_t info; + int signr; + + /* Only self thread will receive signal mask. */ + rb_setmask(&mask); + CMM_STORE_SHARED(timer_signal.tid, pthread_self()); + + for (;;) { + signr = sigwaitinfo(&mask, &info); + if (signr == -1) { + PERROR("sigwaitinfo"); + continue; + } + if (signr == LTTNG_UST_RB_SIG) { + lib_ring_buffer_channel_switch_timer(info.si_signo, + &info, NULL); + } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) { + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 1); + cmm_smp_mb(); + } else { + ERR("Unexptected signal %d\n", info.si_signo); + } + } + return NULL; +} + +/* + * Called with ust_lock() held. + * Ensure only a single thread listens on the timer signal. + */ +static +void lib_ring_buffer_setup_timer_thread(void) +{ + pthread_t thread; + int ret; + + if (timer_signal.setup_done) return; - //TODO - //init_timer(&buf->switch_timer); - //buf->switch_timer.function = switch_buffer_timer; - //buf->switch_timer.expires = jiffies + chan->switch_timer_interval; - //buf->switch_timer.data = (unsigned long)buf; - //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - // add_timer_on(&buf->switch_timer, buf->backend.cpu); - //else - // add_timer(&buf->switch_timer); - buf->switch_timer_enabled = 1; + + ret = pthread_create(&thread, NULL, &sig_thread, NULL); + if (ret) { + errno = ret; + PERROR("pthread_create"); + } + ret = pthread_detach(thread); + if (ret) { + errno = ret; + PERROR("pthread_detach"); + } + timer_signal.setup_done = 1; } -static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf, - struct lttng_ust_shm_handle *handle) +/* + * Called with ust_lock() held. + */ +static +void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) { - struct channel *chan = shmp(handle, buf->backend.chan); + struct sigevent sev; + struct itimerspec its; + int ret; - if (!chan->switch_timer_interval || !buf->switch_timer_enabled) + if (!chan->switch_timer_interval || chan->switch_timer_enabled) return; - //TODO - //del_timer_sync(&buf->switch_timer); - buf->switch_timer_enabled = 0; + chan->switch_timer_enabled = 1; + + lib_ring_buffer_setup_timer_thread(); + + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = LTTNG_UST_RB_SIG; + sev.sigev_value.sival_ptr = chan; + ret = timer_create(CLOCKID, &sev, &chan->switch_timer); + if (ret == -1) { + PERROR("timer_create"); + } + + its.it_value.tv_sec = chan->switch_timer_interval / 1000000; + its.it_value.tv_nsec = chan->switch_timer_interval % 1000000; + its.it_interval.tv_sec = its.it_value.tv_sec; + its.it_interval.tv_nsec = its.it_value.tv_nsec; + + ret = timer_settime(chan->switch_timer, 0, &its, NULL); + if (ret == -1) { + PERROR("timer_settime"); + } +} + +/* + * Called with ust_lock() held. + */ +static +void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) +{ + sigset_t pending_set; + int sig_is_pending, ret; + + if (!chan->switch_timer_interval || !chan->switch_timer_enabled) + return; + + ret = timer_delete(chan->switch_timer); + if (ret == -1) { + PERROR("timer_delete"); + } + + /* + * Ensure we don't have any signal queued for this channel. + */ + for (;;) { + ret = sigemptyset(&pending_set); + if (ret == -1) { + PERROR("sigemptyset"); + } + ret = sigpending(&pending_set); + if (ret == -1) { + PERROR("sigpending"); + } + sig_is_pending = sigismember(&pending_set, LTTNG_UST_RB_SIG); + if (!sig_is_pending) + break; + caa_cpu_relax(); + } + + /* + * From this point, no new signal handler will be fired that + * would try to access "chan". However, we still need to wait + * for any currently executing handler to complete. + */ + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 0); + cmm_smp_mb(); + + /* + * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management + * thread wakes up. + */ + kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); + + while (!CMM_LOAD_SHARED(timer_signal.qs_done)) + caa_cpu_relax(); + cmm_smp_mb(); + + chan->switch_timer = 0; + chan->switch_timer_enabled = 0; } #if 0 @@ -381,17 +573,16 @@ static void channel_unregister_notifiers(struct channel *chan, const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; int cpu; + lib_ring_buffer_channel_switch_timer_stop(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_possible_cpu(cpu) { struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - lib_ring_buffer_stop_switch_timer(buf, handle); lib_ring_buffer_stop_read_timer(buf, handle); } } else { struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); - lib_ring_buffer_stop_switch_timer(buf, handle); lib_ring_buffer_stop_read_timer(buf, handle); } //channel_backend_unregister_notifiers(&chan->backend); @@ -527,14 +718,16 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff if (ret) goto error_backend_init; + chan->handle = handle; chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order); + chan->switch_timer_interval = switch_timer_interval; + //TODO - //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval); - //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); - //TODO + //chan->read_timer_interval = read_timer_interval; //init_waitqueue_head(&chan->read_wait); //init_waitqueue_head(&chan->hp_wait); + lib_ring_buffer_channel_switch_timer_start(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { /* * In case of non-hotplug cpu, if the ring-buffer is allocated @@ -543,13 +736,11 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff */ for_each_possible_cpu(cpu) { struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - lib_ring_buffer_start_switch_timer(buf, handle); lib_ring_buffer_start_read_timer(buf, handle); } } else { struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); - lib_ring_buffer_start_switch_timer(buf, handle); lib_ring_buffer_start_read_timer(buf, handle); } return handle; @@ -1592,3 +1783,20 @@ void lttng_fixup_ringbuffer_tls(void) { asm volatile ("" : : "m" (URCU_TLS(lib_ring_buffer_nesting))); } + +void lib_ringbuffer_signal_init(void) +{ + sigset_t mask; + int ret; + + /* + * Block signal for entire process, so only our thread processes + * it. + */ + rb_setmask(&mask); + ret = pthread_sigmask(SIG_BLOCK, &mask, NULL); + if (ret) { + errno = ret; + PERROR("pthread_sigmask"); + } +} -- 2.34.1