X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=45c0659bfafc8bcdb83077babd00e06068d5bf74;hb=ba7025296f76b11cc8547f5c1000df9b5744b529;hp=0d0f26865da9fd4da5aa140c5afcb702c4e89f08;hpb=16adecf1f2e80025667ed53f4905e725894f076a;p=lttng-ust.git

diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c
index 0d0f2686..45c0659b 100644
--- a/libringbuffer/ring_buffer_frontend.c
+++ b/libringbuffer/ring_buffer_frontend.c
@@ -51,7 +51,6 @@
  * - put_subbuf
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include 
 #include 
@@ -61,6 +60,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
@@ -194,6 +194,7 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
 	for (i = 0; i < chan->backend.num_subbuf; i++) {
 		struct commit_counters_hot *cc_hot;
 		struct commit_counters_cold *cc_cold;
+		uint64_t *ts_end;
 
 		cc_hot = shmp_index(handle, buf->commit_hot, i);
 		if (!cc_hot)
@@ -201,9 +202,13 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
 		cc_cold = shmp_index(handle, buf->commit_cold, i);
 		if (!cc_cold)
 			return;
+		ts_end = shmp_index(handle, buf->ts_end, i);
+		if (!ts_end)
+			return;
 		v_set(config, &cc_hot->cc, 0);
 		v_set(config, &cc_hot->seq, 0);
 		v_set(config, &cc_cold->cc_sb, 0);
+		*ts_end = 0;
 	}
 	uatomic_set(&buf->consumed, 0);
 	uatomic_set(&buf->record_disabled, 0);
@@ -368,6 +373,16 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
 		goto free_commit;
 	}
 
+	align_shm(shmobj, __alignof__(uint64_t));
+	set_shmp(buf->ts_end,
+		zalloc_shm(shmobj,
+			sizeof(uint64_t) * chan->backend.num_subbuf));
+	if (!shmp(handle, buf->ts_end)) {
+		ret = -ENOMEM;
+		goto free_commit_cold;
+	}
+
+
 	ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
 			cpu, handle, shmobj);
 	if (ret) {
@@ -414,6 +429,8 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
 
 	/* Error handling */
 free_init:
+	/* ts_end will be freed by shm teardown */
+free_commit_cold:
 	/* commit_cold will be freed by shm teardown */
 free_commit:
 	/* commit_hot will be freed by shm teardown */
@@ -787,6 +804,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
 
 	lib_ring_buffer_setup_timer_thread();
 
+	memset(&sev, 0, sizeof(sev));
 	sev.sigev_notify = SIGEV_SIGNAL;
 	sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
 	sev.sigev_value.sival_ptr = chan;
@@ -1005,11 +1023,11 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
 	/* Calculate the shm allocation layout */
 	shmsize = sizeof(struct channel);
-	shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
+	shmsize += lttng_ust_offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
 	shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * nr_streams;
 	chansize = shmsize;
 	if (priv_data_align)
-		shmsize += offset_align(shmsize, priv_data_align);
+		shmsize += lttng_ust_offset_align(shmsize, priv_data_align);
 	shmsize += priv_data_size;
 
 	/* Allocate normal memory for channel (not shared) */
@@ -1137,7 +1155,7 @@ void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
  * Call "destroy" callback, finalize channels, decrement the channel
  * reference count. Note that when readers have completed data
  * consumption of finalized channels, get_subbuf() will return -ENODATA.
- * They should release their handle at that point. 
+ * They should release their handle at that point.
  */
 void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
 		int consumer)
@@ -1795,15 +1813,29 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
 	unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
 	unsigned long commit_count, padding_size, data_size;
 	struct commit_counters_hot *cc_hot;
+	uint64_t *ts_end;
 
 	data_size = subbuf_offset(offsets->old - 1, chan) + 1;
 	padding_size = chan->backend.subbuf_size - data_size;
 	subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
 				handle);
 
+	ts_end = shmp_index(handle, buf->ts_end, oldidx);
+	if (!ts_end)
+		return;
 	/*
-	 * Order all writes to buffer before the commit count update that will
-	 * determine that the subbuffer is full.
+	 * This is the last space reservation in that sub-buffer before
+	 * it gets delivered. This provides exclusive access to write to
+	 * this sub-buffer's ts_end. There are also no concurrent
+	 * readers of that ts_end because delivery of that sub-buffer is
+	 * postponed until the commit counter is incremented for the
+	 * current space reservation.
+	 */
+	*ts_end = tsc;
+
+	/*
+	 * Order all writes to buffer and store to ts_end before the commit
+	 * count update that will determine that the subbuffer is full.
 	 */
 	cmm_smp_wmb();
 	cc_hot = shmp_index(handle, buf->commit_hot, oldidx);
@@ -1874,11 +1906,24 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
 {
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
 	unsigned long endidx, data_size;
+	uint64_t *ts_end;
 
 	endidx = subbuf_index(offsets->end - 1, chan);
 	data_size = subbuf_offset(offsets->end - 1, chan) + 1;
 	subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
 				handle);
+	ts_end = shmp_index(handle, buf->ts_end, endidx);
+	if (!ts_end)
+		return;
+	/*
+	 * This is the last space reservation in that sub-buffer before
+	 * it gets delivered. This provides exclusive access to write to
+	 * this sub-buffer's ts_end. There are also no concurrent
+	 * readers of that ts_end because delivery of that sub-buffer is
+	 * postponed until the commit counter is incremented for the
+	 * current space reservation.
+	 */
+	*ts_end = tsc;
 }
 
 /*
@@ -1996,9 +2041,11 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
  * Force a sub-buffer switch. This operation is completely reentrant : can be
  * called while tracing is active with absolutely no lock held.
  *
- * Note, however, that as a v_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
+ * For RING_BUFFER_SYNC_PER_CPU ring buffers, as a v_cmpxchg is used for
+ * some atomic operations, this function must be called from the CPU
+ * which owns the buffer for an ACTIVE flush. However, for
+ * RING_BUFFER_SYNC_GLOBAL ring buffers, this function can be called
+ * from any CPU.
  */
 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
 				 struct lttng_ust_shm_handle *handle)
@@ -2443,14 +2490,26 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_c
 	if (caa_likely(v_cmpxchg(config, &cc_cold->cc_sb,
 				 old_commit_count, old_commit_count + 1)
 		       == old_commit_count)) {
+		uint64_t *ts_end;
+
 		/*
 		 * Start of exclusive subbuffer access. We are
 		 * guaranteed to be the last writer in this subbuffer
 		 * and any other writer trying to access this subbuffer
 		 * in this state is required to drop records.
+ * + * We can read the ts_end for the current sub-buffer + * which has been saved by the very last space + * reservation for the current sub-buffer. + * + * Order increment of commit counter before reading ts_end. */ + cmm_smp_mb(); + ts_end = shmp_index(handle, buf->ts_end, idx); + if (!ts_end) + return; deliver_count_events(config, buf, idx, handle); - config->cb.buffer_end(buf, tsc, idx, + config->cb.buffer_end(buf, *ts_end, idx, lib_ring_buffer_get_data_size(config, buf, idx,
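
A note on the ts_end hunks above: with this change, buffer_end() receives the
timestamp saved by the very last space reservation in the sub-buffer rather
than the tsc captured by whichever context later performs the switch or
delivery. The ordering contract spelled out in the comments can be summarized
in a minimal sketch (not part of the patch), using C11 atomics in place of the
libringbuffer primitives (cmm_smp_wmb(), cmm_smp_mb(), v_cmpxchg()) and
collapsing the hot and cold commit counters into a single counter for brevity;
every name in it is hypothetical.

/*
 * Sketch of the ts_end publication protocol, under the simplifying
 * assumptions stated above.
 */
#include <stdatomic.h>
#include <stdint.h>

struct subbuf_sketch {
	uint64_t ts_end;		/* written with exclusive access */
	atomic_ulong commit_count;	/* stands in for cc_hot/cc_sb */
};

/*
 * Writer side, as in lib_ring_buffer_switch_old_end(): the last space
 * reservation stores ts_end, then publishes it with the commit-count
 * update.  The release store plays the role of cmm_smp_wmb() followed
 * by the commit-counter increment.
 */
static void writer_switch_old_end(struct subbuf_sketch *sb, uint64_t tsc,
				  unsigned long commit_count)
{
	sb->ts_end = tsc;	/* exclusive: delivery is still postponed */
	atomic_store_explicit(&sb->commit_count, commit_count,
			      memory_order_release);
}

/*
 * Deliverer side, as in lib_ring_buffer_check_deliver_slow(): claim
 * exclusive sub-buffer access by bumping the commit counter with a
 * cmpxchg (v_cmpxchg in the patch), then read ts_end.  Acquire ordering
 * on the cmpxchg stands in for the cmm_smp_mb() that orders the counter
 * increment before the ts_end load.
 */
static int deliver_subbuf(struct subbuf_sketch *sb,
			  unsigned long old_commit_count, uint64_t *ts_end)
{
	unsigned long expected = old_commit_count;

	if (!atomic_compare_exchange_strong_explicit(&sb->commit_count,
			&expected, old_commit_count + 1,
			memory_order_acq_rel, memory_order_relaxed))
		return -1;	/* another deliverer won the race */
	*ts_end = sb->ts_end;	/* safe: saved by the last reservation */
	return 0;
}

The release store pairs with the acquire cmpxchg, which is what guarantees the
successful deliverer sees the ts_end stored by the last reservation.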
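A note on the memset() added in lib_ring_buffer_channel_switch_timer_start():
struct sigevent contains more fields (and padding) than the three assigned
right after it, and timer_create() may read them, so the structure must be
zeroed first. A standalone sketch of the same pattern; example_timer_start,
example_timer and EXAMPLE_SIG are hypothetical stand-ins for the patch's
timer setup and LTTNG_UST_RB_SIG_FLUSH.

#include <signal.h>
#include <string.h>
#include <time.h>

#define EXAMPLE_SIG SIGRTMIN	/* hypothetical flush signal */

static timer_t example_timer;

static int example_timer_start(void *priv)
{
	struct sigevent sev;

	/* Zero everything before setting the fields we care about. */
	memset(&sev, 0, sizeof(sev));
	sev.sigev_notify = SIGEV_SIGNAL;
	sev.sigev_signo = EXAMPLE_SIG;
	sev.sigev_value.sival_ptr = priv;
	return timer_create(CLOCK_MONOTONIC, &sev, &example_timer);
}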
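A note on the channel_create() hunk: lttng_ust_offset_align() only renames
offset_align(); the arithmetic is unchanged. It yields the number of padding
bytes needed to round shmsize up to the next multiple of a power-of-two
alignment. A sketch of that computation, assuming the usual
(alignment - offset) & (alignment - 1) definition; offset_align_sketch and
the constants are hypothetical.

#include <stdio.h>
#include <stddef.h>

/* Padding needed to round `offset` up to `align` (a power of two). */
#define offset_align_sketch(offset, align) \
	(((align) - (offset)) & ((align) - 1))

int main(void)
{
	size_t shmsize = 13;	/* pretend sizeof(struct channel) == 13 */

	shmsize += offset_align_sketch(shmsize, 8);	/* adds 3 bytes */
	printf("aligned shmsize = %zu\n", shmsize);	/* prints 16 */
	return 0;
}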