#include <sys/ipc.h>
#include <sys/shm.h>
#include <fcntl.h>
-#include <ust/kernelcompat.h>
-#include <kcompat/kref.h>
+#include <stdlib.h>
+
+#include <ust/clock.h>
+
#include "buffers.h"
#include "channels.h"
#include "tracer.h"
static DEFINE_MUTEX(ust_buffers_channels_mutex);
-static LIST_HEAD(ust_buffers_channels);
+static CDS_LIST_HEAD(ust_buffers_channels);
static int get_n_cpus(void)
{
int result;
static int n_cpus = 0;
- if(n_cpus) {
- return n_cpus;
- }
+ if(!n_cpus) {
+ /* On Linux, when some processors are offline
+ * _SC_NPROCESSORS_CONF counts the offline
+ * processors, whereas _SC_NPROCESSORS_ONLN
+ * does not. If we used _SC_NPROCESSORS_ONLN,
+ * getcpu() could return a value greater than
+ * this sysconf, in which case the arrays
+ * indexed by processor would overflow.
+ */
+ result = sysconf(_SC_NPROCESSORS_CONF);
+ if(result == -1) {
+ return -1;
+ }
- /* On Linux, when some processors are offline
- * _SC_NPROCESSORS_CONF counts the offline
- * processors, whereas _SC_NPROCESSORS_ONLN
- * does not. If we used _SC_NPROCESSORS_ONLN,
- * getcpu() could return a value greater than
- * this sysconf, in which case the arrays
- * indexed by processor would overflow.
- */
- result = sysconf(_SC_NPROCESSORS_CONF);
- if(result == -1) {
- return -1;
+ n_cpus = result;
}
- n_cpus = result;
-
- return result;
+ return n_cpus;
}
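For reference, a minimal standalone sketch (illustration only, not part of the patch) contrasting the two sysconf(3) counts discussed in the comment above; on a machine with offline CPUs the configured count is the larger of the two, and is the safe bound for per-cpu arrays:

#include <stdio.h>
#include <unistd.h>

int main(void)
{
	long conf = sysconf(_SC_NPROCESSORS_CONF);	/* CPUs configured */
	long onln = sysconf(_SC_NPROCESSORS_ONLN);	/* CPUs currently online */

	if (conf == -1 || onln == -1) {
		perror("sysconf");
		return 1;
	}
	/* Arrays indexed by cpu id must be sized with the configured count. */
	printf("configured: %ld, online: %ld\n", conf, onln);
	return 0;
}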
-/* _ust_buffers_write()
+/**
+ * _ust_buffers_strncpy_fixup - Fix an incomplete string in an ltt_relay buffer.
+ * @buf: buffer
+ * @offset: offset within the buffer
+ * @len: length to write
+ * @copied: string actually copied
+ * @terminated: does the string end with \0?
*
- * @buf: destination buffer
- * @offset: offset in destination
- * @src: source buffer
- * @len: length of source
- * @cpy: already copied
+ * Fills string with "X" if incomplete.
*/
-
-void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
- const void *src, size_t len, ssize_t cpy)
+void _ust_buffers_strncpy_fixup(struct ust_buffer *buf, size_t offset,
+ size_t len, size_t copied, int terminated)
{
- do {
- len -= cpy;
- src += cpy;
- offset += cpy;
+ size_t buf_offset, cpy;
+
+ if (copied == len) {
+ /*
+ * Deal with non-terminated string.
+ */
+ assert(!terminated);
+ offset += copied - 1;
+ buf_offset = BUFFER_OFFSET(offset, buf->chan);
+ /*
+ * Underlying layer should never ask for writes across
+ * subbuffers.
+ */
+	assert(buf_offset
+	       < buf->chan->subbuf_size * buf->chan->subbuf_cnt);
+ ust_buffers_do_memset(buf->buf_data + buf_offset, '\0', 1);
+ return;
+ }
- WARN_ON(offset >= buf->buf_size);
+ /*
+ * Deal with incomplete string.
+ * Overwrite string's \0 with X too.
+ */
+ cpy = copied - 1;
+ assert(terminated);
+ len -= cpy;
+ offset += cpy;
+ buf_offset = BUFFER_OFFSET(offset, buf->chan);
+
+ /*
+ * Underlying layer should never ask for writes across subbuffers.
+ */
+	assert(buf_offset
+	       < buf->chan->subbuf_size * buf->chan->subbuf_cnt);
- cpy = min_t(size_t, len, buf->buf_size - offset);
- ust_buffers_do_copy(buf->buf_data + offset, src, cpy);
- } while (unlikely(len != cpy));
+ ust_buffers_do_memset(buf->buf_data + buf_offset,
+ 'X', len);
+
+ /*
+ * Overwrite last 'X' with '\0'.
+ */
+ offset += len - 1;
+ buf_offset = BUFFER_OFFSET(offset, buf->chan);
+ /*
+ * Underlying layer should never ask for writes across subbuffers.
+ */
+	assert(buf_offset
+	       < buf->chan->subbuf_size * buf->chan->subbuf_cnt);
+ ust_buffers_do_memset(buf->buf_data + buf_offset, '\0', 1);
}
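A simplified model of the fixup contract documented above, acting on a flat buffer instead of wrapped sub-buffers (a sketch; strncpy_fixup_sketch is a hypothetical name, not part of the patch):

#include <assert.h>
#include <string.h>

static void strncpy_fixup_sketch(char *dst, size_t len, size_t copied,
				 int terminated)
{
	if (copied == len) {
		/* Whole string copied but unterminated: force a '\0'. */
		assert(!terminated);
		dst[copied - 1] = '\0';
		return;
	}
	/* Partial copy: overwrite from its '\0' onwards with 'X'... */
	assert(terminated);
	memset(dst + copied - 1, 'X', len - copied + 1);
	/* ...then terminate the padded slot. */
	dst[len - 1] = '\0';
}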
-static int ust_buffers_init_buffer(struct ust_trace *trace,
- struct ust_channel *ltt_chan,
- struct ust_buffer *buf,
- unsigned int n_subbufs);
+static void ltt_buffer_begin(struct ust_buffer *buf,
+ u64 tsc, unsigned int subbuf_idx)
+{
+ struct ust_channel *channel = buf->chan;
+ struct ltt_subbuffer_header *header =
+ (struct ltt_subbuffer_header *)
+ ust_buffers_offset_address(buf,
+ subbuf_idx * buf->chan->subbuf_size);
-static int ust_buffers_alloc_buf(struct ust_buffer *buf, size_t *size)
+ header->cycle_count_begin = tsc;
+ header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
+ header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
+ /* FIXME: add memory barrier? */
+ ltt_write_trace_header(channel->trace, header);
+}
+
+static int map_buf_data(struct ust_buffer *buf, size_t *size)
{
void *ptr;
int result;
*size = PAGE_ALIGN(*size);
result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
- if(result == -1 && errno == EINVAL) {
+ if (result < 0 && errno == EINVAL) {
ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased.");
return -1;
- }
- else if(result == -1) {
+ } else if (result < 0) {
PERROR("shmget");
return -1;
}
- /* FIXME: should have matching call to shmdt */
ptr = shmat(buf->shmid, NULL, 0);
- if(ptr == (void *) -1) {
+ if (ptr == (void *) -1) {
perror("shmat");
goto destroy_shmem;
}
return 0;
- destroy_shmem:
+destroy_shmem:
result = shmctl(buf->shmid, IPC_RMID, NULL);
if(result == -1) {
perror("shmctl");
return -1;
}
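The SysV shared-memory pattern used by map_buf_data(), reduced to a self-contained round trip (a sketch under the same getpid()-as-key assumption; marking the segment IPC_RMID before detaching is the standard way to avoid leaking it):

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>

static int shm_roundtrip(size_t size)
{
	void *ptr;
	int shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);

	if (shmid < 0) {
		perror("shmget");
		return -1;
	}
	ptr = shmat(shmid, NULL, 0);
	if (ptr == (void *) -1) {
		perror("shmat");
		shmctl(shmid, IPC_RMID, NULL);
		return -1;
	}
	/* Mark for destruction; the segment lives until the last detach. */
	if (shmctl(shmid, IPC_RMID, NULL) < 0)
		perror("shmctl");
	if (shmdt(ptr) < 0)
		perror("shmdt");
	return 0;
}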
-int ust_buffers_create_buf(struct ust_channel *channel, int cpu)
+static int open_buf(struct ust_channel *chan, int cpu)
{
- int result;
- struct ust_buffer *buf = channel->buf[cpu];
+ int result, fds[2];
+ unsigned int j;
+ struct ust_trace *trace = chan->trace;
+ struct ust_buffer *buf = chan->buf[cpu];
+ unsigned int n_subbufs = chan->subbuf_cnt;
- buf->cpu = cpu;
- result = ust_buffers_alloc_buf(buf, &channel->alloc_size);
- if(result)
+
+ result = map_buf_data(buf, &chan->alloc_size);
+ if (result < 0)
return -1;
- buf->chan = channel;
- kref_get(&channel->kref);
- return 0;
-}
+ buf->commit_count =
+ zmalloc(sizeof(*buf->commit_count) * n_subbufs);
+ if (!buf->commit_count)
+ goto unmap_buf;
-static void ust_buffers_destroy_channel(struct kref *kref)
-{
- struct ust_channel *chan = container_of(kref, struct ust_channel, kref);
- free(chan);
-}
+ result = pipe(fds);
+ if (result < 0) {
+ PERROR("pipe");
+ goto free_commit_count;
+ }
+ buf->data_ready_fd_read = fds[0];
+ buf->data_ready_fd_write = fds[1];
-static void ust_buffers_destroy_buf(struct ust_buffer *buf)
-{
- struct ust_channel *chan = buf->chan;
- int result;
+ buf->cpu = cpu;
+ buf->chan = chan;
- result = munmap(buf->buf_data, buf->buf_size);
- if(result == -1) {
- PERROR("munmap");
+ uatomic_set(&buf->offset, ltt_subbuffer_header_size());
+ uatomic_set(&buf->consumed, 0);
+ uatomic_set(&buf->active_readers, 0);
+ for (j = 0; j < n_subbufs; j++) {
+ uatomic_set(&buf->commit_count[j].cc, 0);
+ uatomic_set(&buf->commit_count[j].cc_sb, 0);
}
-//ust// chan->buf[buf->cpu] = NULL;
- free(buf);
- kref_put(&chan->kref, ust_buffers_destroy_channel);
-}
+ ltt_buffer_begin(buf, trace->start_tsc, 0);
-/* called from kref_put */
-static void ust_buffers_remove_buf(struct kref *kref)
-{
- struct ust_buffer *buf = container_of(kref, struct ust_buffer, kref);
- ust_buffers_destroy_buf(buf);
+ uatomic_add(&buf->commit_count[0].cc, ltt_subbuffer_header_size());
+
+ uatomic_set(&buf->events_lost, 0);
+ uatomic_set(&buf->corrupted_subbuffers, 0);
+
+ memset(buf->commit_seq, 0, sizeof(buf->commit_seq[0]) * n_subbufs);
+
+ return 0;
+
+free_commit_count:
+ free(buf->commit_count);
+
+unmap_buf:
+ if (shmdt(buf->buf_data) < 0) {
+ PERROR("shmdt failed");
+ }
+
+ return -1;
}
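open_buf() also creates the pipe used as the data-ready channel: the tracer writes a byte per ready sub-buffer, and closing the write end (see close_buf() and the finish path below) tells the consumer the buffer is done. The scheme in isolation (a sketch; single process for brevity):

#include <stdio.h>
#include <unistd.h>

static int notify_sketch(void)
{
	int fds[2];
	char c;

	if (pipe(fds) < 0) {
		perror("pipe");
		return -1;
	}
	/* Producer side: one byte per "sub-buffer ready" event. */
	if (write(fds[1], "1", 1) < 0)
		perror("write");
	/* Consumer side: blocks until notified; a 0 return would mean
	 * the writer closed its end and the buffer is finished. */
	if (read(fds[0], &c, 1) < 0)
		perror("read");
	close(fds[0]);
	close(fds[1]);
	return 0;
}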
-int ust_buffers_open_buf(struct ust_channel *chan, int cpu)
+static void ltt_relay_print_buffer_errors(struct ust_channel *chan, int cpu);
+
+static void close_buf(struct ust_buffer *buf)
{
+ struct ust_channel *chan = buf->chan;
+ int cpu = buf->cpu;
int result;
- result = ust_buffers_create_buf(chan, cpu);
- if (result == -1)
- return -1;
+ result = shmdt(buf->buf_data);
+ if (result < 0) {
+ PERROR("shmdt");
+ }
- kref_init(&chan->buf[cpu]->kref);
+ free(buf->commit_count);
- result = ust_buffers_init_buffer(chan->trace, chan, chan->buf[cpu], chan->subbuf_cnt);
- if(result == -1)
- return -1;
+ result = close(buf->data_ready_fd_read);
+ if (result < 0) {
+ PERROR("close");
+ }
- return 0;
+ result = close(buf->data_ready_fd_write);
+ if (result < 0 && errno != EBADF) {
+ PERROR("close");
+ }
- /* FIXME: decrementally destroy on error? */
+	/* FIXME: this spews out errors; are they real?
+	 * ltt_relay_print_buffer_errors(chan, cpu); */
}
-/**
- * ust_buffers_close_buf - close a channel buffer
- * @buf: buffer
- */
-static void ust_buffers_close_buf(struct ust_buffer *buf)
-{
- kref_put(&buf->kref, ust_buffers_remove_buf);
-}
-int ust_buffers_channel_open(struct ust_channel *chan, size_t subbuf_size, size_t subbuf_cnt)
+static int open_channel(struct ust_channel *chan, size_t subbuf_size,
+ size_t subbuf_cnt)
{
int i;
int result;
chan->subbuf_size_order = get_count_order(subbuf_size);
chan->alloc_size = subbuf_size * subbuf_cnt;
- kref_init(&chan->kref);
-
- mutex_lock(&ust_buffers_channels_mutex);
- for(i=0; i<chan->n_cpus; i++) {
- result = ust_buffers_open_buf(chan, i);
+ pthread_mutex_lock(&ust_buffers_channels_mutex);
+	for (i = 0; i < chan->n_cpus; i++) {
+ result = open_buf(chan, i);
if (result == -1)
goto error;
}
- list_add(&chan->list, &ust_buffers_channels);
- mutex_unlock(&ust_buffers_channels_mutex);
+ cds_list_add(&chan->list, &ust_buffers_channels);
+ pthread_mutex_unlock(&ust_buffers_channels_mutex);
return 0;
/* Jump directly inside the loop to close the buffers that were already
* opened. */
for(; i>=0; i--) {
- ust_buffers_close_buf(chan->buf[i]);
+ close_buf(chan->buf[i]);
error:
do {} while(0);
}
- kref_put(&chan->kref, ust_buffers_destroy_channel);
- mutex_unlock(&ust_buffers_channels_mutex);
+ pthread_mutex_unlock(&ust_buffers_channels_mutex);
return -1;
}
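The error path above jumps into the middle of the cleanup loop so that only the buffers opened before the failure get closed. Reduced to a skeleton (open_one()/close_one() are hypothetical stand-ins for open_buf()/close_buf()):

static int open_one(int i);	/* hypothetical, like open_buf() */
static void close_one(int i);	/* hypothetical, like close_buf() */

static int open_all(int n)
{
	int i;

	for (i = 0; i < n; i++) {
		if (open_one(i) < 0)
			goto error;
	}
	return 0;

	/* Reached only via the goto: closes indices i-1 down to 0. */
	for (; i >= 0; i--) {
		close_one(i);
error:
		do {} while (0);	/* a label must precede a statement */
	}
	return -1;
}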
-void ust_buffers_channel_close(struct ust_channel *chan)
+static void close_channel(struct ust_channel *chan)
{
int i;
if(!chan)
return;
- mutex_lock(&ust_buffers_channels_mutex);
+ pthread_mutex_lock(&ust_buffers_channels_mutex);
for(i=0; i<chan->n_cpus; i++) {
/* FIXME: if we make it here, then all buffers were necessarily allocated. Moreover, we don't
* initialize to NULL so we cannot use this check. Should we? */
//ust// if (chan->buf[i])
- ust_buffers_close_buf(chan->buf[i]);
+ close_buf(chan->buf[i]);
}
- list_del(&chan->list);
- kref_put(&chan->kref, ust_buffers_destroy_channel);
- mutex_unlock(&ust_buffers_channels_mutex);
-}
-
-/*
- * -------
- */
+ cds_list_del(&chan->list);
-static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu);
+ pthread_mutex_unlock(&ust_buffers_channels_mutex);
+}
static void ltt_force_switch(struct ust_buffer *buf,
enum force_switch_mode mode);
-/*
- * Trace callbacks
- */
-static void ltt_buffer_begin(struct ust_buffer *buf,
- u64 tsc, unsigned int subbuf_idx)
-{
- struct ust_channel *channel = buf->chan;
- struct ltt_subbuffer_header *header =
- (struct ltt_subbuffer_header *)
- ust_buffers_offset_address(buf,
- subbuf_idx * buf->chan->subbuf_size);
- header->cycle_count_begin = tsc;
- header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
- header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
- /* FIXME: add memory barrier? */
- ltt_write_trace_header(channel->trace, header);
-}
/*
* offset is assumed to never be 0 here : never deliver a completely empty
header->data_size = data_size;
header->sb_size = PAGE_ALIGN(data_size);
header->cycle_count_end = tsc;
- header->events_lost = local_read(&buf->events_lost);
- header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers);
+ header->events_lost = uatomic_read(&buf->events_lost);
+ header->subbuf_corrupt = uatomic_read(&buf->corrupted_subbuffers);
+	if (unlikely(header->events_lost > 0)) {
+		DBG("Some events (%d) were lost in %s_%d",
+		    header->events_lost, buf->chan->channel_name, buf->cpu);
+	}
}
/*
}
/*
 * Promote the compiler barrier to a smp_mb().
* For the specific LTTng case, this IPI call should be removed if the
* architecture does not reorder writes. This should eventually be provided by
* a separate architecture-specific infrastructure.
long consumed_old, consumed_idx, commit_count, write_offset;
//ust// int retval;
- consumed_old = atomic_long_read(&buf->consumed);
+ consumed_old = uatomic_read(&buf->consumed);
consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
- commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
+ commit_count = uatomic_read(&buf->commit_count[consumed_idx].cc_sb);
/*
* Make sure we read the commit count before reading the buffer
* data and the write offset. Correct consumed offset ordering
* this is OK because then there is no wmb to execute there.
* If our thread is executing on the same CPU as the on the buffers
* belongs to, we don't have to synchronize it at all. If we are
 * migrated, the scheduler will take care of the memory barriers.
* Normally, smp_call_function_single() should ensure program order when
* executing the remote function, which implies that it surrounds the
* function execution with :
* required ourself, even if duplicated. It has no performance impact
* anyway.
*
- * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
+ * smp_mb() is needed because cmm_smp_rmb() and cmm_smp_wmb() only order read vs
* read and write vs write. They do not ensure core synchronization. We
 * really have to ensure total order between the 3 barriers running on
* the 2 CPUs.
*/
//ust// #ifdef LTT_NO_IPI_BARRIER
* Local rmb to match the remote wmb to read the commit count before the
* buffer data and the write offset.
*/
- smp_rmb();
+ cmm_smp_rmb();
//ust// #else
//ust// if (raw_smp_processor_id() != buf->cpu) {
//ust// smp_mb(); /* Total order with IPI handler smp_mb() */
//ust// }
//ust// #endif
- write_offset = local_read(&buf->offset);
+ write_offset = uatomic_read(&buf->offset);
/*
* Check that the subbuffer we are trying to consume has been
* already fully committed.
{
long consumed_new, consumed_old;
- consumed_old = atomic_long_read(&buf->consumed);
+ consumed_old = uatomic_read(&buf->consumed);
consumed_old = consumed_old & (~0xFFFFFFFFL);
consumed_old = consumed_old | uconsumed_old;
consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
//ust//	spin_lock(&ltt_buf->full_lock);
- if (atomic_long_cmpxchg(&buf->consumed, consumed_old,
+ if (uatomic_cmpxchg(&buf->consumed, consumed_old,
consumed_new)
!= consumed_old) {
/* We have been pushed by the writer : the last
return 0;
}
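The update above is liburcu's standard compare-and-swap retry shape: read the old value, compute the new one, and let uatomic_cmpxchg() detect whether anyone raced us. Stripped to its core (advance_consumed is a hypothetical name):

#include <urcu/uatomic.h>

static long consumed;

/* Returns 0 on success, -1 if a concurrent update won the race. */
static int advance_consumed(long consumed_new)
{
	long consumed_old = uatomic_read(&consumed);

	if (uatomic_cmpxchg(&consumed, consumed_old, consumed_new)
			!= consumed_old)
		return -1;	/* pushed by the writer; caller may retry */
	return 0;
}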
-//ust// static void switch_buffer(unsigned long data)
-//ust// {
-//ust// struct ltt_channel_buf_struct *ltt_buf =
-//ust// (struct ltt_channel_buf_struct *)data;
-//ust// struct rchan_buf *buf = ltt_buf->rbuf;
-//ust//
-//ust// if (buf)
-//ust// ltt_force_switch(buf, FORCE_ACTIVE);
-//ust//
-//ust// ltt_buf->switch_timer.expires += ltt_buf->switch_timer_interval;
-//ust//	add_timer_on(&ltt_buf->switch_timer, smp_processor_id());
-//ust// }
-//ust//
-//ust// static void start_switch_timer(struct ltt_channel_struct *ltt_channel)
-//ust// {
-//ust// struct rchan *rchan = ltt_channel->trans_channel_data;
-//ust// int cpu;
-//ust//
-//ust// if (!ltt_channel->switch_timer_interval)
-//ust// return;
-//ust//
-//ust// // TODO : hotplug
-//ust// for_each_online_cpu(cpu) {
-//ust// struct ltt_channel_buf_struct *ltt_buf;
-//ust// struct rchan_buf *buf;
-//ust//
-//ust// buf = rchan->buf[cpu];
-//ust// ltt_buf = buf->chan_private;
-//ust// buf->random_access = 1;
-//ust// ltt_buf->switch_timer_interval =
-//ust// ltt_channel->switch_timer_interval;
-//ust//	init_timer(&ltt_buf->switch_timer);
-//ust//	ltt_buf->switch_timer.function = switch_buffer;
-//ust//	ltt_buf->switch_timer.expires = jiffies +
-//ust//			ltt_buf->switch_timer_interval;
-//ust//	ltt_buf->switch_timer.data = (unsigned long)ltt_buf;
-//ust//	add_timer_on(&ltt_buf->switch_timer, cpu);
-//ust// }
-//ust// }
-//ust//
-//ust// /*
-//ust// * Cannot use del_timer_sync with add_timer_on, so use an IPI to locally
-//ust// * delete the timer.
-//ust// */
-//ust// static void stop_switch_timer_ipi(void *info)
-//ust// {
-//ust// struct ltt_channel_buf_struct *ltt_buf =
-//ust// (struct ltt_channel_buf_struct *)info;
-//ust//
-//ust//	del_timer(&ltt_buf->switch_timer);
-//ust// }
-//ust//
-//ust// static void stop_switch_timer(struct ltt_channel_struct *ltt_channel)
-//ust// {
-//ust// struct rchan *rchan = ltt_channel->trans_channel_data;
-//ust// int cpu;
-//ust//
-//ust// if (!ltt_channel->switch_timer_interval)
-//ust// return;
-//ust//
-//ust// // TODO : hotplug
-//ust// for_each_online_cpu(cpu) {
-//ust// struct ltt_channel_buf_struct *ltt_buf;
-//ust// struct rchan_buf *buf;
-//ust//
-//ust// buf = rchan->buf[cpu];
-//ust// ltt_buf = buf->chan_private;
-//ust// smp_call_function(stop_switch_timer_ipi, ltt_buf, 1);
-//ust// buf->random_access = 0;
-//ust// }
-//ust// }
-
-//ust// static void ust_buffers_print_written(struct ust_channel *chan,
-//ust// long cons_off, unsigned int cpu)
-//ust// {
-//ust// struct ust_buffer *buf = chan->buf[cpu];
-//ust// long cons_idx, events_count;
-//ust//
-//ust// cons_idx = SUBBUF_INDEX(cons_off, chan);
-//ust//	events_count = local_read(&ltt_buf->commit_count[cons_idx].events);
-//ust//
-//ust// if (events_count)
-//ust// printk(KERN_INFO
-//ust// "channel %s: %lu events written (cpu %u, index %lu)\n",
-//ust// chan->channel_name, events_count, cpu, cons_idx);
-//ust// }
-
static void ltt_relay_print_subbuffer_errors(
struct ust_channel *channel,
long cons_off, int cpu)
long cons_idx, commit_count, commit_count_sb, write_offset;
cons_idx = SUBBUF_INDEX(cons_off, channel);
-	commit_count = local_read(&ltt_buf->commit_count[cons_idx].cc);
-	commit_count_sb = local_read(&ltt_buf->commit_count[cons_idx].cc_sb);
+	commit_count = uatomic_read(&ltt_buf->commit_count[cons_idx].cc);
+	commit_count_sb = uatomic_read(&ltt_buf->commit_count[cons_idx].cc_sb);
/*
* No need to order commit_count and write_offset reads because we
* execute after trace is stopped when there are no readers left.
*/
-	write_offset = local_read(&ltt_buf->offset);
+	write_offset = uatomic_read(&ltt_buf->offset);
WARN( "LTT : unread channel %s offset is %ld "
"and cons_off : %ld (cpu %d)\n",
channel->channel_name, write_offset, cons_off, cpu);
//ust// for (cons_off = 0; cons_off < rchan->alloc_size;
//ust// cons_off = SUBBUF_ALIGN(cons_off, rchan))
//ust// ust_buffers_print_written(ltt_chan, cons_off, cpu);
-	for (cons_off = atomic_long_read(&ltt_buf->consumed);
-		(SUBBUF_TRUNC(local_read(&ltt_buf->offset),
+	for (cons_off = uatomic_read(&ltt_buf->consumed);
+		(SUBBUF_TRUNC(uatomic_read(&ltt_buf->offset),
channel)
- cons_off) > 0;
cons_off = SUBBUF_ALIGN(cons_off, channel))
struct ust_trace *trace = channel->trace;
struct ust_buffer *ltt_buf = channel->buf[cpu];
-	if (local_read(&ltt_buf->events_lost))
+	if (uatomic_read(&ltt_buf->events_lost))
		ERR("channel %s: %ld events lost (cpu %d)",
			channel->channel_name,
-			local_read(&ltt_buf->events_lost), cpu);
-	if (local_read(&ltt_buf->corrupted_subbuffers))
+			uatomic_read(&ltt_buf->events_lost), cpu);
+	if (uatomic_read(&ltt_buf->corrupted_subbuffers))
		ERR("channel %s : %ld corrupted subbuffers (cpu %d)",
			channel->channel_name,
-			local_read(&ltt_buf->corrupted_subbuffers), cpu);
+			uatomic_read(&ltt_buf->corrupted_subbuffers), cpu);
ltt_relay_print_errors(trace, channel, cpu);
}
-static void ltt_relay_release_channel(struct kref *kref)
-{
- struct ust_channel *ltt_chan = container_of(kref,
- struct ust_channel, kref);
- free(ltt_chan->buf);
-}
-
-/*
- * Create ltt buffer.
- */
-//ust// static int ltt_relay_create_buffer(struct ust_trace *trace,
-//ust// struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
-//ust// unsigned int cpu, unsigned int n_subbufs)
-//ust// {
-//ust// struct ltt_channel_buf_struct *ltt_buf =
-//ust// percpu_ptr(ltt_chan->buf, cpu);
-//ust// unsigned int j;
-//ust//
-//ust// ltt_buf->commit_count =
-//ust// kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs,
-//ust// GFP_KERNEL, cpu_to_node(cpu));
-//ust// if (!ltt_buf->commit_count)
-//ust// return -ENOMEM;
-//ust// kref_get(&trace->kref);
-//ust// kref_get(&trace->ltt_transport_kref);
-//ust//	kref_get(&ltt_chan->kref);
-//ust//	local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
-//ust//	atomic_long_set(&ltt_buf->consumed, 0);
-//ust//	atomic_long_set(&ltt_buf->active_readers, 0);
-//ust//	for (j = 0; j < n_subbufs; j++)
-//ust//		local_set(&ltt_buf->commit_count[j], 0);
-//ust//	init_waitqueue_head(&ltt_buf->write_wait);
-//ust//	atomic_set(&ltt_buf->wakeup_readers, 0);
-//ust//	spin_lock_init(&ltt_buf->full_lock);
-//ust//
-//ust//	ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
-//ust//	/* atomic_add made on local variable on data that belongs to
-//ust//	 * various CPUs : ok because tracing not started (for this cpu). */
-//ust//	local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);
-//ust//
-//ust//	local_set(&ltt_buf->events_lost, 0);
-//ust//	local_set(&ltt_buf->corrupted_subbuffers, 0);
-//ust//
-//ust// return 0;
-//ust// }
-
-static int ust_buffers_init_buffer(struct ust_trace *trace,
- struct ust_channel *ltt_chan, struct ust_buffer *buf,
- unsigned int n_subbufs)
-{
- unsigned int j;
- int fds[2];
- int result;
-
- buf->commit_count =
- zmalloc(sizeof(*buf->commit_count) * n_subbufs);
- if (!buf->commit_count)
- return -ENOMEM;
- kref_get(&trace->kref);
- kref_get(&trace->ltt_transport_kref);
-	kref_get(&ltt_chan->kref);
- local_set(&buf->offset, ltt_subbuffer_header_size());
- atomic_long_set(&buf->consumed, 0);
- atomic_long_set(&buf->active_readers, 0);
- for (j = 0; j < n_subbufs; j++) {
- local_set(&buf->commit_count[j].cc, 0);
- local_set(&buf->commit_count[j].cc_sb, 0);
- }
-//ust// init_waitqueue_head(&buf->write_wait);
-//ust// atomic_set(&buf->wakeup_readers, 0);
-//ust// spin_lock_init(&buf->full_lock);
-
- ltt_buffer_begin(buf, trace->start_tsc, 0);
-
- local_add(ltt_subbuffer_header_size(), &buf->commit_count[0].cc);
-
- local_set(&buf->events_lost, 0);
- local_set(&buf->corrupted_subbuffers, 0);
-
- result = pipe(fds);
- if(result == -1) {
- PERROR("pipe");
- return -1;
- }
- buf->data_ready_fd_read = fds[0];
- buf->data_ready_fd_write = fds[1];
-
- /* FIXME: do we actually need this? */
- result = fcntl(fds[0], F_SETFL, O_NONBLOCK);
- if(result == -1) {
- PERROR("fcntl");
- }
-
-//ust// buf->commit_seq = malloc(sizeof(buf->commit_seq) * n_subbufs);
-//ust// if(!ltt_buf->commit_seq) {
-//ust// return -1;
-//ust// }
- memset(buf->commit_seq, 0, sizeof(buf->commit_seq[0]) * n_subbufs);
-
- /* FIXME: decrementally destroy on error */
-
- return 0;
-}
-
-/* FIXME: use this function */
-static void ust_buffers_destroy_buffer(struct ust_channel *ltt_chan, int cpu)
-{
- struct ust_trace *trace = ltt_chan->trace;
- struct ust_buffer *ltt_buf = ltt_chan->buf[cpu];
-
-	kref_put(&ltt_chan->trace->ltt_transport_kref,
- ltt_release_transport);
- ltt_relay_print_buffer_errors(ltt_chan, cpu);
-//ust// free(ltt_buf->commit_seq);
- kfree(ltt_buf->commit_count);
- ltt_buf->commit_count = NULL;
-	kref_put(&ltt_chan->kref, ltt_relay_release_channel);
- kref_put(&trace->kref, ltt_release_trace);
-//ust// wake_up_interruptible(&trace->kref_wq);
-}
-
-static int ust_buffers_alloc_channel_buf_structs(struct ust_channel *chan)
+static int map_buf_structs(struct ust_channel *chan)
{
void *ptr;
int result;
goto destroy_previous;
}
- /* FIXME: should have matching call to shmdt */
ptr = shmat(chan->buf_struct_shmids[i], NULL, 0);
if(ptr == (void *) -1) {
perror("shmat");
return -1;
}
+static int unmap_buf_structs(struct ust_channel *chan)
+{
+ int i;
+
+	for (i = 0; i < chan->n_cpus; i++) {
+		if (shmdt(chan->buf[i]) < 0) {
+			PERROR("shmdt");
+		}
+	}
+
+	return 0;
+}
+
/*
* Create channel.
*/
-static int ust_buffers_create_channel(const char *trace_name, struct ust_trace *trace,
- const char *channel_name, struct ust_channel *ltt_chan,
+static int create_channel(const char *trace_name, struct ust_trace *trace,
+ const char *channel_name, struct ust_channel *chan,
unsigned int subbuf_size, unsigned int n_subbufs, int overwrite)
{
- int result;
+ int i, result;
-	kref_init(&ltt_chan->kref);
+ chan->trace = trace;
+ chan->overwrite = overwrite;
+ chan->n_subbufs_order = get_count_order(n_subbufs);
+ chan->commit_count_mask = (~0UL >> chan->n_subbufs_order);
+ chan->n_cpus = get_n_cpus();
- ltt_chan->trace = trace;
- ltt_chan->overwrite = overwrite;
- ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
- ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
- ltt_chan->n_cpus = get_n_cpus();
-//ust// ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct), GFP_KERNEL, cpu_possible_map);
- ltt_chan->buf = (void *) malloc(ltt_chan->n_cpus * sizeof(void *));
- if(ltt_chan->buf == NULL) {
+	/* These mappings should ideally be per-cpu; if somebody can do
+	 * that from userspace, that would be cool!
+	 */
+ chan->buf = (void *) zmalloc(chan->n_cpus * sizeof(void *));
+ if(chan->buf == NULL) {
goto error;
}
- ltt_chan->buf_struct_shmids = (int *) malloc(ltt_chan->n_cpus * sizeof(int));
- if(ltt_chan->buf_struct_shmids == NULL)
+ chan->buf_struct_shmids = (int *) zmalloc(chan->n_cpus * sizeof(int));
+ if(chan->buf_struct_shmids == NULL)
goto free_buf;
- result = ust_buffers_alloc_channel_buf_structs(ltt_chan);
+ result = map_buf_structs(chan);
if(result != 0) {
goto free_buf_struct_shmids;
}
- result = ust_buffers_channel_open(ltt_chan, subbuf_size, n_subbufs);
+ result = open_channel(chan, subbuf_size, n_subbufs);
if (result != 0) {
ERR("Cannot open channel for trace %s", trace_name);
- goto unalloc_buf_structs;
+ goto unmap_buf_structs;
}
return 0;
-unalloc_buf_structs:
- /* FIXME: put a call here to unalloc the buf structs! */
+unmap_buf_structs:
+	for (i = 0; i < chan->n_cpus; i++) {
+ if (shmdt(chan->buf[i]) < 0) {
+ PERROR("shmdt bufstruct");
+ }
+ }
free_buf_struct_shmids:
- free(ltt_chan->buf_struct_shmids);
+ free(chan->buf_struct_shmids);
free_buf:
- free(ltt_chan->buf);
+ free(chan->buf);
error:
return -1;
}
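A worked example of the channel geometry computed in create_channel(), assuming n_subbufs = 8 and 64-bit longs:

/*
 *   n_subbufs_order   = get_count_order(8)  = 3
 *   commit_count_mask = ~0UL >> 3           = 0x1fffffffffffffff
 *
 * The top n_subbufs_order bits of a commit count are masked off so it
 * can be compared against BUFFER_TRUNC(offset) >> n_subbufs_order
 * without wrap-around artifacts.
 */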
-/*
- * LTTng channel flush function.
- *
- * Must be called when no tracing is active in the channel, because of
- * accesses across CPUs.
- */
-static notrace void ltt_relay_buffer_flush(struct ust_buffer *buf)
+
+static void remove_channel(struct ust_channel *chan)
{
- int result;
+ close_channel(chan);
-//ust// buf->finalized = 1;
- ltt_force_switch(buf, FORCE_FLUSH);
+ unmap_buf_structs(chan);
+
+ free(chan->buf_struct_shmids);
+
+ free(chan->buf);
- result = write(buf->data_ready_fd_write, "1", 1);
- if(result == -1) {
- PERROR("write (in ltt_relay_buffer_flush)");
- ERR("this should never happen!");
- }
}
static void ltt_relay_async_wakeup_chan(struct ust_channel *ltt_channel)
//ust// struct ltt_channel_buf_struct *ltt_buf =
//ust// percpu_ptr(ltt_channel->buf, i);
//ust//
-//ust//		if (atomic_read(&ltt_buf->wakeup_readers) == 1) {
-//ust//			atomic_set(&ltt_buf->wakeup_readers, 0);
+//ust//		if (uatomic_read(&ltt_buf->wakeup_readers) == 1) {
+//ust//			uatomic_set(&ltt_buf->wakeup_readers, 0);
//ust// wake_up_interruptible(&rchan->buf[i]->read_wait);
//ust// }
//ust// }
if (channel->buf[cpu]) {
struct ust_buffer *buf = channel->buf[cpu];
- ltt_relay_buffer_flush(buf);
-//ust// ltt_relay_wake_writers(ltt_buf);
+ ltt_force_switch(buf, FORCE_FLUSH);
+
/* closing the pipe tells the consumer the buffer is finished */
-
- //result = write(ltt_buf->data_ready_fd_write, "D", 1);
- //if(result == -1) {
- // PERROR("write (in ltt_relay_finish_buffer)");
- // ERR("this should never happen!");
- //}
close(buf->data_ready_fd_write);
}
}
-static void ltt_relay_finish_channel(struct ust_channel *channel)
+static void finish_channel(struct ust_channel *channel)
{
unsigned int i;
}
}
-static void ltt_relay_remove_channel(struct ust_channel *channel)
-{
- ust_buffers_channel_close(channel);
- kref_put(&channel->kref, ltt_relay_release_channel);
-}
-
-//ust// /*
-//ust// * Returns :
-//ust// * 0 if ok
-//ust// * !0 if execution must be aborted.
-//ust// */
-//ust// static inline int ltt_relay_try_reserve(
-//ust// struct ust_channel *channel, struct ust_buffer *buf,
-//ust// struct ltt_reserve_switch_offsets *offsets, size_t data_size,
-//ust// u64 *tsc, unsigned int *rflags, int largest_align)
-//ust// {
-//ust// offsets->begin = local_read(&buf->offset);
-//ust// offsets->old = offsets->begin;
-//ust// offsets->begin_switch = 0;
-//ust// offsets->end_switch_current = 0;
-//ust// offsets->end_switch_old = 0;
-//ust//
-//ust// *tsc = trace_clock_read64();
-//ust// if (last_tsc_overflow(buf, *tsc))
-//ust// *rflags = LTT_RFLAG_ID_SIZE_TSC;
-//ust//
-//ust// if (SUBBUF_OFFSET(offsets->begin, buf->chan) == 0) {
-//ust// offsets->begin_switch = 1; /* For offsets->begin */
-//ust// } else {
-//ust// offsets->size = ust_get_header_size(channel,
-//ust// offsets->begin, data_size,
-//ust// &offsets->before_hdr_pad, *rflags);
-//ust// offsets->size += ltt_align(offsets->begin + offsets->size,
-//ust// largest_align)
-//ust// + data_size;
-//ust// if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
-//ust// > buf->chan->subbuf_size) {
-//ust// offsets->end_switch_old = 1; /* For offsets->old */
-//ust// offsets->begin_switch = 1; /* For offsets->begin */
-//ust// }
-//ust// }
-//ust// if (offsets->begin_switch) {
-//ust// long subbuf_index;
-//ust//
-//ust// if (offsets->end_switch_old)
-//ust// offsets->begin = SUBBUF_ALIGN(offsets->begin,
-//ust// buf->chan);
-//ust// offsets->begin = offsets->begin + ltt_subbuffer_header_size();
-//ust// /* Test new buffer integrity */
-//ust// subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
-//ust// offsets->reserve_commit_diff =
-//ust// (BUFFER_TRUNC(offsets->begin, buf->chan)
-//ust// >> channel->n_subbufs_order)
-//ust// - (local_read(&buf->commit_count[subbuf_index])
-//ust// & channel->commit_count_mask);
-//ust// if (offsets->reserve_commit_diff == 0) {
-//ust// long consumed;
-//ust//
-//ust// consumed = atomic_long_read(&buf->consumed);
-//ust//
-//ust// /* Next buffer not corrupted. */
-//ust// if (!channel->overwrite &&
-//ust// (SUBBUF_TRUNC(offsets->begin, buf->chan)
-//ust// - SUBBUF_TRUNC(consumed, buf->chan))
-//ust// >= channel->alloc_size) {
-//ust//
-//ust// long consumed_idx = SUBBUF_INDEX(consumed, buf->chan);
-//ust// long commit_count = local_read(&buf->commit_count[consumed_idx]);
-//ust// if(((commit_count - buf->chan->subbuf_size) & channel->commit_count_mask) - (BUFFER_TRUNC(consumed, buf->chan) >> channel->n_subbufs_order) != 0) {
-//ust// WARN("Event dropped. Caused by non-committed event.");
-//ust// }
-//ust// else {
-//ust// WARN("Event dropped. Caused by non-consumed buffer.");
-//ust// }
-//ust// /*
-//ust// * We do not overwrite non consumed buffers
-//ust// * and we are full : event is lost.
-//ust// */
-//ust// local_inc(&buf->events_lost);
-//ust// return -1;
-//ust// } else {
-//ust// /*
-//ust// * next buffer not corrupted, we are either in
-//ust// * overwrite mode or the buffer is not full.
-//ust// * It's safe to write in this new subbuffer.
-//ust// */
-//ust// }
-//ust// } else {
-//ust// /*
-//ust// * Next subbuffer corrupted. Force pushing reader even
-//ust// * in normal mode. It's safe to write in this new
-//ust// * subbuffer.
-//ust// */
-//ust// }
-//ust// offsets->size = ust_get_header_size(channel,
-//ust// offsets->begin, data_size,
-//ust// &offsets->before_hdr_pad, *rflags);
-//ust// offsets->size += ltt_align(offsets->begin + offsets->size,
-//ust// largest_align)
-//ust// + data_size;
-//ust// if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
-//ust// > buf->chan->subbuf_size) {
-//ust// /*
-//ust// * Event too big for subbuffers, report error, don't
-//ust// * complete the sub-buffer switch.
-//ust// */
-//ust// local_inc(&buf->events_lost);
-//ust// return -1;
-//ust// } else {
-//ust// /*
-//ust// * We just made a successful buffer switch and the event
-//ust// * fits in the new subbuffer. Let's write.
-//ust// */
-//ust// }
-//ust// } else {
-//ust// /*
-//ust// * Event fits in the current buffer and we are not on a switch
-//ust// * boundary. It's safe to write.
-//ust// */
-//ust// }
-//ust// offsets->end = offsets->begin + offsets->size;
-//ust//
-//ust// if ((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0) {
-//ust// /*
-//ust// * The offset_end will fall at the very beginning of the next
-//ust// * subbuffer.
-//ust// */
-//ust// offsets->end_switch_current = 1; /* For offsets->begin */
-//ust// }
-//ust// return 0;
-//ust// }
-//ust//
-//ust// /*
-//ust// * Returns :
-//ust// * 0 if ok
-//ust// * !0 if execution must be aborted.
-//ust// */
-//ust// static inline int ltt_relay_try_switch(
-//ust// enum force_switch_mode mode,
-//ust// struct ust_channel *channel,
-//ust// struct ust_buffer *buf,
-//ust// struct ltt_reserve_switch_offsets *offsets,
-//ust// u64 *tsc)
-//ust// {
-//ust// long subbuf_index;
-//ust//
-//ust// offsets->begin = local_read(&buf->offset);
-//ust// offsets->old = offsets->begin;
-//ust// offsets->begin_switch = 0;
-//ust// offsets->end_switch_old = 0;
-//ust//
-//ust// *tsc = trace_clock_read64();
-//ust//
-//ust// if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
-//ust// offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
-//ust// offsets->end_switch_old = 1;
-//ust// } else {
-//ust// /* we do not have to switch : buffer is empty */
-//ust// return -1;
-//ust// }
-//ust// if (mode == FORCE_ACTIVE)
-//ust// offsets->begin += ltt_subbuffer_header_size();
-//ust// /*
-//ust// * Always begin_switch in FORCE_ACTIVE mode.
-//ust// * Test new buffer integrity
-//ust// */
-//ust// subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
-//ust// offsets->reserve_commit_diff =
-//ust// (BUFFER_TRUNC(offsets->begin, buf->chan)
-//ust// >> channel->n_subbufs_order)
-//ust// - (local_read(&buf->commit_count[subbuf_index])
-//ust// & channel->commit_count_mask);
-//ust// if (offsets->reserve_commit_diff == 0) {
-//ust// /* Next buffer not corrupted. */
-//ust// if (mode == FORCE_ACTIVE
-//ust// && !channel->overwrite
-//ust// && offsets->begin - atomic_long_read(&buf->consumed)
-//ust// >= channel->alloc_size) {
-//ust// /*
-//ust// * We do not overwrite non consumed buffers and we are
-//ust// * full : ignore switch while tracing is active.
-//ust// */
-//ust// return -1;
-//ust// }
-//ust// } else {
-//ust// /*
-//ust// * Next subbuffer corrupted. Force pushing reader even in normal
-//ust// * mode
-//ust// */
-//ust// }
-//ust// offsets->end = offsets->begin;
-//ust// return 0;
-//ust// }
-//ust//
-//ust// static inline void ltt_reserve_push_reader(
-//ust// struct ust_channel *channel,
-//ust// struct ust_buffer *buf,
-//ust// struct ltt_reserve_switch_offsets *offsets)
-//ust// {
-//ust// long consumed_old, consumed_new;
-//ust//
-//ust// do {
-//ust// consumed_old = atomic_long_read(&buf->consumed);
-//ust// /*
-//ust// * If buffer is in overwrite mode, push the reader consumed
-//ust// * count if the write position has reached it and we are not
-//ust// * at the first iteration (don't push the reader farther than
-//ust// * the writer). This operation can be done concurrently by many
-//ust// * writers in the same buffer, the writer being at the farthest
-//ust// * write position sub-buffer index in the buffer being the one
-//ust// * which will win this loop.
-//ust// * If the buffer is not in overwrite mode, pushing the reader
-//ust// * only happens if a sub-buffer is corrupted.
-//ust// */
-//ust// if ((SUBBUF_TRUNC(offsets->end-1, buf->chan)
-//ust// - SUBBUF_TRUNC(consumed_old, buf->chan))
-//ust// >= channel->alloc_size)
-//ust// consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
-//ust// else {
-//ust// consumed_new = consumed_old;
-//ust// break;
-//ust// }
-//ust// } while (atomic_long_cmpxchg(&buf->consumed, consumed_old,
-//ust// consumed_new) != consumed_old);
-//ust//
-//ust// if (consumed_old != consumed_new) {
-//ust// /*
-//ust// * Reader pushed : we are the winner of the push, we can
-//ust// * therefore reequilibrate reserve and commit. Atomic increment
-//ust// * of the commit count permits other writers to play around
-//ust// * with this variable before us. We keep track of
-//ust// * corrupted_subbuffers even in overwrite mode :
-//ust// * we never want to write over a non completely committed
-//ust// * sub-buffer : possible causes : the buffer size is too low
-//ust// * compared to the unordered data input, or there is a writer
-//ust// * that died between the reserve and the commit.
-//ust// */
-//ust// if (offsets->reserve_commit_diff) {
-//ust// /*
-//ust// * We have to alter the sub-buffer commit count.
-//ust// * We do not deliver the previous subbuffer, given it
-//ust// * was either corrupted or not consumed (overwrite
-//ust// * mode).
-//ust// */
-//ust// local_add(offsets->reserve_commit_diff,
-//ust// &buf->commit_count[
-//ust// SUBBUF_INDEX(offsets->begin,
-//ust// buf->chan)]);
-//ust// if (!channel->overwrite
-//ust// || offsets->reserve_commit_diff
-//ust// != channel->subbuf_size) {
-//ust// /*
-//ust// * The reserve commit diff was not subbuf_size :
-//ust// * it means the subbuffer was partly written to
-//ust// * and is therefore corrupted. If it is multiple
-//ust// * of subbuffer size and we are in flight
-//ust// * recorder mode, we are skipping over a whole
-//ust// * subbuffer.
-//ust// */
-//ust// local_inc(&buf->corrupted_subbuffers);
-//ust// }
-//ust// }
-//ust// }
-//ust// }
-//ust//
-//ust// /**
-//ust// * ltt_relay_reserve_slot - Atomic slot reservation in a LTTng buffer.
-//ust// * @trace: the trace structure to log to.
-//ust// * @ltt_channel: channel structure
-//ust// * @transport_data: data structure specific to ltt relay
-//ust// * @data_size: size of the variable length data to log.
-//ust// * @slot_size: pointer to total size of the slot (out)
-//ust// * @buf_offset : pointer to reserved buffer offset (out)
-//ust// * @tsc: pointer to the tsc at the slot reservation (out)
-//ust// * @cpu: cpuid
-//ust// *
-//ust// * Return : -ENOSPC if not enough space, else returns 0.
-//ust// * It will take care of sub-buffer switching.
-//ust// */
-//ust// static notrace int ltt_relay_reserve_slot(struct ust_trace *trace,
-//ust// struct ust_channel *channel, void **transport_data,
-//ust// size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
-//ust// unsigned int *rflags, int largest_align, int cpu)
-//ust// {
-//ust// struct ust_buffer *buf = *transport_data = channel->buf[cpu];
-//ust// struct ltt_reserve_switch_offsets offsets;
-//ust//
-//ust// offsets.reserve_commit_diff = 0;
-//ust// offsets.size = 0;
-//ust//
-//ust// /*
-//ust// * Perform retryable operations.
-//ust// */
-//ust// if (ltt_nesting > 4) {
-//ust// local_inc(&buf->events_lost);
-//ust// return -EPERM;
-//ust// }
-//ust// do {
-//ust// if (ltt_relay_try_reserve(channel, buf, &offsets, data_size, tsc, rflags,
-//ust// largest_align))
-//ust// return -ENOSPC;
-//ust// } while (local_cmpxchg(&buf->offset, offsets.old,
-//ust// offsets.end) != offsets.old);
-//ust//
-//ust// /*
-//ust// * Atomically update last_tsc. This update races against concurrent
-//ust// * atomic updates, but the race will always cause supplementary full TSC
-//ust// * events, never the opposite (missing a full TSC event when it would be
-//ust// * needed).
-//ust// */
-//ust// save_last_tsc(buf, *tsc);
-//ust//
-//ust// /*
-//ust// * Push the reader if necessary
-//ust// */
-//ust// ltt_reserve_push_reader(channel, buf, &offsets);
-//ust//
-//ust// /*
-//ust// * Switch old subbuffer if needed.
-//ust// */
-//ust// if (offsets.end_switch_old)
-//ust// ltt_reserve_switch_old_subbuf(channel, buf, &offsets, tsc);
-//ust//
-//ust// /*
-//ust// * Populate new subbuffer.
-//ust// */
-//ust// if (offsets.begin_switch)
-//ust// ltt_reserve_switch_new_subbuf(channel, buf, &offsets, tsc);
-//ust//
-//ust// if (offsets.end_switch_current)
-//ust// ltt_reserve_end_switch_current(channel, buf, &offsets, tsc);
-//ust//
-//ust// *slot_size = offsets.size;
-//ust// *buf_offset = offsets.begin + offsets.before_hdr_pad;
-//ust// return 0;
-//ust// }
-//ust//
-//ust// /*
-//ust// * Force a sub-buffer switch for a per-cpu buffer. This operation is
-//ust// * completely reentrant : can be called while tracing is active with
-//ust// * absolutely no lock held.
-//ust// *
-//ust// * Note, however, that as a local_cmpxchg is used for some atomic
-//ust// * operations, this function must be called from the CPU which owns the buffer
-//ust// * for a ACTIVE flush.
-//ust// */
-//ust// static notrace void ltt_force_switch(struct ust_buffer *buf,
-//ust// enum force_switch_mode mode)
-//ust// {
-//ust// struct ust_channel *channel = buf->chan;
-//ust// struct ltt_reserve_switch_offsets offsets;
-//ust// u64 tsc;
-//ust//
-//ust// offsets.reserve_commit_diff = 0;
-//ust// offsets.size = 0;
-//ust//
-//ust// /*
-//ust// * Perform retryable operations.
-//ust// */
-//ust// do {
-//ust// if (ltt_relay_try_switch(mode, channel, buf, &offsets, &tsc))
-//ust// return;
-//ust// } while (local_cmpxchg(&buf->offset, offsets.old,
-//ust// offsets.end) != offsets.old);
-//ust//
-//ust// /*
-//ust// * Atomically update last_tsc. This update races against concurrent
-//ust// * atomic updates, but the race will always cause supplementary full TSC
-//ust// * events, never the opposite (missing a full TSC event when it would be
-//ust// * needed).
-//ust// */
-//ust// save_last_tsc(buf, tsc);
-//ust//
-//ust// /*
-//ust// * Push the reader if necessary
-//ust// */
-//ust// if (mode == FORCE_ACTIVE)
-//ust// ltt_reserve_push_reader(channel, buf, &offsets);
-//ust//
-//ust// /*
-//ust// * Switch old subbuffer if needed.
-//ust// */
-//ust// if (offsets.end_switch_old)
-//ust// ltt_reserve_switch_old_subbuf(channel, buf, &offsets, &tsc);
-//ust//
-//ust// /*
-//ust// * Populate new subbuffer.
-//ust// */
-//ust// if (mode == FORCE_ACTIVE)
-//ust// ltt_reserve_switch_new_subbuf(channel, buf, &offsets, &tsc);
-//ust// }
/*
* ltt_reserve_switch_old_subbuf: switch old subbuffer
/*
* Must write slot data before incrementing commit count.
- * This compiler barrier is upgraded into a smp_wmb() by the IPI
- * sent by get_subbuf() when it does its smp_rmb().
+ * This compiler barrier is upgraded into a cmm_smp_wmb() by the IPI
+ * sent by get_subbuf() when it does its cmm_smp_rmb().
*/
- barrier();
- local_add(padding_size,
- &buf->commit_count[oldidx].cc);
- commit_count = local_read(&buf->commit_count[oldidx].cc);
+ cmm_smp_wmb();
+ uatomic_add(&buf->commit_count[oldidx].cc, padding_size);
+ commit_count = uatomic_read(&buf->commit_count[oldidx].cc);
ltt_check_deliver(chan, buf, offsets->old - 1, commit_count, oldidx);
- ltt_write_commit_counter(buf, oldidx,
+ ltt_write_commit_counter(chan, buf, oldidx,
offsets->old, commit_count, padding_size);
}
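The barrier comment above is liburcu's standard publish ordering: the slot payload must be visible before the commit count that advertises it. A minimal sketch with the urcu primitives (commit_slot is a hypothetical name):

#include <string.h>
#include <urcu/arch.h>		/* cmm_smp_wmb() */
#include <urcu/uatomic.h>

static void commit_slot(char *slot, const char *data, size_t len,
			unsigned long *commit_count, size_t slot_size)
{
	memcpy(slot, data, len);		/* 1) write the payload */
	cmm_smp_wmb();				/* 2) payload before count */
	uatomic_add(commit_count, slot_size);	/* 3) publish the slot */
}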
/*
* Must write slot data before incrementing commit count.
- * This compiler barrier is upgraded into a smp_wmb() by the IPI
- * sent by get_subbuf() when it does its smp_rmb().
+ * This compiler barrier is upgraded into a cmm_smp_wmb() by the IPI
+ * sent by get_subbuf() when it does its cmm_smp_rmb().
*/
- barrier();
- local_add(ltt_subbuffer_header_size(),
- &buf->commit_count[beginidx].cc);
- commit_count = local_read(&buf->commit_count[beginidx].cc);
+ cmm_smp_wmb();
+ uatomic_add(&buf->commit_count[beginidx].cc, ltt_subbuffer_header_size());
+ commit_count = uatomic_read(&buf->commit_count[beginidx].cc);
/* Check if the written buffer has to be delivered */
ltt_check_deliver(chan, buf, offsets->begin, commit_count, beginidx);
- ltt_write_commit_counter(buf, beginidx,
+ ltt_write_commit_counter(chan, buf, beginidx,
offsets->begin, commit_count, ltt_subbuffer_header_size());
}
/*
* Must write slot data before incrementing commit count.
- * This compiler barrier is upgraded into a smp_wmb() by the IPI
- * sent by get_subbuf() when it does its smp_rmb().
+ * This compiler barrier is upgraded into a cmm_smp_wmb() by the IPI
+ * sent by get_subbuf() when it does its cmm_smp_rmb().
*/
- barrier();
- local_add(padding_size,
- &buf->commit_count[endidx].cc);
- commit_count = local_read(&buf->commit_count[endidx].cc);
+ cmm_smp_wmb();
+ uatomic_add(&buf->commit_count[endidx].cc, padding_size);
+ commit_count = uatomic_read(&buf->commit_count[endidx].cc);
ltt_check_deliver(chan, buf,
offsets->end - 1, commit_count, endidx);
- ltt_write_commit_counter(buf, endidx,
+ ltt_write_commit_counter(chan, buf, endidx,
offsets->end, commit_count, padding_size);
}
long subbuf_index;
long reserve_commit_diff;
- offsets->begin = local_read(&buf->offset);
+ offsets->begin = uatomic_read(&buf->offset);
offsets->old = offsets->begin;
offsets->begin_switch = 0;
offsets->end_switch_old = 0;
reserve_commit_diff =
(BUFFER_TRUNC(offsets->begin, buf->chan)
>> chan->n_subbufs_order)
- - (local_read(&buf->commit_count[subbuf_index].cc_sb)
+ - (uatomic_read(&buf->commit_count[subbuf_index].cc_sb)
& chan->commit_count_mask);
if (reserve_commit_diff == 0) {
/* Next buffer not corrupted. */
if (mode == FORCE_ACTIVE
&& !chan->overwrite
- && offsets->begin - atomic_long_read(&buf->consumed)
+ && offsets->begin - uatomic_read(&buf->consumed)
>= chan->alloc_size) {
/*
* We do not overwrite non consumed buffers and we are
* Force a sub-buffer switch for a per-cpu buffer. This operation is
* completely reentrant : can be called while tracing is active with
* absolutely no lock held.
- *
- * Note, however, that as a local_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
*/
void ltt_force_switch_lockless_slow(struct ust_buffer *buf,
enum force_switch_mode mode)
offsets.size = 0;
+ DBG("Switching (forced) %s_%d", chan->channel_name, buf->cpu);
/*
* Perform retryable operations.
*/
if (ltt_relay_try_switch_slow(mode, chan, buf,
&offsets, &tsc))
return;
- } while (local_cmpxchg(&buf->offset, offsets.old,
+ } while (uatomic_cmpxchg(&buf->offset, offsets.old,
offsets.end) != offsets.old);
/*
{
long reserve_commit_diff;
- offsets->begin = local_read(&buf->offset);
+ offsets->begin = uatomic_read(&buf->offset);
offsets->old = offsets->begin;
offsets->begin_switch = 0;
offsets->end_switch_current = 0;
reserve_commit_diff =
(BUFFER_TRUNC(offsets->begin, buf->chan)
>> chan->n_subbufs_order)
- - (local_read(&buf->commit_count[subbuf_index].cc_sb)
+ - (uatomic_read(&buf->commit_count[subbuf_index].cc_sb)
& chan->commit_count_mask);
if (likely(reserve_commit_diff == 0)) {
/* Next buffer not corrupted. */
if (unlikely(!chan->overwrite &&
(SUBBUF_TRUNC(offsets->begin, buf->chan)
- - SUBBUF_TRUNC(atomic_long_read(
+ - SUBBUF_TRUNC(uatomic_read(
&buf->consumed),
buf->chan))
>= chan->alloc_size)) {
* We do not overwrite non consumed buffers
* and we are full : event is lost.
*/
- local_inc(&buf->events_lost);
+ uatomic_inc(&buf->events_lost);
return -1;
} else {
/*
* overwrite mode. Caused by either a writer OOPS or
* too many nested writes over a reserve/commit pair.
*/
- local_inc(&buf->events_lost);
+ uatomic_inc(&buf->events_lost);
return -1;
}
offsets->size = ust_get_header_size(chan,
* Event too big for subbuffers, report error, don't
* complete the sub-buffer switch.
*/
- local_inc(&buf->events_lost);
+ uatomic_inc(&buf->events_lost);
return -1;
} else {
/*
* Return : -ENOSPC if not enough space, else returns 0.
* It will take care of sub-buffer switching.
*/
-int ltt_reserve_slot_lockless_slow(struct ust_trace *trace,
- struct ust_channel *chan, void **transport_data,
- size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
- unsigned int *rflags, int largest_align, int cpu)
+int ltt_reserve_slot_lockless_slow(struct ust_channel *chan,
+ struct ust_trace *trace, size_t data_size,
+ int largest_align, int cpu,
+ struct ust_buffer **ret_buf,
+ size_t *slot_size, long *buf_offset,
+ u64 *tsc, unsigned int *rflags)
{
- struct ust_buffer *buf = chan->buf[cpu];
+ struct ust_buffer *buf = *ret_buf = chan->buf[cpu];
struct ltt_reserve_switch_offsets offsets;
offsets.size = 0;
if (unlikely(ltt_relay_try_reserve_slow(chan, buf, &offsets,
data_size, tsc, rflags, largest_align)))
return -ENOSPC;
- } while (unlikely(local_cmpxchg(&buf->offset, offsets.old,
+ } while (unlikely(uatomic_cmpxchg(&buf->offset, offsets.old,
offsets.end) != offsets.old));
/*
if (unlikely(offsets.end_switch_old)) {
//ust// ltt_clear_noref_flag(chan, buf, SUBBUF_INDEX(offsets.old - 1, chan));
ltt_reserve_switch_old_subbuf(chan, buf, &offsets, tsc);
+ DBG("Switching %s_%d", chan->channel_name, cpu);
}
/*
static struct ltt_transport ust_relay_transport = {
.name = "ustrelay",
.ops = {
- .create_channel = ust_buffers_create_channel,
- .finish_channel = ltt_relay_finish_channel,
- .remove_channel = ltt_relay_remove_channel,
+ .create_channel = create_channel,
+ .finish_channel = finish_channel,
+ .remove_channel = remove_channel,
.wakeup_channel = ltt_relay_async_wakeup_chan,
},
};
ltt_transport_unregister(&ust_relay_transport);
}
-size_t ltt_write_event_header_slow(struct ust_trace *trace,
- struct ust_channel *channel,
+size_t ltt_write_event_header_slow(struct ust_channel *channel,
struct ust_buffer *buf, long buf_offset,
u16 eID, u32 event_size,
u64 tsc, unsigned int rflags)