X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=ca19648f1d04f544d4f931bb29344fd3e43b65f4;hb=bfd265823dd6c3b272b92c6e39456e851bfd51ce;hp=310e0b93e51a435b74e6d0b9d0c6b4ebc6226abf;hpb=4318ae1be57eb7983ab4857a7a8eeb4a030a8216;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 310e0b93..ca19648f 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -38,15 +38,18 @@ * Dual LGPL v2.1/GPL v2 license. */ +#define _GNU_SOURCE #include #include #include #include #include #include +#include #include "smp.h" #include +#include "vatomic.h" #include "backend.h" #include "frontend.h" #include "shm.h" @@ -83,25 +86,15 @@ struct switch_offsets { __thread unsigned int lib_ring_buffer_nesting; +/* + * TODO: this is unused. Errors are saved within the ring buffer. + * Eventually, allow consumerd to print these errors. + */ static void lib_ring_buffer_print_errors(struct channel *chan, struct lttng_ust_lib_ring_buffer *buf, int cpu, - struct lttng_ust_shm_handle *handle); - -/* - * Must be called under cpu hotplug protection. - */ -void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf, - struct lttng_ust_shm_handle *handle) -{ - struct channel *chan = shmp(handle, buf->backend.chan); - - lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle); - /* buf->commit_hot will be freed by shm teardown */ - /* buf->commit_cold will be freed by shm teardown */ - - lib_ring_buffer_backend_free(&buf->backend); -} + struct lttng_ust_shm_handle *handle) + __attribute__((unused)); /** * lib_ring_buffer_reset - Reset ring buffer to initial values. @@ -174,10 +167,9 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, { const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config; struct channel *chan = caa_container_of(chanb, struct channel, backend); - void *priv = chanb->priv; - unsigned int num_subbuf; + void *priv = channel_get_private(chan); size_t subbuf_header_size; - u64 tsc; + uint64_t tsc; int ret; /* Test for cpu hotplug */ @@ -207,9 +199,6 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, goto free_commit; } - num_subbuf = chan->backend.num_subbuf; - //init_waitqueue_head(&buf->read_wait); - /* * Write the subbuffer header for first subbuffer so we know the total * duration of data gathering. @@ -235,7 +224,6 @@ free_init: free_commit: /* commit_hot will be freed by shm teardown */ free_chanbuf: - lib_ring_buffer_backend_free(&buf->backend); return ret; } @@ -409,7 +397,9 @@ static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *hand * channel_create - Create channel. * @config: ring buffer instance configuration * @name: name of the channel - * @priv: ring buffer client private data + * @priv_data: ring buffer client private data area pointer (output) + * @priv_data_size: length, in bytes, of the private data area. + * @priv_data_init: initialization data for private data. * @buf_addr: pointer the the beginning of the preallocated buffer contiguous * address mapping. It is used only by RING_BUFFER_STATIC * configuration. It can be set to NULL for other backends. @@ -424,14 +414,18 @@ static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *hand * Returns NULL on failure. */ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config, - const char *name, void *priv, void *buf_addr, - size_t subbuf_size, + const char *name, + void **priv_data, + size_t priv_data_align, + size_t priv_data_size, + void *priv_data_init, + void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval, - int *shm_fd, int *wait_fd, uint64_t *memory_map_size) + int **shm_fd, int **wait_fd, uint64_t **memory_map_size) { int ret, cpu; - size_t shmsize; + size_t shmsize, chansize; struct channel *chan; struct lttng_ust_shm_handle *handle; struct shm_object *shmobj; @@ -452,23 +446,43 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff /* Calculate the shm allocation layout */ shmsize = sizeof(struct channel); + shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp)); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * num_possible_cpus(); else shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp); + chansize = shmsize; + shmsize += offset_align(shmsize, priv_data_align); + shmsize += priv_data_size; shmobj = shm_object_table_append(handle->table, shmsize); if (!shmobj) goto error_append; /* struct channel is at object 0, offset 0 (hardcoded) */ - set_shmp(handle->chan, zalloc_shm(shmobj, sizeof(struct channel))); + set_shmp(handle->chan, zalloc_shm(shmobj, chansize)); assert(handle->chan._ref.index == 0); assert(handle->chan._ref.offset == 0); chan = shmp(handle, handle->chan); if (!chan) goto error_append; - ret = channel_backend_init(&chan->backend, name, config, priv, + /* space for private data */ + if (priv_data_size) { + DECLARE_SHMP(void, priv_data_alloc); + + align_shm(shmobj, priv_data_align); + chan->priv_data_offset = shmobj->allocated_len; + set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size)); + if (!shmp(handle, priv_data_alloc)) + goto error_append; + *priv_data = channel_get_private(chan); + memcpy(*priv_data, priv_data_init, priv_data_size); + } else { + chan->priv_data_offset = -1; + *priv_data = NULL; + } + + ret = channel_backend_init(&chan->backend, name, config, subbuf_size, num_subbuf, handle); if (ret) goto error_backend_init; @@ -569,74 +583,37 @@ void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle, * Call "destroy" callback, finalize channels, decrement the channel * reference count. Note that when readers have completed data * consumption of finalized channels, get_subbuf() will return -ENODATA. - * They should release their handle at that point. Returns the private - * data pointer. + * They should release their handle at that point. */ -void *channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, +void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, int shadow) { - const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - void *priv; - int cpu; - if (shadow) { channel_release(chan, handle, shadow); - return NULL; + return; } channel_unregister_notifiers(chan, handle); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - for_each_channel_cpu(cpu, chan) { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - - if (config->cb.buffer_finalize) - config->cb.buffer_finalize(buf, - chan->backend.priv, - cpu, handle); - if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, - handle); - /* - * Perform flush before writing to finalized. - */ - cmm_smp_wmb(); - CMM_ACCESS_ONCE(buf->finalized) = 1; - //wake_up_interruptible(&buf->read_wait); - } - } else { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + /* + * Note: the consumer takes care of finalizing and switching the + * buffers. + */ - if (config->cb.buffer_finalize) - config->cb.buffer_finalize(buf, chan->backend.priv, -1, handle); - if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, - handle); - /* - * Perform flush before writing to finalized. - */ - cmm_smp_wmb(); - CMM_ACCESS_ONCE(buf->finalized) = 1; - //wake_up_interruptible(&buf->read_wait); - } - CMM_ACCESS_ONCE(chan->finalized) = 1; - //wake_up_interruptible(&chan->hp_wait); - //wake_up_interruptible(&chan->read_wait); /* * sessiond/consumer are keeping a reference on the shm file * descriptor directly. No need to refcount. */ - priv = chan->backend.priv; channel_release(chan, handle, shadow); - return priv; + return; } struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer( const struct lttng_ust_lib_ring_buffer_config *config, struct channel *chan, int cpu, struct lttng_ust_shm_handle *handle, - int *shm_fd, int *wait_fd, - uint64_t *memory_map_size) + int **shm_fd, int **wait_fd, + uint64_t **memory_map_size) { struct shm_ref *ref; @@ -950,7 +927,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb); if (subbuf_offset(commit_count, chan) != 0) - ERRMSG("ring buffer %s, cpu %d: " + DBG("ring buffer %s, cpu %d: " "commit count in subbuffer %lu,\n" "expecting multiples of %lu bytes\n" " [ %lu bytes committed, %lu bytes reader-visible ]\n", @@ -958,7 +935,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu chan->backend.subbuf_size, commit_count, commit_count_sb); - ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n", + DBG("ring buffer: %s, cpu %d: %lu bytes committed\n", chan->backend.name, cpu, commit_count); } @@ -971,12 +948,6 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf, const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long write_offset, cons_offset; - /* - * Can be called in the error path of allocation when - * trans_channel_data is not yet set. - */ - if (!chan) - return; /* * No need to order commit_count, write_offset and cons_offset reads * because we execute at teardown when no more writer nor reader @@ -985,7 +956,7 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf, write_offset = v_read(config, &buf->offset); cons_offset = uatomic_read(&buf->consumed); if (write_offset != cons_offset) - ERRMSG("ring buffer %s, cpu %d: " + DBG("ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", chan->backend.name, cpu, write_offset, cons_offset); @@ -1005,9 +976,9 @@ void lib_ring_buffer_print_errors(struct channel *chan, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - void *priv = chan->backend.priv; + void *priv = channel_get_private(chan); - ERRMSG("ring buffer %s, cpu %d: %lu records written, " + DBG("ring buffer %s, cpu %d: %lu records written, " "%lu records overrun\n", chan->backend.name, cpu, v_read(config, &buf->records_count), @@ -1016,7 +987,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, if (v_read(config, &buf->records_lost_full) || v_read(config, &buf->records_lost_wrap) || v_read(config, &buf->records_lost_big)) - ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n" + DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n" " [ %lu buffer full, %lu nest buffer wrap-around, " "%lu event too big ]\n", chan->backend.name, cpu, @@ -1036,7 +1007,7 @@ static void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1074,7 +1045,7 @@ static void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1111,7 +1082,7 @@ static void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1147,7 +1118,7 @@ static void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1183,7 +1154,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 *tsc) + uint64_t *tsc) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long off; @@ -1248,7 +1219,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; - u64 tsc; + uint64_t tsc; offsets.size = 0;