X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=5913f78d70a899fa78fd2409aded4f10723686b9;hb=e92f3e285939848f248af08f11a39a04a7fcf852;hp=fdb4bdfcc9b4f0a289a2dac09facd5aefdb501a9;hpb=a3f61e7f689a5fc60b833a773f462989dc6cc78f;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index fdb4bdfc..5913f78d 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -1,7 +1,22 @@ /* * ring_buffer_frontend.c * - * (C) Copyright 2005-2010 - Mathieu Desnoyers + * Copyright (C) 2005-2012 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * * * Ring buffer wait-free buffer synchronization. Producer-consumer and flight * recorder (overwrite) modes. See thesis: @@ -34,27 +49,32 @@ * - splice one subbuffer worth of data to a pipe * - splice the data from pipe to disk/network * - put_subbuf - * - * Dual LGPL v2.1/GPL v2 license. */ +#define _GNU_SOURCE #include #include #include #include #include #include +#include #include "smp.h" #include +#include "vatomic.h" #include "backend.h" #include "frontend.h" #include "shm.h" +#include "tlsfixup.h" #ifndef max #define max(a, b) ((a) > (b) ? (a) : (b)) #endif +/* Print DBG() messages about events lost only every 1048576 hits */ +#define DBG_PRINT_NR_LOST (1UL << 20) + /* * Use POSIX SHM: shm_open(3) and shm_unlink(3). * close(2) to close the fd returned by shm_open. @@ -83,25 +103,15 @@ struct switch_offsets { __thread unsigned int lib_ring_buffer_nesting; +/* + * TODO: this is unused. Errors are saved within the ring buffer. + * Eventually, allow consumerd to print these errors. + */ static void lib_ring_buffer_print_errors(struct channel *chan, struct lttng_ust_lib_ring_buffer *buf, int cpu, - struct lttng_ust_shm_handle *handle); - -/* - * Must be called under cpu hotplug protection. - */ -void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf, - struct lttng_ust_shm_handle *handle) -{ - struct channel *chan = shmp(handle, buf->backend.chan); - - lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle); - /* buf->commit_hot will be freed by shm teardown */ - /* buf->commit_cold will be freed by shm teardown */ - - lib_ring_buffer_backend_free(&buf->backend); -} + struct lttng_ust_shm_handle *handle) + __attribute__((unused)); /** * lib_ring_buffer_reset - Reset ring buffer to initial values. @@ -175,9 +185,8 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config; struct channel *chan = caa_container_of(chanb, struct channel, backend); void *priv = channel_get_private(chan); - unsigned int num_subbuf; size_t subbuf_header_size; - u64 tsc; + uint64_t tsc; int ret; /* Test for cpu hotplug */ @@ -207,9 +216,6 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, goto free_commit; } - num_subbuf = chan->backend.num_subbuf; - //init_waitqueue_head(&buf->read_wait); - /* * Write the subbuffer header for first subbuffer so we know the total * duration of data gathering. @@ -235,7 +241,6 @@ free_init: free_commit: /* commit_hot will be freed by shm teardown */ free_chanbuf: - lib_ring_buffer_backend_free(&buf->backend); return ret; } @@ -411,6 +416,7 @@ static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *hand * @name: name of the channel * @priv_data: ring buffer client private data area pointer (output) * @priv_data_size: length, in bytes, of the private data area. + * @priv_data_init: initialization data for private data. * @buf_addr: pointer the the beginning of the preallocated buffer contiguous * address mapping. It is used only by RING_BUFFER_STATIC * configuration. It can be set to NULL for other backends. @@ -429,10 +435,11 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff void **priv_data, size_t priv_data_align, size_t priv_data_size, + void *priv_data_init, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval, - int *shm_fd, int *wait_fd, uint64_t *memory_map_size) + int **shm_fd, int **wait_fd, uint64_t **memory_map_size) { int ret, cpu; size_t shmsize, chansize; @@ -486,6 +493,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff if (!shmp(handle, priv_data_alloc)) goto error_append; *priv_data = channel_get_private(chan); + memcpy(*priv_data, priv_data_init, priv_data_size); } else { chan->priv_data_offset = -1; *priv_data = NULL; @@ -597,9 +605,6 @@ void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle, void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, int shadow) { - const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - int cpu; - if (shadow) { channel_release(chan, handle, shadow); return; @@ -607,42 +612,11 @@ void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, channel_unregister_notifiers(chan, handle); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - for_each_channel_cpu(cpu, chan) { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - - if (config->cb.buffer_finalize) - config->cb.buffer_finalize(buf, - channel_get_private(chan), - cpu, handle); - if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, - handle); - /* - * Perform flush before writing to finalized. - */ - cmm_smp_wmb(); - CMM_ACCESS_ONCE(buf->finalized) = 1; - //wake_up_interruptible(&buf->read_wait); - } - } else { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + /* + * Note: the consumer takes care of finalizing and switching the + * buffers. + */ - if (config->cb.buffer_finalize) - config->cb.buffer_finalize(buf, channel_get_private(chan), -1, handle); - if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, - handle); - /* - * Perform flush before writing to finalized. - */ - cmm_smp_wmb(); - CMM_ACCESS_ONCE(buf->finalized) = 1; - //wake_up_interruptible(&buf->read_wait); - } - CMM_ACCESS_ONCE(chan->finalized) = 1; - //wake_up_interruptible(&chan->hp_wait); - //wake_up_interruptible(&chan->read_wait); /* * sessiond/consumer are keeping a reference on the shm file * descriptor directly. No need to refcount. @@ -655,8 +629,8 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer( const struct lttng_ust_lib_ring_buffer_config *config, struct channel *chan, int cpu, struct lttng_ust_shm_handle *handle, - int *shm_fd, int *wait_fd, - uint64_t *memory_map_size) + int **shm_fd, int **wait_fd, + uint64_t **memory_map_size) { struct shm_ref *ref; @@ -970,7 +944,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb); if (subbuf_offset(commit_count, chan) != 0) - ERRMSG("ring buffer %s, cpu %d: " + DBG("ring buffer %s, cpu %d: " "commit count in subbuffer %lu,\n" "expecting multiples of %lu bytes\n" " [ %lu bytes committed, %lu bytes reader-visible ]\n", @@ -978,7 +952,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu chan->backend.subbuf_size, commit_count, commit_count_sb); - ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n", + DBG("ring buffer: %s, cpu %d: %lu bytes committed\n", chan->backend.name, cpu, commit_count); } @@ -991,12 +965,6 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf, const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long write_offset, cons_offset; - /* - * Can be called in the error path of allocation when - * trans_channel_data is not yet set. - */ - if (!chan) - return; /* * No need to order commit_count, write_offset and cons_offset reads * because we execute at teardown when no more writer nor reader @@ -1005,7 +973,7 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf, write_offset = v_read(config, &buf->offset); cons_offset = uatomic_read(&buf->consumed); if (write_offset != cons_offset) - ERRMSG("ring buffer %s, cpu %d: " + DBG("ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", chan->backend.name, cpu, write_offset, cons_offset); @@ -1027,23 +995,30 @@ void lib_ring_buffer_print_errors(struct channel *chan, const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; void *priv = channel_get_private(chan); - ERRMSG("ring buffer %s, cpu %d: %lu records written, " - "%lu records overrun\n", - chan->backend.name, cpu, - v_read(config, &buf->records_count), - v_read(config, &buf->records_overrun)); - - if (v_read(config, &buf->records_lost_full) - || v_read(config, &buf->records_lost_wrap) - || v_read(config, &buf->records_lost_big)) - ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n" - " [ %lu buffer full, %lu nest buffer wrap-around, " - "%lu event too big ]\n", - chan->backend.name, cpu, - v_read(config, &buf->records_lost_full), - v_read(config, &buf->records_lost_wrap), - v_read(config, &buf->records_lost_big)); - + if (!strcmp(chan->backend.name, "relay-metadata-mmap")) { + DBG("ring buffer %s: %lu records written, " + "%lu records overrun\n", + chan->backend.name, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + } else { + DBG("ring buffer %s, cpu %d: %lu records written, " + "%lu records overrun\n", + chan->backend.name, cpu, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + + if (v_read(config, &buf->records_lost_full) + || v_read(config, &buf->records_lost_wrap) + || v_read(config, &buf->records_lost_big)) + DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n" + " [ %lu buffer full, %lu nest buffer wrap-around, " + "%lu event too big ]\n", + chan->backend.name, cpu, + v_read(config, &buf->records_lost_full), + v_read(config, &buf->records_lost_wrap), + v_read(config, &buf->records_lost_big)); + } lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle); } @@ -1056,7 +1031,7 @@ static void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1094,7 +1069,7 @@ static void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1131,7 +1106,7 @@ static void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1167,7 +1142,7 @@ static void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc, + uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; @@ -1203,7 +1178,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 *tsc) + uint64_t *tsc) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long off; @@ -1268,7 +1243,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; - u64 tsc; + uint64_t tsc; offsets.size = 0; @@ -1385,11 +1360,19 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, - subbuf_trunc((unsigned long) uatomic_read(&buf->consumed), chan) >= chan->backend.buf_size)) { + unsigned long nr_lost; + /* * We do not overwrite non consumed buffers * and we are full : record is lost. */ + nr_lost = v_read(config, &buf->records_lost_full); v_inc(config, &buf->records_lost_full); + if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) { + DBG("%lu or more records lost in (%s:%d) (buffer full)\n", + nr_lost + 1, chan->backend.name, + buf->backend.cpu); + } return -ENOBUFS; } else { /* @@ -1400,13 +1383,21 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, */ } } else { + unsigned long nr_lost; + /* * Next subbuffer reserve offset does not match the * commit offset. Drop record in producer-consumer and * overwrite mode. Caused by either a writer OOPS or too * many nested writes over a reserve/commit pair. */ + nr_lost = v_read(config, &buf->records_lost_wrap); v_inc(config, &buf->records_lost_wrap); + if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) { + DBG("%lu or more records lost in (%s:%d) (wrap-around)\n", + nr_lost + 1, chan->backend.name, + buf->backend.cpu); + } return -EIO; } offsets->size = @@ -1420,11 +1411,20 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, + ctx->data_size; if (caa_unlikely(subbuf_offset(offsets->begin, chan) + offsets->size > chan->backend.subbuf_size)) { + unsigned long nr_lost; + /* * Record too big for subbuffers, report error, don't * complete the sub-buffer switch. */ + nr_lost = v_read(config, &buf->records_lost_big); v_inc(config, &buf->records_lost_big); + if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) { + DBG("%lu or more records lost in (%s:%d) record size " + " of %zu bytes is too large for buffer\n", + nr_lost + 1, chan->backend.name, + buf->backend.cpu, offsets->size); + } return -ENOSPC; } else { /* @@ -1528,3 +1528,11 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx) ctx->buf_offset = offsets.begin + offsets.pre_header_padding; return 0; } + +/* + * Force a read (imply TLS fixup for dlopen) of TLS variables. + */ +void lttng_fixup_ringbuffer_tls(void) +{ + asm volatile ("" : : "m" (lib_ring_buffer_nesting)); +}