#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
+#include <unistd.h>
#include <fcntl.h>
+#include <signal.h>
+#include <time.h>
#include <urcu/compiler.h>
#include <urcu/ref.h>
#include <urcu/tls-compat.h>
/* Print DBG() messages about events lost only every 1048576 hits */
#define DBG_PRINT_NR_LOST (1UL << 20)
/*
 * Real-time signals used to drive the ring buffer timers; handled by a
 * single dedicated thread. Parenthesized so the macros expand safely
 * inside larger expressions.
 */
#define LTTNG_UST_RB_SIG_FLUSH		SIGRTMIN
#define LTTNG_UST_RB_SIG_READ		(SIGRTMIN + 1)
#define LTTNG_UST_RB_SIG_TEARDOWN	(SIGRTMIN + 2)
#define CLOCKID				CLOCK_MONOTONIC
+
/*
* Use POSIX SHM: shm_open(3) and shm_unlink(3).
* close(2) to close the fd returned by shm_open.
DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
/*
 * wakeup_fd_mutex protects wakeup fd use by timer from concurrent
 * close.
 */
static pthread_mutex_t wakeup_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
+
static
void lib_ring_buffer_print_errors(struct channel *chan,
struct lttng_ust_lib_ring_buffer *buf, int cpu,
struct lttng_ust_shm_handle *handle);
/*
 * Ring buffer signals are handled by a single thread, which permits a
 * synchronization point between the handling of each signal. This is
 * how we handle the timer teardown race with respect to freeing the
 * private data the timers reference.
 * Protected by the ust mutex.
 */
struct timer_signal_data {
	pthread_t tid;		/* thread id managing signals */
	int setup_done;		/* signal thread created (ust mutex held) */
	int qs_done;		/* quiescent-state flag for teardown handshake */
};

static struct timer_signal_data timer_signal;
+
/**
* lib_ring_buffer_reset - Reset ring buffer to initial values.
* @buf: Ring buffer.
return ret;
}
-#if 0
-static void switch_buffer_timer(unsigned long data)
+static
+void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
{
- struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
- struct channel *chan = shmp(handle, buf->backend.chan);
- const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+ const struct lttng_ust_lib_ring_buffer_config *config;
+ struct lttng_ust_shm_handle *handle;
+ struct channel *chan;
+ int cpu;
+
+ assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
+
+ chan = si->si_value.sival_ptr;
+ handle = chan->handle;
+ config = &chan->backend.config;
+
+ DBG("Switch timer for channel %p\n", chan);
/*
* Only flush buffers periodically if readers are active.
*/
- if (uatomic_read(&buf->active_readers))
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
-
- //TODO timers
- //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- // mod_timer_pinned(&buf->switch_timer,
- // jiffies + chan->switch_timer_interval);
- //else
- // mod_timer(&buf->switch_timer,
- // jiffies + chan->switch_timer_interval);
+ pthread_mutex_lock(&wakeup_fd_mutex);
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ for_each_possible_cpu(cpu) {
+ struct lttng_ust_lib_ring_buffer *buf =
+ shmp(handle, chan->backend.buf[cpu].shmp);
+ if (uatomic_read(&buf->active_readers))
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+ chan->handle);
+ }
+ } else {
+ struct lttng_ust_lib_ring_buffer *buf =
+ shmp(handle, chan->backend.buf[0].shmp);
+
+ if (uatomic_read(&buf->active_readers))
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+ chan->handle);
+ }
+ pthread_mutex_unlock(&wakeup_fd_mutex);
+ return;
}
-#endif //0
-static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
- struct lttng_ust_shm_handle *handle)
+static
+void lib_ring_buffer_channel_do_read(struct channel *chan)
{
- struct channel *chan = shmp(handle, buf->backend.chan);
- //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+ const struct lttng_ust_lib_ring_buffer_config *config;
+ struct lttng_ust_shm_handle *handle;
+ int cpu;
+
+ handle = chan->handle;
+ config = &chan->backend.config;
- if (!chan->switch_timer_interval || buf->switch_timer_enabled)
+ /*
+ * Only flush buffers periodically if readers are active.
+ */
+ pthread_mutex_lock(&wakeup_fd_mutex);
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ for_each_possible_cpu(cpu) {
+ struct lttng_ust_lib_ring_buffer *buf =
+ shmp(handle, chan->backend.buf[cpu].shmp);
+
+ if (uatomic_read(&buf->active_readers)
+ && lib_ring_buffer_poll_deliver(config, buf,
+ chan, handle)) {
+ lib_ring_buffer_wakeup(buf, handle);
+ }
+ }
+ } else {
+ struct lttng_ust_lib_ring_buffer *buf =
+ shmp(handle, chan->backend.buf[0].shmp);
+
+ if (uatomic_read(&buf->active_readers)
+ && lib_ring_buffer_poll_deliver(config, buf,
+ chan, handle)) {
+ lib_ring_buffer_wakeup(buf, handle);
+ }
+ }
+ pthread_mutex_unlock(&wakeup_fd_mutex);
+}
+
+static
+void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc)
+{
+ struct channel *chan;
+
+ assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
+ chan = si->si_value.sival_ptr;
+ DBG("Read timer for channel %p\n", chan);
+ lib_ring_buffer_channel_do_read(chan);
+ return;
+}
+
+static
+void rb_setmask(sigset_t *mask)
+{
+ int ret;
+
+ ret = sigemptyset(mask);
+ if (ret) {
+ PERROR("sigemptyset");
+ }
+ ret = sigaddset(mask, LTTNG_UST_RB_SIG_FLUSH);
+ if (ret) {
+ PERROR("sigaddset");
+ }
+ ret = sigaddset(mask, LTTNG_UST_RB_SIG_READ);
+ if (ret) {
+ PERROR("sigaddset");
+ }
+ ret = sigaddset(mask, LTTNG_UST_RB_SIG_TEARDOWN);
+ if (ret) {
+ PERROR("sigaddset");
+ }
+}
+
+static
+void *sig_thread(void *arg)
+{
+ sigset_t mask;
+ siginfo_t info;
+ int signr;
+
+ /* Only self thread will receive signal mask. */
+ rb_setmask(&mask);
+ CMM_STORE_SHARED(timer_signal.tid, pthread_self());
+
+ for (;;) {
+ signr = sigwaitinfo(&mask, &info);
+ if (signr == -1) {
+ if (errno != EINTR)
+ PERROR("sigwaitinfo");
+ continue;
+ }
+ if (signr == LTTNG_UST_RB_SIG_FLUSH) {
+ lib_ring_buffer_channel_switch_timer(info.si_signo,
+ &info, NULL);
+ } else if (signr == LTTNG_UST_RB_SIG_READ) {
+ lib_ring_buffer_channel_read_timer(info.si_signo,
+ &info, NULL);
+ } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) {
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 1);
+ cmm_smp_mb();
+ } else {
+ ERR("Unexptected signal %d\n", info.si_signo);
+ }
+ }
+ return NULL;
+}
+
+/*
+ * Called with ust_lock() held.
+ * Ensure only a single thread listens on the timer signal.
+ */
+static
+void lib_ring_buffer_setup_timer_thread(void)
+{
+ pthread_t thread;
+ int ret;
+
+ if (timer_signal.setup_done)
return;
- //TODO
- //init_timer(&buf->switch_timer);
- //buf->switch_timer.function = switch_buffer_timer;
- //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
- //buf->switch_timer.data = (unsigned long)buf;
- //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- // add_timer_on(&buf->switch_timer, buf->backend.cpu);
- //else
- // add_timer(&buf->switch_timer);
- buf->switch_timer_enabled = 1;
+
+ ret = pthread_create(&thread, NULL, &sig_thread, NULL);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_create");
+ }
+ ret = pthread_detach(thread);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_detach");
+ }
+ timer_signal.setup_done = 1;
}
-static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
- struct lttng_ust_shm_handle *handle)
+/*
+ * Called with ust_lock() held.
+ */
+static
+void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
{
- struct channel *chan = shmp(handle, buf->backend.chan);
+ struct sigevent sev;
+ struct itimerspec its;
+ int ret;
- if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
+ if (!chan->switch_timer_interval || chan->switch_timer_enabled)
return;
- //TODO
- //del_timer_sync(&buf->switch_timer);
- buf->switch_timer_enabled = 0;
+ chan->switch_timer_enabled = 1;
+
+ lib_ring_buffer_setup_timer_thread();
+
+ sev.sigev_notify = SIGEV_SIGNAL;
+ sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
+ sev.sigev_value.sival_ptr = chan;
+ ret = timer_create(CLOCKID, &sev, &chan->switch_timer);
+ if (ret == -1) {
+ PERROR("timer_create");
+ }
+
+ its.it_value.tv_sec = chan->switch_timer_interval / 1000000;
+ its.it_value.tv_nsec = chan->switch_timer_interval % 1000000;
+ its.it_interval.tv_sec = its.it_value.tv_sec;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ ret = timer_settime(chan->switch_timer, 0, &its, NULL);
+ if (ret == -1) {
+ PERROR("timer_settime");
+ }
}
-#if 0
/*
- * Polling timer to check the channels for data.
+ * Called with ust_lock() held.
*/
-static void read_buffer_timer(unsigned long data)
+static
+void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
{
- struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
- struct channel *chan = shmp(handle, buf->backend.chan);
- const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+ sigset_t pending_set;
+ int ret;
- CHAN_WARN_ON(chan, !buf->backend.allocated);
+ if (!chan->switch_timer_interval || !chan->switch_timer_enabled)
+ return;
- if (uatomic_read(&buf->active_readers))
- && lib_ring_buffer_poll_deliver(config, buf, chan)) {
- //TODO
- //wake_up_interruptible(&buf->read_wait);
- //wake_up_interruptible(&chan->read_wait);
+ ret = timer_delete(chan->switch_timer);
+ if (ret == -1) {
+ PERROR("timer_delete");
}
- //TODO
- //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- // mod_timer_pinned(&buf->read_timer,
- // jiffies + chan->read_timer_interval);
- //else
- // mod_timer(&buf->read_timer,
- // jiffies + chan->read_timer_interval);
+ /*
+ * Ensure we don't have any signal queued for this channel.
+ */
+ for (;;) {
+ ret = sigemptyset(&pending_set);
+ if (ret == -1) {
+ PERROR("sigemptyset");
+ }
+ ret = sigpending(&pending_set);
+ if (ret == -1) {
+ PERROR("sigpending");
+ }
+ if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_FLUSH))
+ break;
+ caa_cpu_relax();
+ }
+
+ /*
+ * From this point, no new signal handler will be fired that
+ * would try to access "chan". However, we still need to wait
+ * for any currently executing handler to complete.
+ */
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 0);
+ cmm_smp_mb();
+
+ /*
+ * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
+ * thread wakes up.
+ */
+ kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
+
+ while (!CMM_LOAD_SHARED(timer_signal.qs_done))
+ caa_cpu_relax();
+ cmm_smp_mb();
+
+ chan->switch_timer = 0;
+ chan->switch_timer_enabled = 0;
}
-#endif //0
-static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
- struct lttng_ust_shm_handle *handle)
+/*
+ * Called with ust_lock() held.
+ */
+static
+void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
{
- struct channel *chan = shmp(handle, buf->backend.chan);
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+ struct sigevent sev;
+ struct itimerspec its;
+ int ret;
if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
- || !chan->read_timer_interval
- || buf->read_timer_enabled)
+ || !chan->read_timer_interval || chan->read_timer_enabled)
return;
- //TODO
- //init_timer(&buf->read_timer);
- //buf->read_timer.function = read_buffer_timer;
- //buf->read_timer.expires = jiffies + chan->read_timer_interval;
- //buf->read_timer.data = (unsigned long)buf;
-
- //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- // add_timer_on(&buf->read_timer, buf->backend.cpu);
- //else
- // add_timer(&buf->read_timer);
- buf->read_timer_enabled = 1;
+ chan->read_timer_enabled = 1;
+
+ lib_ring_buffer_setup_timer_thread();
+
+ sev.sigev_notify = SIGEV_SIGNAL;
+ sev.sigev_signo = LTTNG_UST_RB_SIG_READ;
+ sev.sigev_value.sival_ptr = chan;
+ ret = timer_create(CLOCKID, &sev, &chan->read_timer);
+ if (ret == -1) {
+ PERROR("timer_create");
+ }
+
+ its.it_value.tv_sec = chan->read_timer_interval / 1000000;
+ its.it_value.tv_nsec = chan->read_timer_interval % 1000000;
+ its.it_interval.tv_sec = its.it_value.tv_sec;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ ret = timer_settime(chan->read_timer, 0, &its, NULL);
+ if (ret == -1) {
+ PERROR("timer_settime");
+ }
}
-static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
- struct lttng_ust_shm_handle *handle)
+/*
+ * Called with ust_lock() held.
+ */
+static
+void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
{
- struct channel *chan = shmp(handle, buf->backend.chan);
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+ sigset_t pending_set;
+ int ret;
if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
- || !chan->read_timer_interval
- || !buf->read_timer_enabled)
+ || !chan->read_timer_interval || !chan->read_timer_enabled)
return;
- //TODO
- //del_timer_sync(&buf->read_timer);
+ ret = timer_delete(chan->read_timer);
+ if (ret == -1) {
+ PERROR("timer_delete");
+ }
+
/*
* do one more check to catch data that has been written in the last
* timer period.
*/
- if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
- //TODO
- //wake_up_interruptible(&buf->read_wait);
- //wake_up_interruptible(&chan->read_wait);
+ lib_ring_buffer_channel_do_read(chan);
+
+
+ /*
+ * Ensure we don't have any signal queued for this channel.
+ */
+ for (;;) {
+ ret = sigemptyset(&pending_set);
+ if (ret == -1) {
+ PERROR("sigemptyset");
+ }
+ ret = sigpending(&pending_set);
+ if (ret == -1) {
+ PERROR("sigpending");
+ }
+ if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_READ))
+ break;
+ caa_cpu_relax();
}
- buf->read_timer_enabled = 0;
+
+ /*
+ * From this point, no new signal handler will be fired that
+ * would try to access "chan". However, we still need to wait
+ * for any currently executing handler to complete.
+ */
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 0);
+ cmm_smp_mb();
+
+ /*
+ * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
+ * thread wakes up.
+ */
+ kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
+
+ while (!CMM_LOAD_SHARED(timer_signal.qs_done))
+ caa_cpu_relax();
+ cmm_smp_mb();
+
+ chan->read_timer = 0;
+ chan->read_timer_enabled = 0;
}
static void channel_unregister_notifiers(struct channel *chan,
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
int cpu;
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
- for_each_possible_cpu(cpu) {
- struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-
- lib_ring_buffer_stop_switch_timer(buf, handle);
- lib_ring_buffer_stop_read_timer(buf, handle);
- }
- } else {
- struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
-
- lib_ring_buffer_stop_switch_timer(buf, handle);
- lib_ring_buffer_stop_read_timer(buf, handle);
- }
- //channel_backend_unregister_notifiers(&chan->backend);
+ lib_ring_buffer_channel_switch_timer_stop(chan);
+ lib_ring_buffer_channel_read_timer_stop(chan);
}
static void channel_print_errors(struct channel *chan,
static void channel_free(struct channel *chan,
struct lttng_ust_shm_handle *handle)
{
- channel_print_errors(chan, handle);
channel_backend_free(&chan->backend, handle);
/* chan is freed by shm teardown */
shm_object_table_destroy(handle->table);
if (ret)
goto error_backend_init;
+ chan->handle = handle;
chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
- //TODO
- //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
- //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
- //TODO
- //init_waitqueue_head(&chan->read_wait);
- //init_waitqueue_head(&chan->hp_wait);
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
- /*
- * In case of non-hotplug cpu, if the ring-buffer is allocated
- * in early initcall, it will not be notified of secondary cpus.
- * In that off case, we need to allocate for all possible cpus.
- */
- for_each_possible_cpu(cpu) {
- struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
- lib_ring_buffer_start_switch_timer(buf, handle);
- lib_ring_buffer_start_read_timer(buf, handle);
- }
- } else {
- struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
+ chan->switch_timer_interval = switch_timer_interval;
+ chan->read_timer_interval = read_timer_interval;
+ lib_ring_buffer_channel_switch_timer_start(chan);
+ lib_ring_buffer_channel_read_timer_start(chan);
- lib_ring_buffer_start_switch_timer(buf, handle);
- lib_ring_buffer_start_read_timer(buf, handle);
- }
return handle;
error_backend_init:
}
struct lttng_ust_shm_handle *channel_handle_create(void *data,
- uint64_t memory_map_size)
+ uint64_t memory_map_size,
+ int wakeup_fd)
{
struct lttng_ust_shm_handle *handle;
struct shm_object *object;
goto error_table_alloc;
/* Add channel object */
object = shm_object_table_append_mem(handle->table, data,
- memory_map_size);
+ memory_map_size, wakeup_fd);
if (!object)
goto error_table_object;
/* struct channel is at object 0, offset 0 (hardcoded) */
* switching the buffers.
*/
channel_unregister_notifiers(chan, handle);
+ /*
+ * The consumer prints errors.
+ */
+ channel_print_errors(chan, handle);
}
/*
return shmp(handle, chan->backend.buf[cpu].shmp);
}
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct shm_ref *ref;
+
+ ref = &handle->chan._ref;
+ return shm_close_wait_fd(handle, ref);
+}
+
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct shm_ref *ref;
+
+ ref = &handle->chan._ref;
+ return shm_close_wakeup_fd(handle, ref);
+}
+
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu)
return shm_close_wait_fd(handle, ref);
}
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu)
{
struct shm_ref *ref;
+ int ret;
if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
cpu = 0;
return -EINVAL;
}
ref = &chan->backend.buf[cpu].shmp._ref;
- return shm_close_wakeup_fd(handle, ref);
+ pthread_mutex_lock(&wakeup_fd_mutex);
+ ret = shm_close_wakeup_fd(handle, ref);
+ pthread_mutex_unlock(&wakeup_fd_mutex);
+ return ret;
}
int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
{
asm volatile ("" : : "m" (URCU_TLS(lib_ring_buffer_nesting)));
}
+
+void lib_ringbuffer_signal_init(void)
+{
+ sigset_t mask;
+ int ret;
+
+ /*
+ * Block signal for entire process, so only our thread processes
+ * it.
+ */
+ rb_setmask(&mask);
+ ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_sigmask");
+ }
+}