X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=eb4e48629ca3d26a08b3c015b069a0e3c29f7d68;hb=025907937f0f5aa689385a575e8cebb7e9381a1e;hp=6504f0e64cee59a07344703d8c402a96c375f5b1;hpb=34a91bdb42a2a3b01b687ab5e1ba7638401e6dfc;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 6504f0e6..eb4e4862 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -128,15 +128,21 @@ void lib_ring_buffer_print_errors(struct channel *chan, * Handle timer teardown race wrt memory free of private data by * ring buffer signals are handled by a single thread, which permits * a synchronization point between handling of each signal. - * Protected by the ust mutex. + * Protected by the lock within the structure. */ struct timer_signal_data { pthread_t tid; /* thread id managing signals */ int setup_done; int qs_done; + pthread_mutex_t lock; }; -static struct timer_signal_data timer_signal; +static struct timer_signal_data timer_signal = { + .tid = 0, + .setup_done = 0, + .qs_done = 0, + .lock = PTHREAD_MUTEX_INITIALIZER, +}; /** * lib_ring_buffer_reset - Reset ring buffer to initial values. @@ -418,7 +424,6 @@ void *sig_thread(void *arg) } /* - * Called with ust_lock() held. * Ensure only a single thread listens on the timer signal. */ static @@ -427,8 +432,9 @@ void lib_ring_buffer_setup_timer_thread(void) pthread_t thread; int ret; + pthread_mutex_lock(&timer_signal.lock); if (timer_signal.setup_done) - return; + goto end; ret = pthread_create(&thread, NULL, &sig_thread, NULL); if (ret) { @@ -441,11 +447,64 @@ void lib_ring_buffer_setup_timer_thread(void) PERROR("pthread_detach"); } timer_signal.setup_done = 1; +end: + pthread_mutex_unlock(&timer_signal.lock); } /* - * Called with ust_lock() held. + * Wait for signal-handling thread quiescent state. */ +static +void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr) +{ + sigset_t pending_set; + int ret; + + /* + * We need to be the only thread interacting with the thread + * that manages signals for teardown synchronization. + */ + pthread_mutex_lock(&timer_signal.lock); + + /* + * Ensure we don't have any signal queued for this channel. + */ + for (;;) { + ret = sigemptyset(&pending_set); + if (ret == -1) { + PERROR("sigemptyset"); + } + ret = sigpending(&pending_set); + if (ret == -1) { + PERROR("sigpending"); + } + if (!sigismember(&pending_set, signr)) + break; + caa_cpu_relax(); + } + + /* + * From this point, no new signal handler will be fired that + * would try to access "chan". However, we still need to wait + * for any currently executing handler to complete. + */ + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 0); + cmm_smp_mb(); + + /* + * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management + * thread wakes up. + */ + kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); + + while (!CMM_LOAD_SHARED(timer_signal.qs_done)) + caa_cpu_relax(); + cmm_smp_mb(); + + pthread_mutex_unlock(&timer_signal.lock); +} + static void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) { @@ -479,13 +538,9 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) } } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) { - sigset_t pending_set; int ret; if (!chan->switch_timer_interval || !chan->switch_timer_enabled) @@ -496,49 +551,12 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) PERROR("timer_delete"); } - /* - * Ensure we don't have any signal queued for this channel. - */ - for (;;) { - ret = sigemptyset(&pending_set); - if (ret == -1) { - PERROR("sigemptyset"); - } - ret = sigpending(&pending_set); - if (ret == -1) { - PERROR("sigpending"); - } - if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_FLUSH)) - break; - caa_cpu_relax(); - } - - /* - * From this point, no new signal handler will be fired that - * would try to access "chan". However, we still need to wait - * for any currently executing handler to complete. - */ - cmm_smp_mb(); - CMM_STORE_SHARED(timer_signal.qs_done, 0); - cmm_smp_mb(); - - /* - * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management - * thread wakes up. - */ - kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); - - while (!CMM_LOAD_SHARED(timer_signal.qs_done)) - caa_cpu_relax(); - cmm_smp_mb(); + lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_FLUSH); chan->switch_timer = 0; chan->switch_timer_enabled = 0; } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_read_timer_start(struct channel *chan) { @@ -574,14 +592,10 @@ void lib_ring_buffer_channel_read_timer_start(struct channel *chan) } } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - sigset_t pending_set; int ret; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -599,42 +613,7 @@ void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) */ lib_ring_buffer_channel_do_read(chan); - - /* - * Ensure we don't have any signal queued for this channel. - */ - for (;;) { - ret = sigemptyset(&pending_set); - if (ret == -1) { - PERROR("sigemptyset"); - } - ret = sigpending(&pending_set); - if (ret == -1) { - PERROR("sigpending"); - } - if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_READ)) - break; - caa_cpu_relax(); - } - - /* - * From this point, no new signal handler will be fired that - * would try to access "chan". However, we still need to wait - * for any currently executing handler to complete. - */ - cmm_smp_mb(); - CMM_STORE_SHARED(timer_signal.qs_done, 0); - cmm_smp_mb(); - - /* - * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management - * thread wakes up. - */ - kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); - - while (!CMM_LOAD_SHARED(timer_signal.qs_done)) - caa_cpu_relax(); - cmm_smp_mb(); + lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_READ); chan->read_timer = 0; chan->read_timer_enabled = 0; @@ -643,9 +622,6 @@ void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) static void channel_unregister_notifiers(struct channel *chan, struct lttng_ust_shm_handle *handle) { - const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - int cpu; - lib_ring_buffer_channel_switch_timer_stop(chan); lib_ring_buffer_channel_read_timer_stop(chan); } @@ -710,7 +686,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval) { - int ret, cpu; + int ret; size_t shmsize, chansize; struct channel *chan; struct lttng_ust_shm_handle *handle; @@ -1051,7 +1027,7 @@ nodata: } /** - * lib_ring_buffer_put_snapshot - move consumed counter forward + * lib_ring_buffer_move_consumer - move consumed counter forward * @buf: ring buffer * @consumed_new: new consumed count value */ @@ -1131,7 +1107,7 @@ retry: */ if (((commit_count - chan->backend.subbuf_size) & chan->commit_count_mask) - - (buf_trunc(consumed_cur, chan) + - (buf_trunc(consumed, chan) >> chan->backend.num_subbuf_order) != 0) goto nodata; @@ -1140,7 +1116,7 @@ retry: * Check that we are not about to read the same subbuffer in * which the writer head is. */ - if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan) + if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan) == 0) goto nodata; @@ -1357,11 +1333,10 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, - commit_count, oldidx, handle); + commit_count, oldidx, handle, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, - offsets->old, commit_count, - config->cb.subbuffer_header_size(), - handle); + offsets->old + config->cb.subbuffer_header_size(), + commit_count, handle); } /* @@ -1396,10 +1371,9 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc); commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, - commit_count, oldidx, handle); + commit_count, oldidx, handle, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, - offsets->old, commit_count, - padding_size, handle); + offsets->old + padding_size, commit_count, handle); } /* @@ -1432,18 +1406,19 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, - commit_count, beginidx, handle); + commit_count, beginidx, handle, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx, - offsets->begin, commit_count, - config->cb.subbuffer_header_size(), - handle); + offsets->begin + config->cb.subbuffer_header_size(), + commit_count, handle); } /* * lib_ring_buffer_switch_new_end: finish switching current subbuffer * - * The only remaining threads could be the ones with pending commits. They will - * have to do the deliver themselves. + * Calls subbuffer_set_data_size() to set the data size of the current + * sub-buffer. We do not need to perform check_deliver nor commit here, + * since this task will be done by the "commit" of the event for which + * we are currently doing the space reservation. */ static void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, @@ -1453,26 +1428,12 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - unsigned long endidx = subbuf_index(offsets->end - 1, chan); - unsigned long commit_count, padding_size, data_size; + unsigned long endidx, data_size; + endidx = subbuf_index(offsets->end - 1, chan); data_size = subbuf_offset(offsets->end - 1, chan) + 1; - padding_size = chan->backend.subbuf_size - data_size; subbuffer_set_data_size(config, &buf->backend, endidx, data_size, handle); - - /* - * Order all writes to buffer before the commit count update that will - * determine that the subbuffer is full. - */ - cmm_smp_wmb(); - v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc); - commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc); - lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1, - commit_count, endidx, handle); - lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, - offsets->end, commit_count, - padding_size, handle); } /* @@ -1485,10 +1446,11 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - uint64_t *tsc) + uint64_t *tsc, + struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - unsigned long off; + unsigned long off, reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); offsets->old = offsets->begin; @@ -1513,36 +1475,69 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * timestamps) are visible to the reader. This is required for * quiescence guarantees for the fusion merge. */ - if (mode == SWITCH_FLUSH || off > 0) { - if (caa_unlikely(off == 0)) { - /* - * A final flush that encounters an empty - * sub-buffer cannot switch buffer if a - * reader is located within this sub-buffer. - * Anyway, the purpose of final flushing of a - * sub-buffer at offset 0 is to handle the case - * of entirely empty stream. - */ - if (caa_unlikely(subbuf_trunc(offsets->begin, chan) - - subbuf_trunc((unsigned long) - uatomic_read(&buf->consumed), chan) - >= chan->backend.buf_size)) - return -1; - /* - * The client does not save any header information. - * Don't switch empty subbuffer on finalize, because it - * is invalid to deliver a completely empty subbuffer. - */ - if (!config->cb.subbuffer_header_size()) + if (mode != SWITCH_FLUSH && !off) + return -1; /* we do not have to switch : buffer is empty */ + + if (caa_unlikely(off == 0)) { + unsigned long sb_index, commit_count; + + /* + * We are performing a SWITCH_FLUSH. At this stage, there are no + * concurrent writes into the buffer. + * + * The client does not save any header information. Don't + * switch empty subbuffer on finalize, because it is invalid to + * deliver a completely empty subbuffer. + */ + if (!config->cb.subbuffer_header_size()) + return -1; + + /* Test new buffer integrity */ + sb_index = subbuf_index(offsets->begin, chan); + commit_count = v_read(config, + &shmp_index(handle, buf->commit_cold, + sb_index)->cc_sb); + reserve_commit_diff = + (buf_trunc(offsets->begin, chan) + >> chan->backend.num_subbuf_order) + - (commit_count & chan->commit_count_mask); + if (caa_likely(reserve_commit_diff == 0)) { + /* Next subbuffer not being written to. */ + if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE && + subbuf_trunc(offsets->begin, chan) + - subbuf_trunc((unsigned long) + uatomic_read(&buf->consumed), chan) + >= chan->backend.buf_size)) { + /* + * We do not overwrite non consumed buffers + * and we are full : don't switch. + */ return -1; + } else { + /* + * Next subbuffer not being written to, and we + * are either in overwrite mode or the buffer is + * not full. It's safe to write in this new + * subbuffer. + */ + } + } else { /* - * Need to write the subbuffer start header on finalize. + * Next subbuffer reserve offset does not match the + * commit offset. Don't perform switch in + * producer-consumer and overwrite mode. Caused by + * either a writer OOPS or too many nested writes over a + * reserve/commit pair. */ - offsets->switch_old_start = 1; + return -1; } - offsets->begin = subbuf_align(offsets->begin, chan); - } else - return -1; /* we do not have to switch : buffer is empty */ + + /* + * Need to write the subbuffer start header on finalize. + */ + offsets->switch_old_start = 1; + } + offsets->begin = subbuf_align(offsets->begin, chan); /* Note: old points to the next subbuf at offset 0 */ offsets->end = offsets->begin; return 0; @@ -1572,7 +1567,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi */ do { if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets, - &tsc)) + &tsc, handle)) return; /* Switch not needed */ } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end) != offsets.old); @@ -1622,9 +1617,10 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct lttng_ust_shm_handle *handle = ctx->handle; - unsigned long reserve_commit_diff; + unsigned long reserve_commit_diff, offset_cmp; - offsets->begin = v_read(config, &buf->offset); +retry: + offsets->begin = offset_cmp = v_read(config, &buf->offset); offsets->old = offsets->begin; offsets->switch_new_start = 0; offsets->switch_new_end = 0; @@ -1656,7 +1652,7 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, } } if (caa_unlikely(offsets->switch_new_start)) { - unsigned long sb_index; + unsigned long sb_index, commit_count; /* * We are typically not filling the previous buffer completely. @@ -1667,12 +1663,32 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, + config->cb.subbuffer_header_size(); /* Test new buffer integrity */ sb_index = subbuf_index(offsets->begin, chan); + /* + * Read buf->offset before buf->commit_cold[sb_index].cc_sb. + * lib_ring_buffer_check_deliver() has the matching + * memory barriers required around commit_cold cc_sb + * updates to ensure reserve and commit counter updates + * are not seen reordered when updated by another CPU. + */ + cmm_smp_rmb(); + commit_count = v_read(config, + &shmp_index(handle, buf->commit_cold, + sb_index)->cc_sb); + /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */ + cmm_smp_rmb(); + if (caa_unlikely(offset_cmp != v_read(config, &buf->offset))) { + /* + * The reserve counter have been concurrently updated + * while we read the commit counter. This means the + * commit counter we read might not match buf->offset + * due to concurrent update. We therefore need to retry. + */ + goto retry; + } reserve_commit_diff = (buf_trunc(offsets->begin, chan) >> chan->backend.num_subbuf_order) - - ((unsigned long) v_read(config, - &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb) - & chan->commit_count_mask); + - (commit_count & chan->commit_count_mask); if (caa_likely(reserve_commit_diff == 0)) { /* Next subbuffer not being written to. */ if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE && @@ -1707,7 +1723,8 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, /* * Next subbuffer reserve offset does not match the - * commit offset. Drop record in producer-consumer and + * commit offset, and this did not involve update to the + * reserve counter. Drop record in producer-consumer and * overwrite mode. Caused by either a writer OOPS or too * many nested writes over a reserve/commit pair. */