X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=d0649f3e36b9e33b0e1b247854fd6a2c000f714a;hb=ef9ff354212ff4b038e1a5b6a7ed0ffe1b949663;hp=5e6d4df267d2c48d9d4c0bfa44b33202574dcea0;hpb=824f40b81426c6ac82685251018dae00947786a9;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 5e6d4df2..d0649f3e 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -44,9 +44,11 @@ #include #include #include +#include #include "smp.h" -#include +#include +#include "vatomic.h" #include "backend.h" #include "frontend.h" #include "shm.h" @@ -83,25 +85,14 @@ struct switch_offsets { __thread unsigned int lib_ring_buffer_nesting; -static -void lib_ring_buffer_print_errors(struct channel *chan, - struct lib_ring_buffer *buf, int cpu, - struct shm_handle *handle); - /* - * Must be called under cpu hotplug protection. + * TODO: this is unused. Errors are saved within the ring buffer. + * Eventually, allow consumerd to print these errors. */ -void lib_ring_buffer_free(struct lib_ring_buffer *buf, - struct shm_handle *handle) -{ - struct channel *chan = shmp(handle, buf->backend.chan); - - lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle); - /* buf->commit_hot will be freed by shm teardown */ - /* buf->commit_cold will be freed by shm teardown */ - - lib_ring_buffer_backend_free(&buf->backend); -} +static +void lib_ring_buffer_print_errors(struct channel *chan, + struct lttng_ust_lib_ring_buffer *buf, int cpu, + struct lttng_ust_shm_handle *handle); /** * lib_ring_buffer_reset - Reset ring buffer to initial values. @@ -112,11 +103,11 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf, * should not be using the iterator concurrently with reset. The previous * current iterator record is reset. */ -void lib_ring_buffer_reset(struct lib_ring_buffer *buf, - struct shm_handle *handle) +void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned int i; /* @@ -167,17 +158,16 @@ void channel_reset(struct channel *chan) /* * Must be called under cpu hotplug protection. */ -int lib_ring_buffer_create(struct lib_ring_buffer *buf, +int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, struct channel_backend *chanb, int cpu, - struct shm_handle *handle, + struct lttng_ust_shm_handle *handle, struct shm_object *shmobj) { - const struct lib_ring_buffer_config *config = &chanb->config; + const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config; struct channel *chan = caa_container_of(chanb, struct channel, backend); - void *priv = chanb->priv; - unsigned int num_subbuf; + void *priv = channel_get_private(chan); size_t subbuf_header_size; - u64 tsc; + uint64_t tsc; int ret; /* Test for cpu hotplug */ @@ -207,9 +197,6 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, goto free_commit; } - num_subbuf = chan->backend.num_subbuf; - //init_waitqueue_head(&buf->read_wait); - /* * Write the subbuffer header for first subbuffer so we know the total * duration of data gathering. 
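
The hunk above replaces the direct chanb->priv pointer with a call to channel_get_private(chan), which resolves the client-private area now carved out of the channel's shm object (see the priv_data_offset bookkeeping added to channel_create() further down). The accessor itself is not part of this patch; a minimal sketch of what it presumably looks like, assuming struct channel sits at object 0, offset 0 of its shm object as asserted later in this diff:

/*
 * Sketch only -- assumed helper, not introduced by this diff.  Because
 * struct channel is placed at object 0, offset 0 of the shm handle, the
 * offset recorded in chan->priv_data_offset can be applied directly to
 * the channel pointer to reach the private data area.
 */
static inline
void *channel_get_private(struct channel *chan)
{
	return ((char *) chan) + chan->priv_data_offset;
}
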
@@ -235,16 +222,15 @@ free_init: free_commit: /* commit_hot will be freed by shm teardown */ free_chanbuf: - lib_ring_buffer_backend_free(&buf->backend); return ret; } #if 0 static void switch_buffer_timer(unsigned long data) { - struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; + struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data; struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; /* * Only flush buffers periodically if readers are active. @@ -262,11 +248,11 @@ static void switch_buffer_timer(unsigned long data) } #endif //0 -static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf, - struct shm_handle *handle) +static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; @@ -282,8 +268,8 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf, buf->switch_timer_enabled = 1; } -static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf, - struct shm_handle *handle) +static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); @@ -301,9 +287,9 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf, */ static void read_buffer_timer(unsigned long data) { - struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; + struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data; struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; CHAN_WARN_ON(chan, !buf->backend.allocated); @@ -324,11 +310,11 @@ static void read_buffer_timer(unsigned long data) } #endif //0 -static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf, - struct shm_handle *handle) +static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -348,11 +334,11 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf, buf->read_timer_enabled = 1; } -static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf, - struct shm_handle *handle) +static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -374,20 +360,20 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf, 
} static void channel_unregister_notifiers(struct channel *chan, - struct shm_handle *handle) + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; int cpu; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); + struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); lib_ring_buffer_stop_switch_timer(buf, handle); lib_ring_buffer_stop_read_timer(buf, handle); } } else { - struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); lib_ring_buffer_stop_switch_timer(buf, handle); lib_ring_buffer_stop_read_timer(buf, handle); @@ -395,11 +381,9 @@ static void channel_unregister_notifiers(struct channel *chan, //channel_backend_unregister_notifiers(&chan->backend); } -static void channel_free(struct channel *chan, struct shm_handle *handle, +static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle, int shadow) { - int ret; - if (!shadow) channel_backend_free(&chan->backend, handle); /* chan is freed by shm teardown */ @@ -411,7 +395,9 @@ static void channel_free(struct channel *chan, struct shm_handle *handle, * channel_create - Create channel. * @config: ring buffer instance configuration * @name: name of the channel - * @priv: ring buffer client private data + * @priv_data: ring buffer client private data area pointer (output) + * @priv_data_size: length, in bytes, of the private data area. + * @priv_data_init: initialization data for private data. * @buf_addr: pointer the the beginning of the preallocated buffer contiguous * address mapping. It is used only by RING_BUFFER_STATIC * configuration. It can be set to NULL for other backends. @@ -425,17 +411,21 @@ static void channel_free(struct channel *chan, struct shm_handle *handle, * Holds cpu hotplug. * Returns NULL on failure. 
*/ -struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, - const char *name, void *priv, void *buf_addr, - size_t subbuf_size, +struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config, + const char *name, + void **priv_data, + size_t priv_data_align, + size_t priv_data_size, + void *priv_data_init, + void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval, - int *shm_fd, int *wait_fd, uint64_t *memory_map_size) + int **shm_fd, int **wait_fd, uint64_t **memory_map_size) { int ret, cpu; - size_t shmsize; + size_t shmsize, chansize; struct channel *chan; - struct shm_handle *handle; + struct lttng_ust_shm_handle *handle; struct shm_object *shmobj; struct shm_ref *ref; @@ -443,7 +433,7 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, read_timer_interval)) return NULL; - handle = zmalloc(sizeof(struct shm_handle)); + handle = zmalloc(sizeof(struct lttng_ust_shm_handle)); if (!handle) return NULL; @@ -454,20 +444,43 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, /* Calculate the shm allocation layout */ shmsize = sizeof(struct channel); + shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp)); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - shmsize += sizeof(struct lib_ring_buffer_shmp) * num_possible_cpus(); + shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * num_possible_cpus(); else - shmsize += sizeof(struct lib_ring_buffer_shmp); + shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp); + chansize = shmsize; + shmsize += offset_align(shmsize, priv_data_align); + shmsize += priv_data_size; shmobj = shm_object_table_append(handle->table, shmsize); if (!shmobj) goto error_append; - set_shmp(handle->chan, zalloc_shm(shmobj, sizeof(struct channel))); + /* struct channel is at object 0, offset 0 (hardcoded) */ + set_shmp(handle->chan, zalloc_shm(shmobj, chansize)); + assert(handle->chan._ref.index == 0); + assert(handle->chan._ref.offset == 0); chan = shmp(handle, handle->chan); if (!chan) goto error_append; - ret = channel_backend_init(&chan->backend, name, config, priv, + /* space for private data */ + if (priv_data_size) { + DECLARE_SHMP(void, priv_data_alloc); + + align_shm(shmobj, priv_data_align); + chan->priv_data_offset = shmobj->allocated_len; + set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size)); + if (!shmp(handle, priv_data_alloc)) + goto error_append; + *priv_data = channel_get_private(chan); + memcpy(*priv_data, priv_data_init, priv_data_size); + } else { + chan->priv_data_offset = -1; + *priv_data = NULL; + } + + ret = channel_backend_init(&chan->backend, name, config, subbuf_size, num_subbuf, handle); if (ret) goto error_backend_init; @@ -487,12 +500,12 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, * In that off case, we need to allocate for all possible cpus. 
*/ for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); + struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); lib_ring_buffer_start_switch_timer(buf, handle); lib_ring_buffer_start_read_timer(buf, handle); } } else { - struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); lib_ring_buffer_start_switch_timer(buf, handle); lib_ring_buffer_start_read_timer(buf, handle); @@ -509,13 +522,13 @@ error_table_alloc: return NULL; } -struct shm_handle *channel_handle_create(int shm_fd, int wait_fd, +struct lttng_ust_shm_handle *channel_handle_create(int shm_fd, int wait_fd, uint64_t memory_map_size) { - struct shm_handle *handle; + struct lttng_ust_shm_handle *handle; struct shm_object *object; - handle = zmalloc(sizeof(struct shm_handle)); + handle = zmalloc(sizeof(struct lttng_ust_shm_handle)); if (!handle) return NULL; @@ -528,7 +541,9 @@ struct shm_handle *channel_handle_create(int shm_fd, int wait_fd, shm_fd, wait_fd, memory_map_size); if (!object) goto error_table_object; - + /* struct channel is at object 0, offset 0 (hardcoded) */ + handle->chan._ref.index = 0; + handle->chan._ref.offset = 0; return handle; error_table_object: @@ -538,7 +553,7 @@ error_table_alloc: return NULL; } -int channel_handle_add_stream(struct shm_handle *handle, +int channel_handle_add_stream(struct lttng_ust_shm_handle *handle, int shm_fd, int wait_fd, uint64_t memory_map_size) { struct shm_object *object; @@ -552,7 +567,7 @@ int channel_handle_add_stream(struct shm_handle *handle, } static -void channel_release(struct channel *chan, struct shm_handle *handle, +void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle, int shadow) { channel_free(chan, handle, shadow); @@ -566,74 +581,37 @@ void channel_release(struct channel *chan, struct shm_handle *handle, * Call "destroy" callback, finalize channels, decrement the channel * reference count. Note that when readers have completed data * consumption of finalized channels, get_subbuf() will return -ENODATA. - * They should release their handle at that point. Returns the private - * data pointer. + * They should release their handle at that point. */ -void *channel_destroy(struct channel *chan, struct shm_handle *handle, +void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, int shadow) { - const struct lib_ring_buffer_config *config = &chan->backend.config; - void *priv; - int cpu; - if (shadow) { channel_release(chan, handle, shadow); - return NULL; + return; } channel_unregister_notifiers(chan, handle); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - for_each_channel_cpu(cpu, chan) { - struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - - if (config->cb.buffer_finalize) - config->cb.buffer_finalize(buf, - chan->backend.priv, - cpu, handle); - if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, - handle); - /* - * Perform flush before writing to finalized. - */ - cmm_smp_wmb(); - CMM_ACCESS_ONCE(buf->finalized) = 1; - //wake_up_interruptible(&buf->read_wait); - } - } else { - struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + /* + * Note: the consumer takes care of finalizing and switching the + * buffers. 
+ */ - if (config->cb.buffer_finalize) - config->cb.buffer_finalize(buf, chan->backend.priv, -1, handle); - if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, - handle); - /* - * Perform flush before writing to finalized. - */ - cmm_smp_wmb(); - CMM_ACCESS_ONCE(buf->finalized) = 1; - //wake_up_interruptible(&buf->read_wait); - } - CMM_ACCESS_ONCE(chan->finalized) = 1; - //wake_up_interruptible(&chan->hp_wait); - //wake_up_interruptible(&chan->read_wait); /* * sessiond/consumer are keeping a reference on the shm file * descriptor directly. No need to refcount. */ - priv = chan->backend.priv; channel_release(chan, handle, shadow); - return priv; + return; } -struct lib_ring_buffer *channel_get_ring_buffer( - const struct lib_ring_buffer_config *config, +struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer( + const struct lttng_ust_lib_ring_buffer_config *config, struct channel *chan, int cpu, - struct shm_handle *handle, - int *shm_fd, int *wait_fd, - uint64_t *memory_map_size) + struct lttng_ust_shm_handle *handle, + int **shm_fd, int **wait_fd, + uint64_t **memory_map_size) { struct shm_ref *ref; @@ -643,6 +621,8 @@ struct lib_ring_buffer *channel_get_ring_buffer( memory_map_size); return shmp(handle, chan->backend.buf[0].shmp); } else { + if (cpu >= num_possible_cpus()) + return NULL; ref = &chan->backend.buf[cpu].shmp._ref; shm_get_object_data(handle, ref, shm_fd, wait_fd, memory_map_size); @@ -650,12 +630,10 @@ struct lib_ring_buffer *channel_get_ring_buffer( } } -int lib_ring_buffer_open_read(struct lib_ring_buffer *buf, - struct shm_handle *handle, +int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle, int shadow) { - struct channel *chan = shmp(handle, buf->backend.chan); - if (shadow) { if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0) return -EBUSY; @@ -668,8 +646,8 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf, return 0; } -void lib_ring_buffer_release_read(struct lib_ring_buffer *buf, - struct shm_handle *handle, +void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle, int shadow) { struct channel *chan = shmp(handle, buf->backend.chan); @@ -695,12 +673,12 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf, * data to read at consumed position, or 0 if the get operation succeeds. 
*/ -int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, +int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf, unsigned long *consumed, unsigned long *produced, - struct shm_handle *handle) + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, write_offset; int finalized; @@ -749,11 +727,11 @@ nodata: * @buf: ring buffer * @consumed_new: new consumed count value */ -void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, +void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf, unsigned long consumed_new, - struct shm_handle *handle) + struct lttng_ust_shm_handle *handle) { - struct lib_ring_buffer_backend *bufb = &buf->backend; + struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend; struct channel *chan = shmp(handle, bufb->chan); unsigned long consumed; @@ -779,12 +757,12 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no * data to read at consumed position, or 0 if the get operation succeeds. */ -int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, +int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf, unsigned long consumed, - struct shm_handle *handle) + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret; int finalized; @@ -873,12 +851,12 @@ nodata: * lib_ring_buffer_put_subbuf - release exclusive subbuffer access * @buf: ring buffer */ -void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf, - struct shm_handle *handle) +void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) { - struct lib_ring_buffer_backend *bufb = &buf->backend; + struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend; struct channel *chan = shmp(handle, bufb->chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1 @@ -933,13 +911,13 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf, * position and the writer position. 
(inclusive) */ static -void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, +void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, unsigned long cons_offset, int cpu, - struct shm_handle *handle) + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long cons_idx, commit_count, commit_count_sb; cons_idx = subbuf_index(cons_offset, chan); @@ -947,7 +925,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb); if (subbuf_offset(commit_count, chan) != 0) - ERRMSG("ring buffer %s, cpu %d: " + DBG("ring buffer %s, cpu %d: " "commit count in subbuffer %lu,\n" "expecting multiples of %lu bytes\n" " [ %lu bytes committed, %lu bytes reader-visible ]\n", @@ -955,25 +933,19 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, chan->backend.subbuf_size, commit_count, commit_count_sb); - ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n", + DBG("ring buffer: %s, cpu %d: %lu bytes committed\n", chan->backend.name, cpu, commit_count); } static -void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, +void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, void *priv, int cpu, - struct shm_handle *handle) + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long write_offset, cons_offset; - /* - * Can be called in the error path of allocation when - * trans_channel_data is not yet set. - */ - if (!chan) - return; /* * No need to order commit_count, write_offset and cons_offset reads * because we execute at teardown when no more writer nor reader @@ -982,7 +954,7 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, write_offset = v_read(config, &buf->offset); cons_offset = uatomic_read(&buf->consumed); if (write_offset != cons_offset) - ERRMSG("ring buffer %s, cpu %d: " + DBG("ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", chan->backend.name, cpu, write_offset, cons_offset); @@ -998,13 +970,13 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, static void lib_ring_buffer_print_errors(struct channel *chan, - struct lib_ring_buffer *buf, int cpu, - struct shm_handle *handle) + struct lttng_ust_lib_ring_buffer *buf, int cpu, + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; - void *priv = chan->backend.priv; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + void *priv = channel_get_private(chan); - ERRMSG("ring buffer %s, cpu %d: %lu records written, " + DBG("ring buffer %s, cpu %d: %lu records written, " "%lu records overrun\n", chan->backend.name, cpu, v_read(config, &buf->records_count), @@ -1013,7 +985,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, if (v_read(config, &buf->records_lost_full) || v_read(config, &buf->records_lost_wrap) || v_read(config, &buf->records_lost_big)) - ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n" + DBG("ring buffer %s, cpu %d: records were lost. 
Caused by:\n" " [ %lu buffer full, %lu nest buffer wrap-around, " "%lu event too big ]\n", chan->backend.name, cpu, @@ -1030,13 +1002,13 @@ void lib_ring_buffer_print_errors(struct channel *chan, * Only executed when the buffer is finalized, in SWITCH_FLUSH. */ static -void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, - struct shm_handle *handle) + uint64_t tsc, + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old, chan); unsigned long commit_count; @@ -1068,13 +1040,13 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, * subbuffer. */ static -void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, - struct shm_handle *handle) + uint64_t tsc, + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; @@ -1105,13 +1077,13 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, * that this code is executed before the deliver of this sub-buffer. */ static -void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, - struct shm_handle *handle) + uint64_t tsc, + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long beginidx = subbuf_index(offsets->begin, chan); unsigned long commit_count; @@ -1141,13 +1113,13 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, * have to do the deliver themselves. */ static -void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, +void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, - struct shm_handle *handle) + uint64_t tsc, + struct lttng_ust_shm_handle *handle) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long endidx = subbuf_index(offsets->end - 1, chan); unsigned long commit_count, padding_size, data_size; @@ -1177,12 +1149,12 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, */ static int lib_ring_buffer_try_switch_slow(enum switch_mode mode, - struct lib_ring_buffer *buf, + struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 *tsc) + uint64_t *tsc) { - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long off; offsets->begin = v_read(config, &buf->offset); @@ -1209,7 +1181,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * quiescence guarantees for the fusion merge. 
*/ if (mode == SWITCH_FLUSH || off > 0) { - if (unlikely(off == 0)) { + if (caa_unlikely(off == 0)) { /* * The client does not save any header information. * Don't switch empty subbuffer on finalize, because it @@ -1238,14 +1210,14 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * operations, this function must be called from the CPU which owns the buffer * for a ACTIVE flush. */ -void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode, - struct shm_handle *handle) +void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode, + struct lttng_ust_shm_handle *handle) { struct channel *chan = shmp(handle, buf->backend.chan); - const struct lib_ring_buffer_config *config = &chan->backend.config; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; - u64 tsc; + uint64_t tsc; offsets.size = 0; @@ -1297,13 +1269,13 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m * -EIO if data cannot be written into the buffer for any other reason. */ static -int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, +int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - struct lib_ring_buffer_ctx *ctx) + struct lttng_ust_lib_ring_buffer_ctx *ctx) { - const struct lib_ring_buffer_config *config = &chan->backend.config; - struct shm_handle *handle = ctx->handle; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + struct lttng_ust_shm_handle *handle = ctx->handle; unsigned long reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); @@ -1320,7 +1292,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, if (last_tsc_overflow(config, buf, ctx->tsc)) ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC; - if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) { + if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) { offsets->switch_new_start = 1; /* For offsets->begin */ } else { offsets->size = config->cb.record_header_size(config, chan, @@ -1331,19 +1303,19 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) + ctx->data_size; - if (unlikely(subbuf_offset(offsets->begin, chan) + + if (caa_unlikely(subbuf_offset(offsets->begin, chan) + offsets->size > chan->backend.subbuf_size)) { offsets->switch_old_end = 1; /* For offsets->old */ offsets->switch_new_start = 1; /* For offsets->begin */ } } - if (unlikely(offsets->switch_new_start)) { + if (caa_unlikely(offsets->switch_new_start)) { unsigned long sb_index; /* * We are typically not filling the previous buffer completely. */ - if (likely(offsets->switch_old_end)) + if (caa_likely(offsets->switch_old_end)) offsets->begin = subbuf_align(offsets->begin, chan); offsets->begin = offsets->begin + config->cb.subbuffer_header_size(); @@ -1355,9 +1327,9 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, - ((unsigned long) v_read(config, &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb) & chan->commit_count_mask); - if (likely(reserve_commit_diff == 0)) { + if (caa_likely(reserve_commit_diff == 0)) { /* Next subbuffer not being written to. 
*/ - if (unlikely(config->mode != RING_BUFFER_OVERWRITE && + if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE && subbuf_trunc(offsets->begin, chan) - subbuf_trunc((unsigned long) uatomic_read(&buf->consumed), chan) @@ -1395,7 +1367,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) + ctx->data_size; - if (unlikely(subbuf_offset(offsets->begin, chan) + if (caa_unlikely(subbuf_offset(offsets->begin, chan) + offsets->size > chan->backend.subbuf_size)) { /* * Record too big for subbuffers, report error, don't @@ -1417,7 +1389,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, } offsets->end = offsets->begin + offsets->size; - if (unlikely(subbuf_offset(offsets->end, chan) == 0)) { + if (caa_unlikely(subbuf_offset(offsets->end, chan) == 0)) { /* * The offset_end will fall at the very beginning of the next * subbuffer. @@ -1435,12 +1407,12 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, * -EIO for other errors, else returns 0. * It will take care of sub-buffer switching. */ -int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) +int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx) { struct channel *chan = ctx->chan; - struct shm_handle *handle = ctx->handle; - const struct lib_ring_buffer_config *config = &chan->backend.config; - struct lib_ring_buffer *buf; + struct lttng_ust_shm_handle *handle = ctx->handle; + const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + struct lttng_ust_lib_ring_buffer *buf; struct switch_offsets offsets; int ret; @@ -1455,9 +1427,9 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) do { ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets, ctx); - if (unlikely(ret)) + if (caa_unlikely(ret)) return ret; - } while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old, + } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old, offsets.end) != offsets.old)); @@ -1484,7 +1456,7 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) /* * Switch old subbuffer if needed. */ - if (unlikely(offsets.switch_old_end)) { + if (caa_unlikely(offsets.switch_old_end)) { lib_ring_buffer_clear_noref(config, &buf->backend, subbuf_index(offsets.old - 1, chan), handle); @@ -1494,10 +1466,10 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) /* * Populate new subbuffer. */ - if (unlikely(offsets.switch_new_start)) + if (caa_unlikely(offsets.switch_new_start)) lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle); - if (unlikely(offsets.switch_new_end)) + if (caa_unlikely(offsets.switch_new_end)) lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle); ctx->slot_size = offsets.size;
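
Taken together, the changes to channel_create() above replace the single priv pointer with a private data area that is allocated inside the channel's shm object, aligned to priv_data_align, and initialized by copying priv_data_init; the shm_fd/wait_fd/memory_map_size outputs also gain a level of indirection. A minimal caller-side sketch under assumed names (my_priv, example_setup and the literal sizes are illustrative only, and the declarations are taken to come from the libringbuffer "frontend.h" header included at the top of this file):

#include <stddef.h>
#include <stdint.h>

/*
 * Assumed client-private state; any layout works as long as the size and
 * alignment passed to channel_create() describe it.
 */
struct my_priv {
	uint64_t session_id;
};

static
struct lttng_ust_shm_handle *example_setup(const struct lttng_ust_lib_ring_buffer_config *config)
{
	struct my_priv init = { .session_id = 42 };
	struct my_priv *priv;
	struct lttng_ust_shm_handle *handle;
	int *shm_fd, *wait_fd;
	uint64_t *memory_map_size;

	/*
	 * Parameter order follows the new prototype: the private data
	 * description comes right after the channel name, and the
	 * shm_fd/wait_fd/memory_map_size out-parameters are now int **
	 * and uint64_t ** rather than plain output integers.
	 */
	handle = channel_create(config, "example_chan",
			(void **) &priv,
			__alignof__(struct my_priv),	/* priv_data_align */
			sizeof(struct my_priv),		/* priv_data_size */
			&init,				/* priv_data_init */
			NULL,				/* buf_addr, RING_BUFFER_STATIC only */
			4096, 4,			/* subbuf_size, num_subbuf */
			0, 0,				/* switch/read timer intervals */
			&shm_fd, &wait_fd, &memory_map_size);
	if (!handle)
		return NULL;
	/* priv now points at the copy of init living in the channel's shm object. */
	return handle;
}
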