Ongoing transition to the generic ring buffer
author Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sun, 10 Oct 2010 13:47:44 +0000 (09:47 -0400)
committer Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sun, 10 Oct 2010 13:47:44 +0000 (09:47 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
15 files changed:
Makefile
ltt-channels.c [new file with mode: 0644]
ltt-channels.h [new file with mode: 0644]
ltt-event-header.c
ltt-relay-alloc.c [deleted file]
ltt-relay-lockless.c [deleted file]
ltt-relay-lockless.h [deleted file]
ltt-relay-splice.c [deleted file]
ltt-relay-vfs.c [deleted file]
ltt-relay.h [deleted file]
ltt-ring-buffer-client.c [new file with mode: 0644]
ltt-trace-control.c
ltt-tracer.c
ltt-tracer.h
ltt-type-serializer.h

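The relay-specific buffer management (ltt-relay-alloc.c, ltt-relay-lockless.c, ltt-relay-splice.c, ltt-relay-vfs.c) is deleted and its role taken over by the generic ring buffer library: a client describes its buffers with a lib_ring_buffer_config and writes events through a lib_ring_buffer_ctx, both visible in the ltt-event-header.c hunk below. A minimal sketch of the resulting client-side write path follows; lib_ring_buffer_ctx_init(), lib_ring_buffer_reserve() and lib_ring_buffer_commit() are assumed from the library's reserve/commit model and client_config is a placeholder, since only lib_ring_buffer_write() and lib_ring_buffer_align_ctx() appear verbatim in this commit:

/* Placeholder: a real client fills this with its buffer description. */
static const struct lib_ring_buffer_config client_config;

static void client_write_event(struct channel *chan, u16 eID, u32 payload)
{
        struct lib_ring_buffer_ctx ctx;

        /* Describe the record: private data, size, alignment, cpu. */
        lib_ring_buffer_ctx_init(&ctx, chan, NULL, sizeof(payload),
                                 sizeof(payload), smp_processor_id());
        if (lib_ring_buffer_reserve(&client_config, &ctx))
                return; /* event lost: buffer full in discard mode */
        /* Slow-path header write, as converted in this commit. */
        ltt_write_event_header_slow(&client_config, &ctx, eID,
                                    sizeof(payload));
        lib_ring_buffer_write(&client_config, &ctx, &payload,
                              sizeof(payload));
        lib_ring_buffer_commit(&client_config, &ctx);
}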
diff --git a/Makefile b/Makefile
index 81d81288a48625277a01732326074dd047cd0537..b16c4c6fdb0b90ffea628d176623249693f0a19c 100644
--- a/Makefile
+++ b/Makefile
@@ -10,9 +10,8 @@ obj-m += ltt-tracer.o
 obj-m += ltt-marker-control.o
 
 obj-m += ltt-relay.o
-ltt-relay-objs := ltt-relay-lockless.o ltt-relay-alloc.o ltt-relay-splice.o \
-                 ltt-relay-vfs.o ltt-event-header.o ltt-ascii.o \
-                 ltt-serialize.o ltt-type-serializer.o
+ltt-relay-objs := ltt-event-header.o ltt-serialize.o ltt-type-serializer.o
+#ltt-ascii.o
 
 obj-m += ltt-statedump.o
 obj-m += ltt-trace-control.o
diff --git a/ltt-channels.c b/ltt-channels.c
new file mode 100644
index 0000000..962c81a
--- /dev/null
@@ -0,0 +1,397 @@
+/*
+ * ltt/ltt-channels.c
+ *
+ * (C) Copyright 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
+ *
+ * LTTng channel management.
+ *
+ * Author:
+ *     Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/module.h>
+#include <linux/mutex.h>
+#include <linux/slab.h>
+#include <linux/vmalloc.h>
+#include "ltt-channels.h"
+
+/*
+ * ltt_channel_mutex may be nested inside the LTT trace mutex.
+ * ltt_channel_mutex may be nested inside the markers mutex.
+ */
+static DEFINE_MUTEX(ltt_channel_mutex);
+static LIST_HEAD(ltt_channels);
+/*
+ * Index of the next channel in the array. Ensures that as long as a trace
+ * channel is allocated, no array index is re-used when a channel is freed and
+ * then another channel is allocated. The index is cleared and the array
+ * indexes get reassigned when index_kref goes back to 0, which indicates that
+ * no more trace channels are allocated.
+ */
+static unsigned int free_index;
+/* index_kref is protected by both ltt_channel_mutex and lock_markers */
+static struct kref index_kref; /* Keeps track of allocated trace channels */
+
+static struct ltt_channel_setting *lookup_channel(const char *name)
+{
+       struct ltt_channel_setting *iter;
+
+       list_for_each_entry(iter, &ltt_channels, list)
+               if (strcmp(name, iter->name) == 0)
+                       return iter;
+       return NULL;
+}
+
+/*
+ * Must be called when the channel refcount falls to 0, and also when the last
+ * trace is freed. This function is responsible for compacting the channel and
+ * event IDs when no users remain.
+ *
+ * Called with lock_markers() and channels mutex held.
+ */
+static void release_channel_setting(struct kref *kref)
+{
+       struct ltt_channel_setting *setting = container_of(kref,
+               struct ltt_channel_setting, kref);
+       struct ltt_channel_setting *iter;
+
+       if (atomic_read(&index_kref.refcount) == 0
+           && atomic_read(&setting->kref.refcount) == 0) {
+               list_del(&setting->list);
+               kfree(setting);
+
+               free_index = 0;
+               list_for_each_entry(iter, &ltt_channels, list) {
+                       iter->index = free_index++;
+                       iter->free_event_id = 0;
+               }
+       }
+}
+
+/*
+ * Perform channel index compaction when the last trace channel is freed.
+ *
+ * Called with lock_markers() and channels mutex held.
+ */
+static void release_trace_channel(struct kref *kref)
+{
+       struct ltt_channel_setting *iter, *n;
+
+       list_for_each_entry_safe(iter, n, &ltt_channels, list)
+               release_channel_setting(&iter->kref);
+       if (atomic_read(&index_kref.refcount) == 0)
+               markers_compact_event_ids();
+}
+
+/*
+ * ltt_channels_trace_ref - is there an existing trace session?
+ *
+ * Must be called with lock_markers() held.
+ */
+int ltt_channels_trace_ref(void)
+{
+       return !!atomic_read(&index_kref.refcount);
+}
+EXPORT_SYMBOL_GPL(ltt_channels_trace_ref);
+
+/**
+ * ltt_channels_register - Register a trace channel.
+ * @name: channel name
+ *
+ * Uses refcounting.
+ */
+int ltt_channels_register(const char *name)
+{
+       struct ltt_channel_setting *setting;
+       int ret = 0;
+
+       mutex_lock(&ltt_channel_mutex);
+       setting = lookup_channel(name);
+       if (setting) {
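+               /*
+                * The setting can still be on the list with a zero refcount:
+                * entries are only unlinked at compaction, when the last
+                * trace channel goes away. Re-arm the kref instead of
+                * allocating a duplicate entry.
+                */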
+               if (atomic_read(&setting->kref.refcount) == 0)
+                       goto init_kref;
+               else {
+                       kref_get(&setting->kref);
+                       goto end;
+               }
+       }
+       setting = kzalloc(sizeof(*setting), GFP_KERNEL);
+       if (!setting) {
+               ret = -ENOMEM;
+               goto end;
+       }
+       list_add(&setting->list, &ltt_channels);
+       strncpy(setting->name, name, PATH_MAX-1);
+       setting->index = free_index++;
+init_kref:
+       kref_init(&setting->kref);
+end:
+       mutex_unlock(&ltt_channel_mutex);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(ltt_channels_register);
+
+/**
+ * ltt_channels_unregister - Unregister a trace channel.
+ * @name: channel name
+ * @compacting: performing compaction
+ *
+ * Must be called with markers mutex held.
+ */
+int ltt_channels_unregister(const char *name, int compacting)
+{
+       struct ltt_channel_setting *setting;
+       int ret = 0;
+
+       if (!compacting)
+               mutex_lock(&ltt_channel_mutex);
+       setting = lookup_channel(name);
+       if (!setting || atomic_read(&setting->kref.refcount) == 0) {
+               ret = -ENOENT;
+               goto end;
+       }
+       kref_put(&setting->kref, release_channel_setting);
+       if (!compacting && atomic_read(&index_kref.refcount) == 0)
+               markers_compact_event_ids();
+end:
+       if (!compacting)
+               mutex_unlock(&ltt_channel_mutex);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(ltt_channels_unregister);
+
+/**
+ * ltt_channels_set_default - Set channel default behavior.
+ * @name: default channel name
+ * @sb_size: size of the subbuffers
+ * @n_sb: number of subbuffers
+ */
+int ltt_channels_set_default(const char *name,
+                            unsigned int sb_size,
+                            unsigned int n_sb)
+{
+       struct ltt_channel_setting *setting;
+       int ret = 0;
+
+       mutex_lock(&ltt_channel_mutex);
+       setting = lookup_channel(name);
+       if (!setting || atomic_read(&setting->kref.refcount) == 0) {
+               ret = -ENOENT;
+               goto end;
+       }
+       setting->sb_size = sb_size;
+       setting->n_sb = n_sb;
+end:
+       mutex_unlock(&ltt_channel_mutex);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(ltt_channels_set_default);
+
+/**
+ * ltt_channels_get_name_from_index - get channel name from channel index
+ * @index: channel index
+ *
+ * Looks up the channel name from its index. Keeps the name information
+ * outside of each trace channel instance.
+ */
+const char *ltt_channels_get_name_from_index(unsigned int index)
+{
+       struct ltt_channel_setting *iter;
+
+       list_for_each_entry(iter, &ltt_channels, list)
+               if (iter->index == index && atomic_read(&iter->kref.refcount))
+                       return iter->name;
+       return NULL;
+}
+EXPORT_SYMBOL_GPL(ltt_channels_get_name_from_index);
+
+static struct ltt_channel_setting *
+ltt_channels_get_setting_from_name(const char *name)
+{
+       struct ltt_channel_setting *iter;
+
+       list_for_each_entry(iter, &ltt_channels, list)
+               if (!strcmp(iter->name, name)
+                   && atomic_read(&iter->kref.refcount))
+                       return iter;
+       return NULL;
+}
+
+/**
+ * ltt_channels_get_index_from_name - get channel index from channel name
+ * @name: channel name
+ *
+ * Looks up the channel index from its name. Keeps the name information
+ * outside of each trace channel instance.
+ * Returns -1 if not found.
+ */
+int ltt_channels_get_index_from_name(const char *name)
+{
+       struct ltt_channel_setting *setting;
+
+       setting = ltt_channels_get_setting_from_name(name);
+       if (setting)
+               return setting->index;
+       else
+               return -1;
+}
+EXPORT_SYMBOL_GPL(ltt_channels_get_index_from_name);
+
+/**
+ * ltt_channels_trace_alloc - Allocate channel structures for a trace
+ * @trace: the trace for which channels are allocated
+ * @overwrite: use overwrite (flight recorder) mode for the channels
+ *
+ * Use the current channel list to allocate the channels for a trace.
+ * Called with trace lock held. Does not perform the trace buffer allocation,
+ * because we must let the user overwrite specific channel sizes.
+ */
+int ltt_channels_trace_alloc(struct ltt_trace *trace, int overwrite)
+{
+       struct channel **chan = NULL;
+       struct ltt_channel_setting *chans, *iter;
+       int ret = 0;
+
+       lock_markers();
+       mutex_lock(&ltt_channel_mutex);
+       if (!free_index)
+               goto end;
+       if (!atomic_read(&index_kref.refcount))
+               kref_init(&index_kref);
+       else
+               kref_get(&index_kref);
+       trace->nr_channels = free_index;
+       chan = kzalloc(sizeof(struct channel *) * free_index, GFP_KERNEL);
+       if (!chan) {
+               ret = -ENOMEM;
+               goto end;
+       }
+       chans = kzalloc(sizeof(struct ltt_channel_setting) * free_index,
+                       GFP_KERNEL);
+       if (!chans)
+               goto free_chan;
+       list_for_each_entry(iter, &ltt_channels, list) {
+               if (!atomic_read(&iter->kref.refcount))
+                       continue;
+               chans[iter->index].sb_size = iter->sb_size;
+               chans[iter->index].n_sb = iter->n_sb;
+               chans[iter->index].overwrite = overwrite;
+               strncpy(chans[iter->index].name, iter->name,
+                       PATH_MAX - 1);
+               chans[iter->index].switch_timer_interval = 0;
+               chans[iter->index].read_timer_interval = LTT_READ_TIMER_INTERVAL;
+       }
+       trace->channels = chan;
+       trace->settings = chans;
+end:
+       mutex_unlock(&ltt_channel_mutex);
+       unlock_markers();
+       return ret;
+
+free_chan:
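+       /* chans allocation failed: release the channel pointer array. */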
+       kfree(chan);
+       ret = -ENOMEM;
+       goto end;
+}
+EXPORT_SYMBOL_GPL(ltt_channels_trace_alloc);
+
+/**
+ * ltt_channels_trace_free - Free one trace's channels
+ * @trace: the trace whose channels are freed
+ *
+ * Called with trace lock held. The actual channel buffers must be freed before
+ * this function is called.
+ */
+void ltt_channels_trace_free(struct ltt_trace *trace)
+{
+       lock_markers();
+       mutex_lock(&ltt_channel_mutex);
+       kfree(trace->settings);
+       kfree(trace->channels);
+       kref_put(&index_kref, release_trace_channel);
+       mutex_unlock(&ltt_channel_mutex);
+       unlock_markers();
+       marker_update_probes();
+}
+EXPORT_SYMBOL_GPL(ltt_channels_trace_free);
+
+/**
+ * ltt_channels_trace_set_timer - set the switch timer
+ * @chan: channel
+ * @interval: interval of timer interrupt, in jiffies. 0 inhibits the timer.
+ */
+void ltt_channels_trace_set_timer(struct ltt_chan *chan,
+                                 unsigned long interval)
+{
+       chan->switch_timer_interval = interval;
+}
+EXPORT_SYMBOL_GPL(ltt_channels_trace_set_timer);
+
+/**
+ * _ltt_channels_get_event_id - get next event ID for a marker
+ * @channel: channel name
+ * @name: event name
+ *
+ * Returns a unique event ID (for this channel) or < 0 on error.
+ * Must be called with channels mutex held.
+ */
+int _ltt_channels_get_event_id(const char *channel, const char *name)
+{
+       struct ltt_channel_setting *setting;
+       int ret;
+
+       setting = ltt_channels_get_setting_from_name(channel);
+       if (!setting) {
+               ret = -ENOENT;
+               goto end;
+       }
+       if (strcmp(channel, "metadata") == 0) {
+               if (strcmp(name, "core_marker_id") == 0)
+                       ret = 0;
+               else if (strcmp(name, "core_marker_format") == 0)
+                       ret = 1;
+               else
+                       ret = -ENOENT;
+               goto end;
+       }
+       if (setting->free_event_id == EVENTS_PER_CHANNEL - 1) {
+               ret = -ENOSPC;
+               goto end;
+       }
+       ret = setting->free_event_id++;
+end:
+       return ret;
+}
+
+/**
+ * ltt_channels_get_event_id - get next event ID for a marker
+ * @channel: channel name
+ * @name: event name
+ *
+ * Returns a unique event ID (for this channel) or < 0 on error.
+ */
+int ltt_channels_get_event_id(const char *channel, const char *name)
+{
+       int ret;
+
+       mutex_lock(&ltt_channel_mutex);
+       ret = _ltt_channels_get_event_id(channel, name);
+       mutex_unlock(&ltt_channel_mutex);
+       return ret;
+}
+
+/**
+ * _ltt_channels_reset_event_ids - reset event IDs at compaction
+ *
+ * Called with lock_markers() and the channel mutex held.
+ */
+void _ltt_channels_reset_event_ids(void)
+{
+       struct ltt_channel_setting *iter;
+
+       list_for_each_entry(iter, &ltt_channels, list)
+               iter->free_event_id = 0;
+}
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Channel Management");
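
A minimal sketch of the registration lifecycle this file implements, using only functions defined above; the channel name, sub-buffer geometry and event name are illustrative:

static int example_channel_lifecycle(void)
{
        int ret, evid;

        ret = ltt_channels_register("cpu");     /* takes a reference */
        if (ret)
                return ret;
        /* 4 sub-buffers of 256 kB for the "cpu" channel. */
        ret = ltt_channels_set_default("cpu", 262144, 4);
        if (ret)
                goto unregister;
        /* Reserve a per-channel event ID for a marker. */
        evid = ltt_channels_get_event_id("cpu", "sched_switch");
        if (evid < 0)
                ret = evid;
unregister:
        ltt_channels_unregister("cpu", 0);      /* drops the reference */
        return ret;
}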
diff --git a/ltt-channels.h b/ltt-channels.h
new file mode 100644
index 0000000..9eb604b
--- /dev/null
@@ -0,0 +1,83 @@
+#ifndef _LTT_CHANNELS_H
+#define _LTT_CHANNELS_H
+
+/*
+ * Copyright (C) 2008 Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
+ *
+ * Dynamic tracer channel allocation.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/limits.h>
+#include <linux/kref.h>
+#include <linux/list.h>
+#include <linux/timer.h>
+#include <linux/ltt-core.h>
+
+#define EVENTS_PER_CHANNEL             65536
+
+#define LTT_READ_TIMER_INTERVAL                10000 /* us */
+
+/*
+ * Forward declarations.
+ */
+struct ltt_trace;
+struct ltt_serialize_closure;
+struct ltt_probe_private_data;
+
+/* Serialization callback '%k' */
+typedef size_t (*ltt_serialize_cb)(struct ltt_chanbuf *buf, size_t buf_offset,
+                                  struct ltt_serialize_closure *closure,
+                                  void *serialize_private,
+                                  unsigned int stack_pos_ctx,
+                                  int *largest_align,
+                                  const char *fmt, va_list *args);
+
+struct ltt_probe_private_data {
+       struct ltt_trace *trace;        /*
+                                        * Target trace, for metadata
+                                        * or statedump.
+                                        */
+       ltt_serialize_cb serializer;    /*
+                                        * Serialization function override.
+                                        */
+       void *serialize_private;        /*
+                                        * Private data for serialization
+                                        * functions.
+                                        */
+};
+
+struct ltt_channel_setting {
+       unsigned int sb_size;
+       unsigned int n_sb;
+       int overwrite;
+       unsigned long switch_timer_interval;
+       unsigned long read_timer_interval;
+       struct kref kref;       /* Number of references to structure content */
+       struct list_head list;
+       unsigned int index;     /* index of channel in trace channel array */
+       u16 free_event_id;      /* Next event ID to allocate */
+       char name[PATH_MAX];
+};
+
+int ltt_channels_register(const char *name);
+int ltt_channels_unregister(const char *name, int compacting);
+int ltt_channels_set_default(const char *name,
+                            unsigned int sb_size,
+                            unsigned int n_sb);
+const char *ltt_channels_get_name_from_index(unsigned int index);
+int ltt_channels_get_index_from_name(const char *name);
+int ltt_channels_trace_ref(void);
+int ltt_channels_trace_alloc(struct ltt_trace *trace, int overwrite);
+void ltt_channels_trace_free(struct ltt_trace *trace);
+void ltt_channels_trace_set_timer(struct ltt_chan *chan,
+                                 unsigned long interval);
+
+int _ltt_channels_get_event_id(const char *channel, const char *name);
+int ltt_channels_get_event_id(const char *channel, const char *name);
+void _ltt_channels_reset_event_ids(void);
+
+#endif /* _LTT_CHANNELS_H */
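
The ltt_serialize_cb typedef above is the probe-side extension point for the '%k' format callback. A skeleton matching that signature; the no-op body is illustrative only, not how real probes serialize:

static size_t example_serialize(struct ltt_chanbuf *buf, size_t buf_offset,
                                struct ltt_serialize_closure *closure,
                                void *serialize_private,
                                unsigned int stack_pos_ctx,
                                int *largest_align,
                                const char *fmt, va_list *args)
{
        /*
         * A real callback writes its payload at buf_offset and returns the
         * offset past what it wrote; this stub writes nothing.
         */
        return buf_offset;
}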
diff --git a/ltt-event-header.c b/ltt-event-header.c
index 4f049d31e5e841356391de7994f80d967b864fcd..94e29cbc3402492fcb1196951159608694847479 100644
--- a/ltt-event-header.c
+++ b/ltt-event-header.c
  */
 
 #include <linux/module.h>
-
 #include "ltt-tracer.h"
-#include "ltt-relay.h"
 
-size_t ltt_write_event_header_slow(struct ltt_chanbuf_alloc *bufa,
-                                  struct ltt_chan_alloc *chana,
-                                  long buf_offset, u16 eID, u32 event_size,
-                                  u64 tsc, unsigned int rflags)
+void ltt_write_event_header_slow(const struct lib_ring_buffer_config *config,
+                                struct lib_ring_buffer_ctx *ctx,
+                                u16 eID, u32 event_size)
 {
-       struct ltt_event_header header;
+       struct event_header header;
        u16 small_size;
 
-       switch (rflags) {
+       switch (ctx->rflags) {
@@ -40,50 +37,31 @@ size_t ltt_write_event_header_slow(struct ltt_chanbuf_alloc *bufa,
        }
 
-       header.id_time |= (u32)tsc & LTT_TSC_MASK;
+       header.id_time |= (u32)ctx->tsc & LTT_TSC_MASK;
-       ltt_relay_write(bufa, chana, buf_offset, &header, sizeof(header));
-       buf_offset += sizeof(header);
+       lib_ring_buffer_write(config, ctx, &header, sizeof(header));
 
-       switch (rflags) {
+       switch (ctx->rflags) {
        case LTT_RFLAG_ID_SIZE_TSC:
                small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
-               ltt_relay_write(bufa, chana, buf_offset,
-                       &eID, sizeof(u16));
-               buf_offset += sizeof(u16);
-               ltt_relay_write(bufa, chana, buf_offset,
-                       &small_size, sizeof(u16));
-               buf_offset += sizeof(u16);
-               if (small_size == LTT_MAX_SMALL_SIZE) {
-                       ltt_relay_write(bufa, chana, buf_offset,
-                               &event_size, sizeof(u32));
-                       buf_offset += sizeof(u32);
-               }
-               buf_offset += ltt_align(buf_offset, sizeof(u64));
-               ltt_relay_write(bufa, chana, buf_offset,
-                       &tsc, sizeof(u64));
-               buf_offset += sizeof(u64);
+               lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
+               lib_ring_buffer_write(config, ctx, &small_size, sizeof(u16));
+               if (small_size == LTT_MAX_SMALL_SIZE)
+                       lib_ring_buffer_write(config, ctx, &event_size,
+                                             sizeof(u32));
+               lib_ring_buffer_align_ctx(config, ctx, sizeof(u64));
+               lib_ring_buffer_write(config, ctx, &ctx->tsc, sizeof(u64));
                break;
        case LTT_RFLAG_ID_SIZE:
                small_size = (u16)min_t(u32, event_size, LTT_MAX_SMALL_SIZE);
-               ltt_relay_write(bufa, chana, buf_offset,
-                       &eID, sizeof(u16));
-               buf_offset += sizeof(u16);
-               ltt_relay_write(bufa, chana, buf_offset,
-                       &small_size, sizeof(u16));
-               buf_offset += sizeof(u16);
-               if (small_size == LTT_MAX_SMALL_SIZE) {
-                       ltt_relay_write(bufa, chana, buf_offset,
-                               &event_size, sizeof(u32));
-                       buf_offset += sizeof(u32);
-               }
+               lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
+               lib_ring_buffer_write(config, ctx, &small_size, sizeof(u16));
+               if (small_size == LTT_MAX_SMALL_SIZE)
+                       lib_ring_buffer_write(config, ctx, &event_size,
+                                             sizeof(u32));
                break;
        case LTT_RFLAG_ID:
-               ltt_relay_write(bufa, chana, buf_offset,
-                       &eID, sizeof(u16));
-               buf_offset += sizeof(u16);
+               lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
                break;
        }
-
-       return buf_offset;
 }
 EXPORT_SYMBOL_GPL(ltt_write_event_header_slow);
 
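The mechanical pattern of this conversion: lib_ring_buffer_write() advances the write offset inside the context, so the manual buf_offset arithmetic of the relay API disappears. Condensed from the hunk above:

/* Old relay API: the caller threads buf_offset through every write. */
ltt_relay_write(bufa, chana, buf_offset, &eID, sizeof(u16));
buf_offset += sizeof(u16);
buf_offset += ltt_align(buf_offset, sizeof(u64));
ltt_relay_write(bufa, chana, buf_offset, &tsc, sizeof(u64));
buf_offset += sizeof(u64);

/* New generic API: the ring buffer context tracks offset and alignment. */
lib_ring_buffer_write(config, ctx, &eID, sizeof(u16));
lib_ring_buffer_align_ctx(config, ctx, sizeof(u64));
lib_ring_buffer_write(config, ctx, &ctx->tsc, sizeof(u64));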
diff --git a/ltt-relay-alloc.c b/ltt-relay-alloc.c
deleted file mode 100644
index 6ff552e..0000000
+++ /dev/null
@@ -1,734 +0,0 @@
-/*
- * ltt-relay-alloc.c
- *
- * Copyright (C) 2008,2009 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include <linux/errno.h>
-#include <linux/stddef.h>
-#include <linux/slab.h>
-#include <linux/module.h>
-#include <linux/string.h>
-#include <linux/vmalloc.h>
-#include <linux/mm.h>
-#include <linux/cpu.h>
-#include <linux/bitops.h>
-#include <linux/delay.h>
-
-#include "ltt-relay.h"
-#include "ltt-tracer.h"
-#include "ltt-relay-lockless.h"        /* for cpu hotplug */
-
-/**
- * ltt_chanbuf_allocate - allocate a channel buffer
- * @buf: the buffer struct
- * @size: total size of the buffer
- * @n_sb: number of subbuffers
- * @extra_reader_sb: need extra subbuffer for reader
- */
-static
-int ltt_chanbuf_allocate(struct ltt_chanbuf_alloc *buf, size_t size,
-                        size_t n_sb, int extra_reader_sb)
-{
-       long i, j, n_pages, n_pages_per_sb, page_idx = 0;
-       struct page **pages;
-       void **virt;
-
-       n_pages = size >> PAGE_SHIFT;
-       n_pages_per_sb = n_pages >> get_count_order(n_sb);
-       if (extra_reader_sb)
-               n_pages += n_pages_per_sb;      /* Add pages for reader */
-
-       pages = kmalloc_node(max_t(size_t, sizeof(*pages) * n_pages,
-                                  1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(buf->cpu));
-       if (unlikely(!pages))
-               goto pages_error;
-
-       virt = kmalloc_node(ALIGN(sizeof(*virt) * n_pages,
-                                 1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(buf->cpu));
-       if (unlikely(!virt))
-               goto virt_error;
-
-       for (i = 0; i < n_pages; i++) {
-               pages[i] = alloc_pages_node(cpu_to_node(buf->cpu),
-                       GFP_KERNEL | __GFP_ZERO, 0);
-               if (unlikely(!pages[i]))
-                       goto depopulate;
-               virt[i] = page_address(pages[i]);
-       }
-       buf->nr_pages = n_pages;
-       buf->_pages = pages;
-       buf->_virt = virt;
-
-       /* Allocate write-side page index */
-       buf->buf_wsb = kzalloc_node(max_t(size_t,
-                               sizeof(struct chanbuf_sb) * n_sb,
-                               1 << INTERNODE_CACHE_SHIFT),
-                               GFP_KERNEL, cpu_to_node(buf->cpu));
-       if (unlikely(!buf->buf_wsb))
-               goto depopulate;
-
-       for (i = 0; i < n_sb; i++) {
-               buf->buf_wsb[i].pages =
-                       kzalloc_node(max_t(size_t,
-                               sizeof(struct chanbuf_page) * n_pages_per_sb,
-                               1 << INTERNODE_CACHE_SHIFT),
-                               GFP_KERNEL, cpu_to_node(buf->cpu));
-               if (!buf->buf_wsb[i].pages)
-                       goto free_buf_wsb;
-       }
-
-       if (extra_reader_sb) {
-               /* Allocate read-side page index */
-               buf->buf_rsb.pages =
-                       kzalloc_node(max_t(size_t,
-                               sizeof(struct chanbuf_page) * n_pages_per_sb,
-                               1 << INTERNODE_CACHE_SHIFT),
-                               GFP_KERNEL, cpu_to_node(buf->cpu));
-               if (unlikely(!buf->buf_rsb.pages))
-                       goto free_buf_wsb;
-       } else {
-               buf->buf_rsb.pages = buf->buf_wsb[0].pages;
-       }
-
-       /* Assign pages to write-side page index */
-       for (i = 0; i < n_sb; i++) {
-               for (j = 0; j < n_pages_per_sb; j++) {
-                       WARN_ON(page_idx > n_pages);
-                       buf->buf_wsb[i].pages[j].virt = virt[page_idx];
-                       buf->buf_wsb[i].pages[j].page = pages[page_idx];
-                       page_idx++;
-               }
-               RCHAN_SB_SET_NOREF(buf->buf_wsb[i].pages);
-       }
-
-       if (extra_reader_sb) {
-               for (j = 0; j < n_pages_per_sb; j++) {
-                       WARN_ON(page_idx > n_pages);
-                       buf->buf_rsb.pages[j].virt = virt[page_idx];
-                       buf->buf_rsb.pages[j].page = pages[page_idx];
-                       page_idx++;
-               }
-               RCHAN_SB_SET_NOREF(buf->buf_rsb.pages);
-       }
-
-       /*
-        * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
-        * will not fault.
-        */
-       vmalloc_sync_all();
-       return 0;
-
-free_buf_wsb:
-       for (i = 0; i < n_sb; i++) {
-               RCHAN_SB_CLEAR_NOREF(buf->buf_wsb[i].pages);
-               kfree(buf->buf_wsb[i].pages);
-       }
-       kfree(buf->buf_wsb);
-depopulate:
-       /*
-        * Free all pages from [ i - 1 down to 0 ].
-        * If i = 0, don't free anything.
-        */
-       for (i--; i >= 0; i--)
-               __free_page(pages[i]);
-       kfree(virt);
-virt_error:
-       kfree(pages);
-pages_error:
-       return -ENOMEM;
-}
-
-int ltt_chanbuf_alloc_create(struct ltt_chanbuf_alloc *buf,
-                            struct ltt_chan_alloc *chan, int cpu)
-{
-       int ret = 0;
-
-       ret = ltt_chanbuf_allocate(buf, chan->buf_size, chan->n_sb,
-                                  chan->extra_reader_sb);
-       if (ret)
-               goto end;
-
-       buf->chan = chan;
-       buf->cpu = cpu;
-end:
-       return ret;
-}
-
-void ltt_chanbuf_alloc_free(struct ltt_chanbuf_alloc *buf)
-{
-       struct ltt_chan_alloc *chan = buf->chan;
-       struct page **pages;
-       long i;
-
-       /* Destroy index */
-       if (chan->extra_reader_sb) {
-               RCHAN_SB_CLEAR_NOREF(buf->buf_rsb.pages);
-               kfree(buf->buf_rsb.pages);
-       }
-       for (i = 0; i < chan->n_sb; i++) {
-               RCHAN_SB_CLEAR_NOREF(buf->buf_wsb[i].pages);
-               kfree(buf->buf_wsb[i].pages);
-       }
-       kfree(buf->buf_wsb);
-
-       /* Destroy pages */
-       pages = buf->_pages;
-       for (i = 0; i < buf->nr_pages; i++)
-               __free_page(pages[i]);
-       kfree(buf->_pages);
-       kfree(buf->_virt);
-       buf->allocated = 0;
-}
-
-/**
- *     ltt_relay_hotcpu_callback - CPU hotplug callback
- *     @nb: notifier block
- *     @action: hotplug action to take
- *     @hcpu: CPU number
- *
- *     Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
- */
-static
-int __cpuinit ltt_relay_hotcpu_callback(struct notifier_block *nb,
-                                       unsigned long action,
-                                       void *hcpu)
-{
-       unsigned int cpu = (unsigned long)hcpu;
-       struct ltt_trace *trace;
-       struct ltt_chan *chan;
-       struct ltt_chanbuf *buf;
-       int ret, i;
-
-       switch (action) {
-       case CPU_UP_PREPARE:
-       case CPU_UP_PREPARE_FROZEN:
-               /*
-                * CPU hotplug lock protects trace lock from this callback.
-                */
-               __list_for_each_entry_rcu(trace, &ltt_traces.head, list) {
-                       for (i = 0; i < trace->nr_channels; i++) {
-                               chan = &trace->channels[i];
-                               buf = per_cpu_ptr(chan->a.buf, cpu);
-                               ret = ltt_chanbuf_create(buf, &chan->a, cpu);
-                               if (ret) {
-                                       printk(KERN_ERR
-                                         "ltt_relay_hotcpu_callback: cpu %d "
-                                         "buffer creation failed\n", cpu);
-                                       return NOTIFY_BAD;
-                               }
-
-                       }
-               }
-               break;
-       case CPU_DEAD:
-       case CPU_DEAD_FROZEN:
-               /* No need to do a buffer switch here, because it will happen
-                * when tracing is stopped, or will be done by switch timer CPU
-                * DEAD callback. */
-               break;
-       }
-       return NOTIFY_OK;
-}
-
-/*
- * Must be called with either trace lock or rcu read lock sched held.
- */
-void ltt_chan_for_each_channel(void (*cb) (struct ltt_chanbuf *buf), int cpu)
-{
-       struct ltt_trace *trace;
-       struct ltt_chan *chan;
-       struct ltt_chanbuf *buf;
-       int i;
-
-       __list_for_each_entry_rcu(trace, &ltt_traces.head, list) {
-               for (i = 0; i < trace->nr_channels; i++) {
-                       chan = &trace->channels[i];
-                       if (!chan->active)
-                               continue;
-                       buf = per_cpu_ptr(chan->a.buf, cpu);
-                       cb(buf);
-               }
-       }
-}
-
-/**
- * ltt_chan_create - create a new relay channel
- * @chan: channel
- * @trace: trace
- * @base_filename: base name of files to create
- * @parent: dentry of parent directory, %NULL for root directory
- * @sb_size: size of sub-buffers (> PAGE_SIZE, power of 2)
- * @n_sb: number of sub-buffers (power of 2)
- * @extra_reader_sb: allocate an extra subbuffer for the reader
- * @overwrite: channel is in overwrite mode
- *
- * Returns channel pointer if successful, %NULL otherwise.
- *
- * Creates per-cpu channel buffers using the sizes and attributes
- * specified.  The created channel buffer files will be named
- * base_filename_0...base_filename_N-1.  File permissions will
- * be %S_IRUSR.
- */
-int ltt_chan_alloc_init(struct ltt_chan_alloc *chan, struct ltt_trace *trace,
-                       const char *base_filename,
-                       struct dentry *parent, size_t sb_size,
-                       size_t n_sb, int extra_reader_sb, int overwrite)
-{
-       unsigned int i;
-       int ret;
-
-       if (!base_filename)
-               return -EPERM;
-
-       if (!(sb_size && n_sb))
-               return -EPERM;
-
-       /* Check that the subbuffer size is larger than a page. */
-       WARN_ON_ONCE(sb_size < PAGE_SIZE);
-
-       /*
-        * Make sure the number of subbuffers and subbuffer size are power of 2.
-        */
-       WARN_ON_ONCE(hweight32(sb_size) != 1);
-       WARN_ON(hweight32(n_sb) != 1);
-
-       chan->trace = trace;
-       chan->buf_size = n_sb * sb_size;
-       chan->sb_size = sb_size;
-       chan->sb_size_order = get_count_order(sb_size);
-       chan->n_sb_order = get_count_order(n_sb);
-       chan->extra_reader_sb = extra_reader_sb;
-       chan->n_sb = n_sb;
-       chan->parent = parent;
-       strlcpy(chan->filename, base_filename, NAME_MAX);
-       kref_init(&chan->kref);
-       kref_get(&chan->trace->kref);
-
-       /* Allocating the child structure */
-       chan->buf = alloc_percpu(struct ltt_chanbuf);
-       if (!chan->buf)
-               goto free_chan;
-
-       for_each_online_cpu(i) {
-               ret = ltt_chanbuf_create(per_cpu_ptr(chan->buf, i), chan, i);
-               if (ret)
-                       goto free_bufs;
-       }
-
-       return 0;
-
-free_bufs:
-       for_each_possible_cpu(i) {
-               struct ltt_chanbuf *buf = per_cpu_ptr(chan->buf, i);
-
-               if (!buf->a.allocated)
-                       continue;
-               ltt_chanbuf_remove_file(buf);
-               ltt_chanbuf_free(buf);
-       }
-       free_percpu(chan->buf);
-free_chan:
-       kref_put(&chan->kref, ltt_chan_free);
-       return -ENOMEM;
-}
-
-/**
- * ltt_chan_alloc_remove_files - remove channel files.
- * @chan: the channel
- *
- * Remove all channel files and wait for dentry use counts to become zero.
- */
-void ltt_chan_alloc_remove_files(struct ltt_chan_alloc *chan)
-{
-       unsigned int i;
-       struct dentry *dentry;
-
-       for_each_possible_cpu(i) {
-               struct ltt_chanbuf *buf = per_cpu_ptr(chan->buf, i);
-
-               if (!buf->a.allocated)
-                       continue;
-               dentry = dget(buf->a.dentry);
-               ltt_chanbuf_remove_file(buf);
-               /* TODO: wait / wakeup instead */
-               /*
-                * Wait for every reference to the dentry to be gone,
-                * except us.
-                */
-               while (atomic_read(&dentry->d_count) != 1)
-                       msleep(100);
-               dput(dentry);
-       }
-}
-
-/**
- * ltt_chan_alloc_free - destroy the channel
- * @chan: the channel
- *
- * Destroy all channel buffers and frees the channel.
- */
-void ltt_chan_alloc_free(struct ltt_chan_alloc *chan)
-{
-       unsigned int i;
-
-       for_each_possible_cpu(i) {
-               struct ltt_chanbuf *buf = per_cpu_ptr(chan->buf, i);
-
-               if (!buf->a.allocated)
-                       continue;
-               ltt_chanbuf_free(buf);
-       }
-       free_percpu(chan->buf);
-       kref_put(&chan->trace->kref, ltt_release_trace);
-       wake_up_interruptible(&chan->trace->kref_wq);
-}
-
-/**
- * _ltt_relay_write - write data to a ltt_relay buffer.
- * @bufa : buffer
- * @offset : offset within the buffer
- * @src : source address
- * @len : length to write
- * @pagecpy : page size copied so far
- */
-void _ltt_relay_write(struct ltt_chanbuf_alloc *bufa, size_t offset,
-                     const void *src, size_t len, ssize_t pagecpy)
-{
-       struct ltt_chan_alloc *chana = bufa->chan;
-       size_t sbidx, index;
-       struct chanbuf_page *rpages;
-
-       do {
-               len -= pagecpy;
-               src += pagecpy;
-               offset += pagecpy;
-               sbidx = offset >> chana->sb_size_order;
-               index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-
-               /*
-                * Underlying layer should never ask for writes across
-                * subbuffers.
-                */
-               WARN_ON(offset >= chana->buf_size);
-
-               pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
-               rpages = bufa->buf_wsb[sbidx].pages;
-               WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-               ltt_relay_do_copy(rpages[index].virt + (offset & ~PAGE_MASK),
-                                 src, pagecpy);
-       } while (unlikely(len != pagecpy));
-}
-EXPORT_SYMBOL_GPL(_ltt_relay_write);
-
-/**
- * _ltt_relay_strncpy_fixup - Fix an incomplete string in a ltt_relay buffer.
- * @bufa : buffer
- * @offset : offset within the buffer
- * @len : length to write
- * @copied: string actually copied
- * @terminated: does string end with \0
- *
- * Fills string with "X" if incomplete.
- */
-void _ltt_relay_strncpy_fixup(struct ltt_chanbuf_alloc *bufa, size_t offset,
-                             size_t len, size_t copied, int terminated)
-{
-       struct ltt_chan_alloc *chana = bufa->chan;
-       size_t sbidx, index;
-       ssize_t pagecpy;
-       struct chanbuf_page *rpages;
-
-       if (copied == len) {
-               /*
-                * Deal with non-terminated string.
-                */
-               WARN_ON_ONCE(terminated);
-               offset += copied - 1;
-               sbidx = offset >> chana->sb_size_order;
-               index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-               /*
-                * Underlying layer should never ask for writes across
-                * subbuffers.
-                */
-               WARN_ON(offset >= chana->buf_size);
-               rpages = bufa->buf_wsb[sbidx].pages;
-               WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-               ltt_relay_do_memset(rpages[index].virt + (offset & ~PAGE_MASK),
-                                   '\0', 1);
-               return;
-       }
-
-       /*
-        * Deal with incomplete string.
-        * Overwrite string's \0 with X too.
-        */
-       pagecpy = copied - 1;
-       do {
-               WARN_ON_ONCE(!terminated);
-               len -= pagecpy;
-               offset += pagecpy;
-               sbidx = offset >> chana->sb_size_order;
-               index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-
-               /*
-                * Underlying layer should never ask for writes across
-                * subbuffers.
-                */
-               WARN_ON(offset >= chana->buf_size);
-
-               pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
-               rpages = bufa->buf_wsb[sbidx].pages;
-               WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-               ltt_relay_do_memset(rpages[index].virt + (offset & ~PAGE_MASK),
-                                   'X', pagecpy);
-       } while (unlikely(len != pagecpy));
-       /*
-        * Overwrite last 'X' with '\0'.
-        */
-       offset += pagecpy - 1;
-       sbidx = offset >> chana->sb_size_order;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       /*
-        * Underlying layer should never ask for writes across subbuffers.
-        */
-       WARN_ON(offset >= chana->buf_size);
-       rpages = bufa->buf_wsb[sbidx].pages;
-       WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-       ltt_relay_do_memset(rpages[index].virt + (offset & ~PAGE_MASK),
-                           '\0', 1);
-}
-EXPORT_SYMBOL_GPL(_ltt_relay_strncpy_fixup);
-
-/**
- * _ltt_relay_strncpy - copy a string to a ltt_relay buffer.
- * @bufa : buffer
- * @offset : offset within the buffer
- * @src : source address
- * @len : length to write
- * @pagecpy : page size copied so far
- */
-void _ltt_relay_strncpy(struct ltt_chanbuf_alloc *bufa, size_t offset,
-                       const void *src, size_t len, ssize_t pagecpy)
-{
-       struct ltt_chan_alloc *chana = bufa->chan;
-       size_t sbidx, index, copied;
-       struct chanbuf_page *rpages;
-       int terminated;
-
-       do {
-               len -= pagecpy;
-               src += pagecpy;
-               offset += pagecpy;
-               sbidx = offset >> chana->sb_size_order;
-               index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-
-               /*
-                * Underlying layer should never ask for writes across
-                * subbuffers.
-                */
-               WARN_ON(offset >= chana->buf_size);
-
-               pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
-               rpages = bufa->buf_wsb[sbidx].pages;
-               WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-               copied = ltt_relay_do_strncpy(rpages[index].virt
-                                             + (offset & ~PAGE_MASK),
-                                             src, pagecpy, &terminated);
-               if (copied < pagecpy || ((len == pagecpy) && !terminated)) {
-                       _ltt_relay_strncpy_fixup(bufa, offset, len, copied,
-                                                terminated);
-                       break;
-               }
-       } while (unlikely(len != pagecpy));
-}
-EXPORT_SYMBOL_GPL(_ltt_relay_strncpy);
-
-/**
- * ltt_relay_read - read data from ltt_relay_buffer.
- * @bufa : buffer
- * @offset : offset within the buffer
- * @dest : destination address
- * @len : length to write
- *
- * Should be protected by get_subbuf/put_subbuf.
- */
-int ltt_relay_read(struct ltt_chanbuf_alloc *bufa, size_t offset, void *dest,
-                  size_t len)
-{
-       struct ltt_chan_alloc *chana = bufa->chan;
-       size_t index;
-       ssize_t pagecpy, orig_len;
-       struct chanbuf_page *rpages;
-
-       orig_len = len;
-       offset &= chana->buf_size - 1;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       if (unlikely(!len))
-               return 0;
-       for (;;) {
-               pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
-               rpages = bufa->buf_rsb.pages;
-               WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-               memcpy(dest, rpages[index].virt + (offset & ~PAGE_MASK),
-                      pagecpy);
-               len -= pagecpy;
-               if (likely(!len))
-                       break;
-               dest += pagecpy;
-               offset += pagecpy;
-               index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-               /*
-                * Underlying layer should never ask for reads across
-                * subbuffers.
-                */
-               WARN_ON(offset >= chana->buf_size);
-       }
-       return orig_len;
-}
-EXPORT_SYMBOL_GPL(ltt_relay_read);
-
-/**
- * ltt_relay_read_cstr - read a C-style string from ltt_relay_buffer.
- * @bufa : buffer
- * @offset : offset within the buffer
- * @dest : destination address
- * @len : destination's length
- *
- * return string's length
- * Should be protected by get_subbuf/put_subbuf.
- */
-int ltt_relay_read_cstr(struct ltt_chanbuf_alloc *bufa, size_t offset,
-                       void *dest, size_t len)
-{
-       struct ltt_chan_alloc *chana = bufa->chan;
-       size_t index;
-       ssize_t pagecpy, pagelen, strpagelen, orig_offset;
-       char *str;
-       struct chanbuf_page *rpages;
-
-       offset &= chana->buf_size - 1;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       orig_offset = offset;
-       for (;;) {
-               rpages = bufa->buf_rsb.pages;
-               WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-               str = (char *)rpages[index].virt + (offset & ~PAGE_MASK);
-               pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
-               strpagelen = strnlen(str, pagelen);
-               if (len) {
-                       pagecpy = min_t(size_t, len, strpagelen);
-                       if (dest) {
-                               memcpy(dest, str, pagecpy);
-                               dest += pagecpy;
-                       }
-                       len -= pagecpy;
-               }
-               offset += strpagelen;
-               index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-               if (strpagelen < pagelen)
-                       break;
-               /*
-                * Underlying layer should never ask for reads across
-                * subbuffers.
-                */
-               WARN_ON(offset >= chana->buf_size);
-       }
-       if (dest && len)
-               ((char *)dest)[0] = 0;
-       return offset - orig_offset;
-}
-EXPORT_SYMBOL_GPL(ltt_relay_read_cstr);
-
-/**
- * ltt_relay_read_get_page - Get a whole page to read from
- * @bufa : buffer
- * @offset : offset within the buffer
- *
- * Should be protected by get_subbuf/put_subbuf.
- */
-struct page *ltt_relay_read_get_page(struct ltt_chanbuf_alloc *bufa,
-                                    size_t offset)
-{
-       size_t index;
-       struct chanbuf_page *rpages;
-       struct ltt_chan_alloc *chana = bufa->chan;
-
-       offset &= chana->buf_size - 1;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       rpages = bufa->buf_rsb.pages;
-       WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-       return rpages[index].page;
-}
-EXPORT_SYMBOL_GPL(ltt_relay_read_get_page);
-
-/**
- * ltt_relay_read_offset_address - get address of a location within the buffer
- * @bufa : buffer
- * @offset : offset within the buffer.
- *
- * Return the address where a given offset is located (for read).
- * Should be used to get the current subbuffer header pointer. Given we know
- * it's never on a page boundary, it's safe to write directly to this address,
- * as long as the write is never bigger than a page size.
- */
-void *ltt_relay_read_offset_address(struct ltt_chanbuf_alloc *bufa,
-                                   size_t offset)
-{
-       size_t index;
-       struct chanbuf_page *rpages;
-       struct ltt_chan_alloc *chana = bufa->chan;
-
-       offset &= chana->buf_size - 1;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       rpages = bufa->buf_rsb.pages;
-       WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-       return rpages[index].virt + (offset & ~PAGE_MASK);
-}
-EXPORT_SYMBOL_GPL(ltt_relay_read_offset_address);
-
-/**
- * ltt_relay_offset_address - get address of a location within the buffer
- * @bufa : buffer
- * @offset : offset within the buffer.
- *
- * Return the address where a given offset is located.
- * Should be used to get the current subbuffer header pointer. Given we know
- * it's never on a page boundary, it's safe to write directly to this address,
- * as long as the write is never bigger than a page size.
- */
-void *ltt_relay_offset_address(struct ltt_chanbuf_alloc *bufa, size_t offset)
-{
-       size_t sbidx, index;
-       struct chanbuf_page *rpages;
-       struct ltt_chan_alloc *chana = bufa->chan;
-
-       offset &= chana->buf_size - 1;
-       sbidx = offset >> chana->sb_size_order;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       rpages = bufa->buf_wsb[sbidx].pages;
-       WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-       return rpages[index].virt + (offset & ~PAGE_MASK);
-}
-EXPORT_SYMBOL_GPL(ltt_relay_offset_address);
-
-static __init int ltt_relay_alloc_init(void)
-{
-       hotcpu_notifier(ltt_relay_hotcpu_callback, 5);
-       ltt_relay_init();
-       ltt_ascii_init();
-       return 0;
-}
-
-static void __exit ltt_relay_alloc_exit(void)
-{
-       ltt_ascii_exit();
-       ltt_relay_exit();
-}
-
-module_init(ltt_relay_alloc_init);
-module_exit(ltt_relay_alloc_exit);
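
Every accessor in the file deleted above decomposed a buffer offset into (sub-buffer, page, byte) with the same shift-and-mask arithmetic. A standalone userspace illustration of that decomposition, with made-up geometry (4 sub-buffers of 64 kB, 4 kB pages):

#include <stdio.h>

#define PAGE_SHIFT      12
#define PAGE_SIZE       (1UL << PAGE_SHIFT)
#define PAGE_MASK       (~(PAGE_SIZE - 1))

int main(void)
{
        /* Illustrative channel geometry; both sizes are powers of 2. */
        unsigned long sb_size = 65536, sb_size_order = 16;
        unsigned long offset = 200000;  /* arbitrary buffer offset */

        unsigned long sbidx = offset >> sb_size_order;
        unsigned long page = (offset & (sb_size - 1)) >> PAGE_SHIFT;
        unsigned long byte = offset & ~PAGE_MASK;

        /*
         * Prints "sb 3 page 0 byte 3392": offset 200000 lands 3392 bytes
         * into the first page of the fourth sub-buffer.
         */
        printf("sb %lu page %lu byte %lu\n", sbidx, page, byte);
        return 0;
}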
diff --git a/ltt-relay-lockless.c b/ltt-relay-lockless.c
deleted file mode 100644
index db4362d..0000000
+++ /dev/null
@@ -1,1366 +0,0 @@
-/*
- * ltt/ltt-relay-lockless.c
- *
- * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * LTTng lockless buffer space management (reader/writer).
- *
- * Author:
- *     Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * Inspired from LTT :
- *  Karim Yaghmour (karim@opersys.com)
- *  Tom Zanussi (zanussi@us.ibm.com)
- *  Bob Wisniewski (bob@watson.ibm.com)
- * And from K42 :
- *  Bob Wisniewski (bob@watson.ibm.com)
- *
- * Changelog:
- *  08/10/08, Cleanup.
- *  19/10/05, Complete lockless mechanism.
- *  27/05/05, Modular redesign and rewrite.
- *
- * Userspace reader semantic :
- * while (poll fd != POLLHUP) {
- *   - ioctl RELAY_GET_SUBBUF_SIZE
- *   while (1) {
- *     - ioctl GET_SUBBUF
- *     - splice 1 subbuffer worth of data to a pipe
- *     - splice the data from pipe to disk/network
- *     - ioctl PUT_SUBBUF, check error value
- *       if err val < 0, previous subbuffer was corrupted.
- *   }
- * }
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include <linux/time.h>
-#include <linux/module.h>
-#include <linux/string.h>
-#include <linux/slab.h>
-#include <linux/init.h>
-#include <linux/rcupdate.h>
-#include <linux/timer.h>
-#include <linux/sched.h>
-#include <linux/bitops.h>
-#include <linux/smp_lock.h>
-#include <linux/stat.h>
-#include <linux/cpu.h>
-#include <linux/idle.h>
-#include <linux/delay.h>
-#include <linux/notifier.h>
-#include <asm/atomic.h>
-#include <asm/local.h>
-
-#include "ltt-tracer.h"
-#include "ltt-relay.h"
-#include "ltt-relay-lockless.h"
-
-#if 0
-#define printk_dbg(fmt, args...) printk(fmt, args)
-#else
-#define printk_dbg(fmt, args...)
-#endif
-
-struct ltt_reserve_switch_offsets {
-       long begin, end, old;
-       long begin_switch, end_switch_current, end_switch_old;
-       size_t before_hdr_pad, size;
-};
-
-static
-void ltt_force_switch(struct ltt_chanbuf *buf, enum force_switch_mode mode);
-
-static
-void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu);
-
-static const struct file_operations ltt_file_operations;
-
-static
-void ltt_buffer_begin(struct ltt_chanbuf *buf, u64 tsc, unsigned int subbuf_idx)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-       struct ltt_subbuffer_header *header =
-               (struct ltt_subbuffer_header *)
-                       ltt_relay_offset_address(&buf->a,
-                               subbuf_idx * chan->a.sb_size);
-
-       header->cycle_count_begin = tsc;
-       header->data_size = 0xFFFFFFFF; /* for debugging */
-       ltt_write_trace_header(chan->a.trace, header);
-}
-
-/*
- * offset is assumed to never be 0 here : never deliver a completely empty
- * subbuffer. The lost size is between 0 and subbuf_size-1.
- */
-static
-void ltt_buffer_end(struct ltt_chanbuf *buf, u64 tsc, unsigned int offset,
-                   unsigned int subbuf_idx)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-       struct ltt_subbuffer_header *header =
-               (struct ltt_subbuffer_header *)
-                       ltt_relay_offset_address(&buf->a,
-                               subbuf_idx * chan->a.sb_size);
-       u32 data_size = SUBBUF_OFFSET(offset - 1, chan) + 1;
-
-       header->data_size = data_size;
-       header->sb_size = PAGE_ALIGN(data_size);
-       header->cycle_count_end = tsc;
-       header->events_lost = local_read(&buf->events_lost);
-       header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers);
-}
-
-/*
- * Must be called under trace lock or cpu hotplug protection.
- */
-void ltt_chanbuf_free(struct ltt_chanbuf *buf)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
-       ltt_relay_print_buffer_errors(chan, buf->a.cpu);
-#ifdef LTT_VMCORE
-       kfree(buf->commit_seq);
-#endif
-       kfree(buf->commit_count);
-
-       ltt_chanbuf_alloc_free(&buf->a);
-}
-
-/*
- * Must be called under trace lock or cpu hotplug protection.
- */
-int ltt_chanbuf_create(struct ltt_chanbuf *buf, struct ltt_chan_alloc *chana,
-                      int cpu)
-{
-       struct ltt_chan *chan = container_of(chana, struct ltt_chan, a);
-       struct ltt_trace *trace = chana->trace;
-       unsigned int j, n_sb;
-       int ret;
-
-       /* Test for cpu hotplug */
-       if (buf->a.allocated)
-               return 0;
-
-       ret = ltt_chanbuf_alloc_create(&buf->a, &chan->a, cpu);
-       if (ret)
-               return ret;
-
-       buf->commit_count =
-               kzalloc_node(ALIGN(sizeof(*buf->commit_count) * chan->a.n_sb,
-                                  1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(cpu));
-       if (!buf->commit_count) {
-               ret = -ENOMEM;
-               goto free_chanbuf;
-       }
-
-#ifdef LTT_VMCORE
-       buf->commit_seq =
-               kzalloc_node(ALIGN(sizeof(*buf->commit_seq) * chan->a.n_sb,
-                                  1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(cpu));
-       if (!buf->commit_seq) {
-               kfree(buf->commit_count);
-               ret = -ENOMEM;
-               goto free_commit;
-       }
-#endif
-
-       local_set(&buf->offset, ltt_sb_header_size());
-       atomic_long_set(&buf->consumed, 0);
-       atomic_long_set(&buf->active_readers, 0);
-       n_sb = chan->a.n_sb;
-       for (j = 0; j < n_sb; j++) {
-               local_set(&buf->commit_count[j].cc, 0);
-               local_set(&buf->commit_count[j].cc_sb, 0);
-               local_set(&buf->commit_count[j].events, 0);
-       }
-       init_waitqueue_head(&buf->write_wait);
-       init_waitqueue_head(&buf->read_wait);
-       spin_lock_init(&buf->full_lock);
-
-       RCHAN_SB_CLEAR_NOREF(buf->a.buf_wsb[0].pages);
-       ltt_buffer_begin(buf, trace->start_tsc, 0);
-       /* atomic_add made on local variable on data that belongs to
-        * various CPUs : ok because tracing not started (for this cpu). */
-       local_add(ltt_sb_header_size(), &buf->commit_count[0].cc);
-
-       local_set(&buf->events_lost, 0);
-       local_set(&buf->corrupted_subbuffers, 0);
-       buf->finalized = 0;
-
-       ret = ltt_chanbuf_create_file(chan->a.filename, chan->a.parent,
-                                     S_IRUSR, buf);
-       if (ret)
-               goto free_init;
-
-       /*
-        * Ensure the buffer is ready before setting it to allocated.
-        * Used for cpu hotplug vs async wakeup.
-        */
-       smp_wmb();
-       buf->a.allocated = 1;
-
-       return 0;
-
-       /* Error handling */
-free_init:
-#ifdef LTT_VMCORE
-       kfree(buf->commit_seq);
-free_commit:
-#endif
-       kfree(buf->commit_count);
-free_chanbuf:
-       ltt_chanbuf_alloc_free(&buf->a);
-       return ret;
-}
-
-void ltt_chan_remove_files(struct ltt_chan *chan)
-{
-       ltt_ascii_remove(chan);
-       ltt_chan_alloc_remove_files(&chan->a);
-}
-EXPORT_SYMBOL_GPL(ltt_chan_remove_files);
-
-
-void ltt_chan_free(struct kref *kref)
-{
-       struct ltt_chan *chan = container_of(kref, struct ltt_chan, a.kref);
-
-       ltt_chan_alloc_free(&chan->a);
-}
-EXPORT_SYMBOL_GPL(ltt_chan_free);
-
-/**
- * ltt_chan_create - Create channel.
- */
-int ltt_chan_create(const char *base_filename,
-                   struct ltt_chan *chan, struct dentry *parent,
-                   size_t sb_size, size_t n_sb,
-                   int overwrite, struct ltt_trace *trace)
-{
-       int ret;
-
-       chan->overwrite = overwrite;
-
-       ret = ltt_chan_alloc_init(&chan->a, trace, base_filename, parent,
-                                 sb_size, n_sb, overwrite, overwrite);
-       if (ret)
-               goto error;
-
-       chan->commit_count_mask = (~0UL >> chan->a.n_sb_order);
-
-       ret = ltt_ascii_create(chan);
-       if (ret)
-               goto error_chan_alloc_free;
-
-       return ret;
-
-error_chan_alloc_free:
-       ltt_chan_alloc_free(&chan->a);
-error:
-       return ret;
-}
-EXPORT_SYMBOL_GPL(ltt_chan_create);
-
-int ltt_chanbuf_open_read(struct ltt_chanbuf *buf)
-{
-       kref_get(&buf->a.chan->kref);
-       if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) {
-               kref_put(&buf->a.chan->kref, ltt_chan_free);
-               return -EBUSY;
-       }
-
-       return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_open_read);
-
-void ltt_chanbuf_release_read(struct ltt_chanbuf *buf)
-{
-       //ltt_relay_destroy_buffer(&buf->a.chan->a, buf->a.cpu);
-       WARN_ON(atomic_long_read(&buf->active_readers) != 1);
-       atomic_long_dec(&buf->active_readers);
-       kref_put(&buf->a.chan->kref, ltt_chan_free);
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_release_read);
-
-/*
- * Wake writers :
- *
- * This must be done after the trace is removed from the RCU list so that there
- * are no stalled writers.
- */
-static void ltt_relay_wake_writers(struct ltt_chanbuf *buf)
-{
-
-       if (waitqueue_active(&buf->write_wait))
-               wake_up_interruptible(&buf->write_wait);
-}
-
-/*
- * This function should not be called from NMI interrupt context
- */
-static void ltt_buf_unfull(struct ltt_chanbuf *buf)
-{
-       ltt_relay_wake_writers(buf);
-}
-
-/*
- * Promote compiler barrier to a smp_mb().
- * For the specific LTTng case, this IPI call should be removed if the
- * architecture does not reorder writes.  This should eventually be provided by
- * a separate architecture-specific infrastructure.
- */
-static void remote_mb(void *info)
-{
-       smp_mb();
-}
-
-int ltt_chanbuf_get_subbuf(struct ltt_chanbuf *buf, unsigned long *consumed)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-       long consumed_old, consumed_idx, commit_count, write_offset;
-       int ret;
-
-       consumed_old = atomic_long_read(&buf->consumed);
-       consumed_idx = SUBBUF_INDEX(consumed_old, chan);
-       commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
-       /*
-        * Make sure we read the commit count before reading the buffer
-        * data and the write offset. Correct consumed offset ordering
-        * wrt commit count is ensured by the use of cmpxchg to update
-        * the consumed offset.
-        * smp_call_function_single can fail if the remote CPU is offline,
-        * this is OK because then there is no wmb to execute there.
-        * If our thread is executing on the same CPU as the one the buffer
-        * belongs to, we don't have to synchronize it at all. If we are
-        * migrated, the scheduler will take care of the memory barriers.
-        * Normally, smp_call_function_single() should ensure program order when
-        * executing the remote function, which implies that it surrounds the
-        * function execution with :
-        * smp_mb()
-        * send IPI
-        * csd_lock_wait
-        *                recv IPI
-        *                smp_mb()
-        *                exec. function
-        *                smp_mb()
-        *                csd unlock
-        * smp_mb()
-        *
-        * However, smp_call_function_single() does not seem to clearly execute
-        * such barriers. It depends on spinlock semantics to provide the barrier
-        * before executing the IPI and, when busy-looping, csd_lock_wait only
-        * executes smp_mb() when it has to wait for the other CPU.
-        *
-        * I don't trust this code. Therefore, let's add the smp_mb() sequence
-        * required ourselves, even if duplicated. It has no performance impact
-        * anyway.
-        *
-        * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
-        * read and write vs write. They do not ensure core synchronization. We
-        * really have to ensure total order between the 3 barriers running on
-        * the 2 CPUs.
-        */
-#ifdef LTT_NO_IPI_BARRIER
-       /*
-        * Local rmb to match the remote wmb to read the commit count before the
-        * buffer data and the write offset.
-        */
-       smp_rmb();
-#else
-       if (raw_smp_processor_id() != buf->a.cpu) {
-               smp_mb();       /* Total order with IPI handler smp_mb() */
-               smp_call_function_single(buf->a.cpu, remote_mb, NULL, 1);
-               smp_mb();       /* Total order with IPI handler smp_mb() */
-       }
-#endif
-       write_offset = local_read(&buf->offset);
-       /*
-        * Check that the subbuffer we are trying to consume has been
-        * already fully committed.
-        */
-       if (((commit_count - chan->a.sb_size)
-            & chan->commit_count_mask)
-           - (BUFFER_TRUNC(consumed_old, chan)
-              >> chan->a.n_sb_order)
-           != 0) {
-               return -EAGAIN;
-       }
-       /*
-        * Check that we are not about to read the same subbuffer in
-        * which the writer head is.
-        */
-       if ((SUBBUF_TRUNC(write_offset, chan)
-          - SUBBUF_TRUNC(consumed_old, chan))
-          == 0) {
-               return -EAGAIN;
-       }
-
-       ret = update_read_sb_index(&buf->a, &chan->a, consumed_idx);
-       if (ret)
-               return ret;
-
-       *consumed = consumed_old;
-       return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_get_subbuf);
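
To make the fullness test concrete, a worked example under assumed sizes
(sb_size = 4096, n_sb = 4, hence n_sb_order = 2 and buf_size = 16384; these
are illustrative, not tracer defaults):

	/*
	 * Consuming sub-buffer 0 on the first pass gives
	 * BUFFER_TRUNC(0, chan) >> 2 == 0, so the test passes once
	 * cc_sb == 4096, i.e. exactly sb_size bytes committed.  One full
	 * buffer wrap later (consumed == 16384),
	 * BUFFER_TRUNC(16384, chan) >> 2 == 4096, so cc_sb must have
	 * reached 2 * sb_size: each wrap adds sb_size to the expected
	 * per-sub-buffer commit count.
	 */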
-
-int ltt_chanbuf_put_subbuf(struct ltt_chanbuf *buf, unsigned long consumed)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-       long consumed_new, consumed_old;
-
-       WARN_ON(atomic_long_read(&buf->active_readers) != 1);
-
-       consumed_old = consumed;
-       consumed_new = SUBBUF_ALIGN(consumed_old, chan);
-       WARN_ON_ONCE(RCHAN_SB_IS_NOREF(buf->a.buf_rsb.pages));
-       RCHAN_SB_SET_NOREF(buf->a.buf_rsb.pages);
-
-       spin_lock(&buf->full_lock);
-       if (atomic_long_cmpxchg(&buf->consumed, consumed_old, consumed_new)
-           != consumed_old) {
-               /* We have been pushed by the writer. */
-               spin_unlock(&buf->full_lock);
-               /*
-                * We exchanged the subbuffer pages. No corruption possible
-                * even if the writer did push us. No more -EIO possible.
-                */
-               return 0;
-       } else {
-               /* tell the client that the buffer is now unfull */
-               ltt_buf_unfull(buf);
-               spin_unlock(&buf->full_lock);
-       }
-       return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_put_subbuf);
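
Driven together, the get/put pair forms the consumer loop; a sketch (the
actual data extraction, e.g. splice, is elided):

	static int consume_pending_example(struct ltt_chanbuf *buf)
	{
		unsigned long consumed;
		int ret;

		for (;;) {
			ret = ltt_chanbuf_get_subbuf(buf, &consumed);
			if (ret == -EAGAIN)
				return 0;	/* nothing fully committed yet */
			if (ret)
				return ret;
			/* ... read one sub-buffer worth of data ... */
			ret = ltt_chanbuf_put_subbuf(buf, consumed);
			if (ret)
				return ret;
		}
	}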
-
-static void switch_buffer(unsigned long data)
-{
-       struct ltt_chanbuf *buf = (struct ltt_chanbuf *)data;
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
-       /*
-        * Only flush buffers periodically if readers are active.
-        */
-       if (atomic_long_read(&buf->active_readers))
-               ltt_force_switch(buf, FORCE_ACTIVE);
-
-       mod_timer_pinned(&buf->switch_timer,
-                        jiffies + chan->switch_timer_interval);
-}
-
-static void ltt_chanbuf_start_switch_timer(struct ltt_chanbuf *buf)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
-       if (!chan->switch_timer_interval)
-               return;
-
-       init_timer_deferrable(&buf->switch_timer);
-       buf->switch_timer.function = switch_buffer;
-       buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
-       buf->switch_timer.data = (unsigned long)buf;
-       add_timer_on(&buf->switch_timer, buf->a.cpu);
-}
-
-/*
- * called with ltt traces lock held.
- */
-void ltt_chan_start_switch_timer(struct ltt_chan *chan)
-{
-       int cpu;
-
-       if (!chan->switch_timer_interval)
-               return;
-
-       for_each_online_cpu(cpu) {
-               struct ltt_chanbuf *buf;
-
-               buf = per_cpu_ptr(chan->a.buf, cpu);
-               ltt_chanbuf_start_switch_timer(buf);
-       }
-}
-
-static void ltt_chanbuf_stop_switch_timer(struct ltt_chanbuf *buf)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
-       if (!chan->switch_timer_interval)
-               return;
-
-       del_timer_sync(&buf->switch_timer);
-}
-
-/*
- * called with ltt traces lock held.
- */
-void ltt_chan_stop_switch_timer(struct ltt_chan *chan)
-{
-       int cpu;
-
-       if (!chan->switch_timer_interval)
-               return;
-
-       for_each_online_cpu(cpu) {
-               struct ltt_chanbuf *buf;
-
-               buf = per_cpu_ptr(chan->a.buf, cpu);
-               ltt_chanbuf_stop_switch_timer(buf);
-       }
-}
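
The start/stop pair above is meant to bracket a tracing session; a sketch,
assuming the caller holds the ltt traces lock as the comments require:

	static void switch_timer_session_example(struct ltt_chan *chan)
	{
		ltt_chan_start_switch_timer(chan);  /* one pinned timer per CPU */
		/* ... tracing runs; idle buffers are flushed periodically ... */
		ltt_chan_stop_switch_timer(chan);   /* del_timer_sync() per CPU */
	}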
-
-static void ltt_chanbuf_idle_switch(struct ltt_chanbuf *buf)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
-       if (chan->switch_timer_interval)
-               ltt_force_switch(buf, FORCE_ACTIVE);
-}
-
-/*
- * ltt_chanbuf_switch is called from a remote CPU to ensure that the buffers of
- * a CPU that went down are flushed. Note that if we execute concurrently
- * with trace allocation, a buffer might appear to be unallocated (because it
- * detects that the target CPU is offline).
- */
-static void ltt_chanbuf_switch(struct ltt_chanbuf *buf)
-{
-       if (buf->a.allocated)
-               ltt_force_switch(buf, FORCE_ACTIVE);
-}
-
-/**
- *     ltt_chanbuf_hotcpu_callback - CPU hotplug callback
- *     @nb: notifier block
- *     @action: hotplug action to take
- *     @hcpu: CPU number
- *
- *     Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
- */
-static
-int ltt_chanbuf_hotcpu_callback(struct notifier_block *nb,
-                                         unsigned long action,
-                                         void *hcpu)
-{
-       unsigned int cpu = (unsigned long)hcpu;
-
-       switch (action) {
-       case CPU_DOWN_FAILED:
-       case CPU_DOWN_FAILED_FROZEN:
-       case CPU_ONLINE:
-       case CPU_ONLINE_FROZEN:
-               /*
-                * CPU hotplug lock protects trace lock from this callback.
-                */
-               ltt_chan_for_each_channel(ltt_chanbuf_start_switch_timer, cpu);
-               return NOTIFY_OK;
-
-       case CPU_DOWN_PREPARE:
-       case CPU_DOWN_PREPARE_FROZEN:
-               /*
-                * Performs an IPI to delete the timer locally on the target
-                * CPU. CPU hotplug lock protects trace lock from this
-                * callback.
-                */
-               ltt_chan_for_each_channel(ltt_chanbuf_stop_switch_timer, cpu);
-               return NOTIFY_OK;
-
-       case CPU_DEAD:
-       case CPU_DEAD_FROZEN:
-               /*
-                * Performing a buffer switch on a remote CPU. Performed by
-                * the CPU responsible for doing the hotunplug after the target
-                * CPU stopped running completely. Ensures that all data
-                * from that remote CPU is flushed. CPU hotplug lock protects
-                * trace lock from this callback.
-                */
-               ltt_chan_for_each_channel(ltt_chanbuf_switch, cpu);
-               return NOTIFY_OK;
-
-       default:
-               return NOTIFY_DONE;
-       }
-}
-
-static int pm_idle_entry_callback(struct notifier_block *self,
-                                 unsigned long val, void *data)
-{
-       if (val == IDLE_START) {
-               rcu_read_lock_sched_notrace();
-               ltt_chan_for_each_channel(ltt_chanbuf_idle_switch,
-                                         smp_processor_id());
-               rcu_read_unlock_sched_notrace();
-       }
-       return 0;
-}
-
-struct notifier_block pm_idle_entry_notifier = {
-       .notifier_call = pm_idle_entry_callback,
-       .priority = ~0U,        /* smallest prio, run after tracing events */
-};
-
-static
-void ltt_relay_print_written(struct ltt_chan *chan, long cons_off,
-                            unsigned int cpu)
-{
-       struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
-       long cons_idx, events_count;
-
-       cons_idx = SUBBUF_INDEX(cons_off, chan);
-       events_count = local_read(&buf->commit_count[cons_idx].events);
-
-       if (events_count)
-               printk(KERN_INFO
-                       "LTT: %lu events written in channel %s "
-                       "(cpu %u, index %lu)\n",
-                       events_count, chan->a.filename, cpu, cons_idx);
-}
-
-static
-void ltt_relay_print_subbuffer_errors(struct ltt_chanbuf *buf,
-                                     struct ltt_chan *chan, long cons_off,
-                                     unsigned int cpu)
-{
-       long cons_idx, commit_count, commit_count_sb, write_offset;
-
-       cons_idx = SUBBUF_INDEX(cons_off, chan);
-       commit_count = local_read(&buf->commit_count[cons_idx].cc);
-       commit_count_sb = local_read(&buf->commit_count[cons_idx].cc_sb);
-       /*
-        * No need to order commit_count and write_offset reads because we
-        * execute after trace is stopped when there are no readers left.
-        */
-       write_offset = local_read(&buf->offset);
-       printk(KERN_WARNING
-              "LTT : unread channel %s offset is %ld "
-              "and cons_off : %ld (cpu %u)\n",
-              chan->a.filename, write_offset, cons_off, cpu);
-       /* Check each sub-buffer for unfilled commit count */
-       if (((commit_count - chan->a.sb_size) & chan->commit_count_mask)
-           - (BUFFER_TRUNC(cons_off, chan) >> chan->a.n_sb_order)
-           != 0)
-               printk(KERN_ALERT
-                      "LTT : %s : subbuffer %lu has unfilled "
-                      "commit count [cc, cc_sb] [%lu,%lu].\n",
-                      chan->a.filename, cons_idx, commit_count,
-                      commit_count_sb);
-       printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %lu\n",
-              chan->a.filename, commit_count, chan->a.sb_size);
-}
-
-static
-void ltt_relay_print_errors(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                           struct ltt_trace *trace, int cpu)
-{
-       long cons_off;
-
-       /*
-        * Can be called in the error path of allocation when
-        * trans_channel_data is not yet set.
-        */
-       if (!chan)
-               return;
-       for (cons_off = 0; cons_off < chan->a.buf_size;
-            cons_off = SUBBUF_ALIGN(cons_off, chan))
-               ltt_relay_print_written(chan, cons_off, cpu);
-       for (cons_off = atomic_long_read(&buf->consumed);
-                       (SUBBUF_TRUNC(local_read(&buf->offset), chan)
-                        - cons_off) > 0;
-                       cons_off = SUBBUF_ALIGN(cons_off, chan))
-               ltt_relay_print_subbuffer_errors(buf, chan, cons_off, cpu);
-}
-
-static
-void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu)
-{
-       struct ltt_trace *trace = chan->a.trace;
-       struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
-
-       if (local_read(&buf->events_lost))
-               printk(KERN_ALERT
-                      "LTT : %s : %ld events lost "
-                      "in %s channel (cpu %u).\n",
-                      chan->a.filename, local_read(&buf->events_lost),
-                      chan->a.filename, cpu);
-       if (local_read(&buf->corrupted_subbuffers))
-               printk(KERN_ALERT
-                      "LTT : %s : %ld corrupted subbuffers "
-                      "in %s channel (cpu %u).\n",
-                      chan->a.filename,
-                      local_read(&buf->corrupted_subbuffers),
-                      chan->a.filename, cpu);
-
-       ltt_relay_print_errors(buf, chan, trace, cpu);
-}
-
-static void ltt_relay_remove_dirs(struct ltt_trace *trace)
-{
-       ltt_ascii_remove_dir(trace);
-       debugfs_remove(trace->dentry.trace_root);
-}
-
-static int ltt_relay_create_dirs(struct ltt_trace *new_trace)
-{
-       struct dentry *ltt_root_dentry;
-       int ret;
-
-       ltt_root_dentry = get_ltt_root();
-       if (!ltt_root_dentry)
-               return -ENOENT;
-
-       new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
-                                                         ltt_root_dentry);
-       put_ltt_root();
-       if (new_trace->dentry.trace_root == NULL) {
-               printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
-                      new_trace->trace_name);
-               return -EEXIST;
-       }
-       ret = ltt_ascii_create_dir(new_trace);
-       if (ret)
-               printk(KERN_WARNING "LTT : Unable to create ascii output file "
-                                   "for trace %s\n", new_trace->trace_name);
-
-       return 0;
-}
-
-/*
- * LTTng channel flush function.
- *
- * Must be called when no tracing is active in the channel, because of
- * accesses across CPUs.
- */
-static notrace void ltt_relay_buffer_flush(struct ltt_chanbuf *buf)
-{
-       buf->finalized = 1;
-       ltt_force_switch(buf, FORCE_FLUSH);
-}
-
-static void ltt_relay_async_wakeup_chan(struct ltt_chan *chan)
-{
-       unsigned int i;
-
-       for_each_possible_cpu(i) {
-               struct ltt_chanbuf *buf;
-
-               buf = per_cpu_ptr(chan->a.buf, i);
-               if (!buf->a.allocated)
-                       continue;
-               /*
-                * Ensure the buffer has been allocated before reading its
-                * content. Sync cpu hotplug vs async wakeup.
-                */
-               smp_rmb();
-               if (ltt_poll_deliver(buf, chan))
-                       wake_up_interruptible(&buf->read_wait);
-       }
-}
-
-static void ltt_relay_finish_buffer(struct ltt_chan *chan, unsigned int cpu)
-{
-       struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
-
-       if (buf->a.allocated) {
-               ltt_relay_buffer_flush(buf);
-               ltt_relay_wake_writers(buf);
-       }
-}
-
-static void ltt_relay_finish_channel(struct ltt_chan *chan)
-{
-       unsigned int i;
-
-       for_each_possible_cpu(i)
-               ltt_relay_finish_buffer(chan, i);
-}
-
-/*
- * This is called with preemption disabled when user space has requested
- * blocking mode.  If one of the active traces has free space below a
- * specific threshold value, we reenable preemption and block.
- */
-static
-int ltt_relay_user_blocking(struct ltt_trace *trace, unsigned int chan_index,
-                           size_t data_size, struct user_dbg_data *dbg)
-{
-       struct ltt_chanbuf *buf;
-       struct ltt_chan *chan;
-       int cpu;
-       DECLARE_WAITQUEUE(wait, current);
-
-       chan = &trace->channels[chan_index];
-       cpu = smp_processor_id();
-       buf = per_cpu_ptr(chan->a.buf, cpu);
-
-       /*
-        * Check if data is too big for the channel : do not
-        * block for it.
-        */
-       if (LTT_RESERVE_CRITICAL + data_size > chan->a.sb_size)
-               return 0;
-
-       /*
-        * If free space is too low, we block. We restart from the
-        * beginning after we resume (the cpu id may have changed
-        * while preemption was enabled).
-        */
-       spin_lock(&buf->full_lock);
-       if (!chan->overwrite) {
-               dbg->write = local_read(&buf->offset);
-               dbg->read = atomic_long_read(&buf->consumed);
-               dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
-                                 - SUBBUF_TRUNC(dbg->read, chan);
-               if (dbg->avail_size > chan->a.buf_size) {
-                       __set_current_state(TASK_INTERRUPTIBLE);
-                       add_wait_queue(&buf->write_wait, &wait);
-                       spin_unlock(&buf->full_lock);
-                       preempt_enable();
-                       schedule();
-                       __set_current_state(TASK_RUNNING);
-                       remove_wait_queue(&buf->write_wait, &wait);
-                       if (signal_pending(current))
-                               return -ERESTARTSYS;
-                       preempt_disable();
-                       return 1;
-               }
-       }
-       spin_unlock(&buf->full_lock);
-       return 0;
-}
-
-static
-void ltt_relay_print_user_errors(struct ltt_trace *trace,
-                                unsigned int chan_index, size_t data_size,
-                                struct user_dbg_data *dbg, int cpu)
-{
-       struct ltt_chanbuf *buf;
-       struct ltt_chan *chan;
-
-       chan = &trace->channels[chan_index];
-       buf = per_cpu_ptr(chan->a.buf, cpu);
-
-       printk(KERN_ERR "Error in LTT usertrace : "
-              "buffer full : event lost in blocking "
-              "mode. Increase LTT_RESERVE_CRITICAL.\n");
-       printk(KERN_ERR "LTT nesting level is %u.\n",
-              per_cpu(ltt_nesting, cpu));
-       printk(KERN_ERR "LTT available size %lu.\n",
-              dbg->avail_size);
-       printk(KERN_ERR "available write : %lu, read : %lu\n",
-              dbg->write, dbg->read);
-
-       dbg->write = local_read(&buf->offset);
-       dbg->read = atomic_long_read(&buf->consumed);
-
-       printk(KERN_ERR "LTT current size %lu.\n",
-               dbg->write + LTT_RESERVE_CRITICAL + data_size
-               - SUBBUF_TRUNC(dbg->read, chan));
-       printk(KERN_ERR "current write : %lu, read : %lu\n",
-                       dbg->write, dbg->read);
-}
-
-/*
- * ltt_reserve_switch_old_subbuf: switch old subbuffer
- *
- * Concurrency safe because we are the last and only thread to alter this
- * sub-buffer. As long as it is not delivered and read, no other thread can
- * alter the offset, alter the reserve_count or call the
- * client_buffer_end_callback on this sub-buffer.
- *
- * The only remaining threads could be the ones with pending commits. They will
- * have to do the delivery themselves.  Not concurrency safe in overwrite mode.
- * We detect corrupted subbuffers with commit and reserve counts. We keep a
- * corrupted sub-buffers count and push the readers across these sub-buffers.
- *
- * Not concurrency safe if a writer is stalled in a subbuffer and another writer
- * switches in, finding out it's corrupted.  The result will be that the old
- * (uncommitted) subbuffer will be declared corrupted, and that the new
- * subbuffer will be declared corrupted too because of the commit count
- * adjustment.
- *
- * Note : offset_old should never be 0 here.
- */
-static
-void ltt_reserve_switch_old_subbuf(struct ltt_chanbuf *buf,
-                                  struct ltt_chan *chan,
-                                  struct ltt_reserve_switch_offsets *offsets,
-                                  u64 *tsc)
-{
-       long oldidx = SUBBUF_INDEX(offsets->old - 1, chan);
-       long commit_count, padding_size;
-
-       padding_size = chan->a.sb_size
-                       - (SUBBUF_OFFSET(offsets->old - 1, chan) + 1);
-       ltt_buffer_end(buf, *tsc, offsets->old, oldidx);
-
-       /*
-        * Must write slot data before incrementing commit count.
-        * This compiler barrier is upgraded into a smp_wmb() by the IPI
-        * sent by get_subbuf() when it does its smp_rmb().
-        */
-       barrier();
-       local_add(padding_size, &buf->commit_count[oldidx].cc);
-       commit_count = local_read(&buf->commit_count[oldidx].cc);
-       ltt_check_deliver(buf, chan, offsets->old - 1, commit_count, oldidx);
-       ltt_write_commit_counter(buf, chan, oldidx, offsets->old, commit_count,
-                                padding_size);
-}
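
The padding arithmetic above, worked under assumed sizes:

	/*
	 * Example (illustrative sizes): with sb_size = 4096 and
	 * offsets->old == 1000 inside sub-buffer 0,
	 * SUBBUF_OFFSET(999, chan) == 999, so
	 * padding_size == 4096 - (999 + 1) == 3096.  Adding it to cc
	 * brings the per-sub-buffer commit total to exactly sb_size,
	 * which is the condition ltt_check_deliver() tests for.
	 */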
-
-/*
- * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
- *
- * This code can be executed unordered: writers may already have written to the
- * sub-buffer before this code gets executed, so caution is required.  The
- * commit makes sure that this code is executed before the delivery of this
- * sub-buffer.
- */
-static
-void ltt_reserve_switch_new_subbuf(struct ltt_chanbuf *buf,
-                                  struct ltt_chan *chan,
-                                  struct ltt_reserve_switch_offsets *offsets,
-                                  u64 *tsc)
-{
-       long beginidx = SUBBUF_INDEX(offsets->begin, chan);
-       long commit_count;
-
-       ltt_buffer_begin(buf, *tsc, beginidx);
-
-       /*
-        * Must write slot data before incrementing commit count.
-        * This compiler barrier is upgraded into a smp_wmb() by the IPI
-        * sent by get_subbuf() when it does its smp_rmb().
-        */
-       barrier();
-       local_add(ltt_sb_header_size(), &buf->commit_count[beginidx].cc);
-       commit_count = local_read(&buf->commit_count[beginidx].cc);
-       /* Check if the written buffer has to be delivered */
-       ltt_check_deliver(buf, chan, offsets->begin, commit_count, beginidx);
-       ltt_write_commit_counter(buf, chan, beginidx, offsets->begin,
-                                commit_count, ltt_sb_header_size());
-}
-
-/*
- * ltt_reserve_end_switch_current: finish switching current subbuffer
- *
- * Concurrency safe because we are the last and only thread to alter this
- * sub-buffer. As long as it is not delivered and read, no other thread can
- * alter the offset, alter the reserve_count or call the
- * client_buffer_end_callback on this sub-buffer.
- *
- * The only remaining threads could be the ones with pending commits. They will
- * have to do the delivery themselves.  Not concurrency safe in overwrite mode.
- * We detect corrupted subbuffers with commit and reserve counts. We keep a
- * corrupted sub-buffers count and push the readers across these sub-buffers.
- *
- * Not concurrency safe if a writer is stalled in a subbuffer and another writer
- * switches in, finding out it's corrupted.  The result will be that the old
- * (uncommitted) subbuffer will be declared corrupted, and that the new
- * subbuffer will be declared corrupted too because of the commit count
- * adjustment.
- */
-static
-void ltt_reserve_end_switch_current(struct ltt_chanbuf *buf,
-                                   struct ltt_chan *chan,
-                                   struct ltt_reserve_switch_offsets *offsets,
-                                   u64 *tsc)
-{
-       long endidx = SUBBUF_INDEX(offsets->end - 1, chan);
-       long commit_count, padding_size;
-
-       padding_size = chan->a.sb_size
-                       - (SUBBUF_OFFSET(offsets->end - 1, chan) + 1);
-
-       ltt_buffer_end(buf, *tsc, offsets->end, endidx);
-
-       /*
-        * Must write slot data before incrementing commit count.
-        * This compiler barrier is upgraded into a smp_wmb() by the IPI
-        * sent by get_subbuf() when it does its smp_rmb().
-        */
-       barrier();
-       local_add(padding_size, &buf->commit_count[endidx].cc);
-       commit_count = local_read(&buf->commit_count[endidx].cc);
-       ltt_check_deliver(buf, chan, offsets->end - 1, commit_count, endidx);
-       ltt_write_commit_counter(buf, chan, endidx, offsets->end, commit_count,
-                                padding_size);
-}
-
-/*
- * Returns :
- * 0 if ok
- * !0 if execution must be aborted.
- */
-static
-int ltt_relay_try_switch_slow(enum force_switch_mode mode,
-                             struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                             struct ltt_reserve_switch_offsets *offsets,
-                             u64 *tsc)
-{
-       long sb_index;
-       long reserve_commit_diff;
-       long off;
-
-       offsets->begin = local_read(&buf->offset);
-       offsets->old = offsets->begin;
-       offsets->begin_switch = 0;
-       offsets->end_switch_old = 0;
-
-       *tsc = trace_clock_read64();
-
-       off = SUBBUF_OFFSET(offsets->begin, chan);
-       if ((mode != FORCE_ACTIVE && off > 0) || off > ltt_sb_header_size()) {
-               offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
-               offsets->end_switch_old = 1;
-       } else {
-               /* we do not have to switch : buffer is empty */
-               return -1;
-       }
-       if (mode == FORCE_ACTIVE)
-               offsets->begin += ltt_sb_header_size();
-       /*
-        * Always begin_switch in FORCE_ACTIVE mode.
-        * Test new buffer integrity
-        */
-       sb_index = SUBBUF_INDEX(offsets->begin, chan);
-       reserve_commit_diff =
-               (BUFFER_TRUNC(offsets->begin, chan)
-                >> chan->a.n_sb_order)
-               - (local_read(&buf->commit_count[sb_index].cc_sb)
-                       & chan->commit_count_mask);
-       if (reserve_commit_diff == 0) {
-               /* Next buffer not corrupted. */
-               if (mode == FORCE_ACTIVE
-                   && !chan->overwrite
-                   && offsets->begin - atomic_long_read(&buf->consumed)
-                      >= chan->a.buf_size) {
-                       /*
-                        * We do not overwrite non consumed buffers and we are
-                        * full : ignore switch while tracing is active.
-                        */
-                       return -1;
-               }
-       } else {
-               /*
-                * Next subbuffer corrupted. Force pushing reader even in normal
-                * mode
-                */
-       }
-       offsets->end = offsets->begin;
-       return 0;
-}
-
-/*
- * Force a sub-buffer switch for a per-cpu buffer. This operation is
- * completely reentrant : can be called while tracing is active with
- * absolutely no lock held.
- *
- * Note, however, that as a local_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for an ACTIVE flush.
- */
-void ltt_force_switch_lockless_slow(struct ltt_chanbuf *buf,
-                                   enum force_switch_mode mode)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-       struct ltt_reserve_switch_offsets offsets;
-       u64 tsc;
-
-       offsets.size = 0;
-
-       /*
-        * Perform retryable operations.
-        */
-       do {
-               if (ltt_relay_try_switch_slow(mode, buf, chan, &offsets, &tsc))
-                       return;
-       } while (local_cmpxchg(&buf->offset, offsets.old, offsets.end)
-                != offsets.old);
-
-       /*
-        * Atomically update last_tsc. This update races against concurrent
-        * atomic updates, but the race will always cause supplementary full TSC
-        * events, never the opposite (missing a full TSC event when it would be
-        * needed).
-        */
-       save_last_tsc(buf, tsc);
-
-       /*
-        * Push the reader if necessary
-        */
-       if (mode == FORCE_ACTIVE) {
-               ltt_reserve_push_reader(buf, chan, offsets.end - 1);
-               ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1,
-                                                          chan));
-       }
-
-       /*
-        * Switch old subbuffer if needed.
-        */
-       if (offsets.end_switch_old) {
-               ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
-                                                          chan));
-               ltt_reserve_switch_old_subbuf(buf, chan, &offsets, &tsc);
-       }
-
-       /*
-        * Populate new subbuffer.
-        */
-       if (mode == FORCE_ACTIVE)
-               ltt_reserve_switch_new_subbuf(buf, chan, &offsets, &tsc);
-}
-EXPORT_SYMBOL_GPL(ltt_force_switch_lockless_slow);
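
The control flow above is an instance of the lockless retry idiom used
throughout this file; reduced to a skeleton (a sketch, not tracer code):

	static void retry_skeleton_example(struct ltt_chanbuf *buf)
	{
		long old, end;

		do {
			/* Compute a tentative window without side effects. */
			old = local_read(&buf->offset);
			end = old; /* + computed size, alignment, switches */
			/*
			 * Publish atomically; a reservation nested inside
			 * this one (e.g. from an interrupt on the same CPU)
			 * makes the cmpxchg fail and the computation restart.
			 */
		} while (local_cmpxchg(&buf->offset, old, end) != old);
	}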
-
-/*
- * Returns :
- * 0 if ok
- * !0 if execution must be aborted.
- */
-static
-int ltt_relay_try_reserve_slow(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                              struct ltt_reserve_switch_offsets *offsets,
-                              size_t data_size, u64 *tsc, unsigned int *rflags,
-                              int largest_align)
-{
-       long reserve_commit_diff;
-
-       offsets->begin = local_read(&buf->offset);
-       offsets->old = offsets->begin;
-       offsets->begin_switch = 0;
-       offsets->end_switch_current = 0;
-       offsets->end_switch_old = 0;
-
-       *tsc = trace_clock_read64();
-       if (last_tsc_overflow(buf, *tsc))
-               *rflags = LTT_RFLAG_ID_SIZE_TSC;
-
-       if (unlikely(SUBBUF_OFFSET(offsets->begin, chan) == 0)) {
-               offsets->begin_switch = 1;              /* For offsets->begin */
-       } else {
-               offsets->size = ltt_get_header_size(chan, offsets->begin,
-                                                   data_size,
-                                                   &offsets->before_hdr_pad,
-                                                   *rflags);
-               offsets->size += ltt_align(offsets->begin + offsets->size,
-                                          largest_align)
-                                + data_size;
-               if (unlikely((SUBBUF_OFFSET(offsets->begin, chan) +
-                            offsets->size) > chan->a.sb_size)) {
-                       offsets->end_switch_old = 1;    /* For offsets->old */
-                       offsets->begin_switch = 1;      /* For offsets->begin */
-               }
-       }
-       if (unlikely(offsets->begin_switch)) {
-               long sb_index;
-
-               /*
-                * We are typically not filling the previous buffer completely.
-                */
-               if (likely(offsets->end_switch_old))
-                       offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
-               offsets->begin = offsets->begin + ltt_sb_header_size();
-               /* Test new buffer integrity */
-               sb_index = SUBBUF_INDEX(offsets->begin, chan);
-               reserve_commit_diff =
-                 (BUFFER_TRUNC(offsets->begin, chan)
-                  >> chan->a.n_sb_order)
-                 - (local_read(&buf->commit_count[sb_index].cc_sb)
-                               & chan->commit_count_mask);
-               if (likely(reserve_commit_diff == 0)) {
-                       /* Next buffer not corrupted. */
-                       if (unlikely(!chan->overwrite &&
-                               (SUBBUF_TRUNC(offsets->begin, chan)
-                                - SUBBUF_TRUNC(atomic_long_read(&buf->consumed),
-                                               chan))
-                               >= chan->a.buf_size)) {
-                               /*
-                                * We do not overwrite non consumed buffers
-                                * and we are full : event is lost.
-                                */
-                               local_inc(&buf->events_lost);
-                               return -1;
-                       } else {
-                               /*
-                                * next buffer not corrupted, we are either in
-                                * overwrite mode or the buffer is not full.
-                                * It's safe to write in this new subbuffer.
-                                */
-                       }
-               } else {
-                       /*
-                        * Next subbuffer corrupted. Drop event in normal and
-                        * overwrite mode. Caused by either a writer OOPS or
-                        * too many nested writes over a reserve/commit pair.
-                        */
-                       local_inc(&buf->events_lost);
-                       return -1;
-               }
-               offsets->size = ltt_get_header_size(chan, offsets->begin,
-                                                   data_size,
-                                                   &offsets->before_hdr_pad,
-                                                   *rflags);
-               offsets->size += ltt_align(offsets->begin + offsets->size,
-                                          largest_align)
-                                + data_size;
-               if (unlikely((SUBBUF_OFFSET(offsets->begin, chan)
-                            + offsets->size) > chan->a.sb_size)) {
-                       /*
-                        * Event too big for subbuffers, report error, don't
-                        * complete the sub-buffer switch.
-                        */
-                       local_inc(&buf->events_lost);
-                       return -1;
-               } else {
-                       /*
-                        * We just made a successful buffer switch and the event
-                        * fits in the new subbuffer. Let's write.
-                        */
-               }
-       } else {
-               /*
-                * Event fits in the current buffer and we are not on a switch
-                * boundary. It's safe to write.
-                */
-       }
-       offsets->end = offsets->begin + offsets->size;
-
-       if (unlikely((SUBBUF_OFFSET(offsets->end, chan)) == 0)) {
-               /*
-                * The offset_end will fall at the very beginning of the next
-                * subbuffer.
-                */
-               offsets->end_switch_current = 1;        /* For offsets->begin */
-       }
-       return 0;
-}
-
-/**
- * ltt_reserve_slot_lockless_slow - Atomic slot reservation in a buffer.
- * @chan: channel structure
- * @trace: the trace structure to log to.
- * @data_size: size of the variable length data to log.
- * @largest_align: largest alignment of the data to log.
- * @cpu: cpuid
- * @ret_buf: pointer to the buffer used for the reservation (out)
- * @slot_size: pointer to total size of the slot (out)
- * @buf_offset: pointer to reserved buffer offset (out)
- * @tsc: pointer to the tsc at the slot reservation (out)
- * @rflags: pointer to the reservation flags (out)
- *
- * Return : -ENOSPC if not enough space, else returns 0.
- * It will take care of sub-buffer switching.
- */
-int ltt_reserve_slot_lockless_slow(struct ltt_chan *chan,
-                                  struct ltt_trace *trace, size_t data_size,
-                                  int largest_align, int cpu,
-                                  struct ltt_chanbuf **ret_buf,
-                                  size_t *slot_size, long *buf_offset,
-                                  u64 *tsc, unsigned int *rflags)
-{
-       struct ltt_chanbuf *buf = *ret_buf = per_cpu_ptr(chan->a.buf, cpu);
-       struct ltt_reserve_switch_offsets offsets;
-
-       offsets.size = 0;
-
-       do {
-               if (unlikely(ltt_relay_try_reserve_slow(buf, chan, &offsets,
-                                                       data_size, tsc, rflags,
-                                                       largest_align)))
-                       return -ENOSPC;
-       } while (unlikely(local_cmpxchg(&buf->offset, offsets.old, offsets.end)
-                         != offsets.old));
-
-       /*
-        * Atomically update last_tsc. This update races against concurrent
-        * atomic updates, but the race will always cause supplementary full TSC
-        * events, never the opposite (missing a full TSC event when it would be
-        * needed).
-        */
-       save_last_tsc(buf, *tsc);
-
-       /*
-        * Push the reader if necessary
-        */
-       ltt_reserve_push_reader(buf, chan, offsets.end - 1);
-
-       /*
-        * Clear noref flag for this subbuffer.
-        */
-       ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1, chan));
-
-       /*
-        * Switch old subbuffer if needed.
-        */
-       if (unlikely(offsets.end_switch_old)) {
-               ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
-                                                         chan));
-               ltt_reserve_switch_old_subbuf(buf, chan, &offsets, tsc);
-       }
-
-       /*
-        * Populate new subbuffer.
-        */
-       if (unlikely(offsets.begin_switch))
-               ltt_reserve_switch_new_subbuf(buf, chan, &offsets, tsc);
-
-       if (unlikely(offsets.end_switch_current))
-               ltt_reserve_end_switch_current(buf, chan, &offsets, tsc);
-
-       *slot_size = offsets.size;
-       *buf_offset = offsets.begin + offsets.before_hdr_pad;
-       return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_reserve_slot_lockless_slow);
-
-static struct ltt_transport ltt_relay_transport = {
-       .name = "relay",
-       .owner = THIS_MODULE,
-       .ops = {
-               .create_dirs = ltt_relay_create_dirs,
-               .remove_dirs = ltt_relay_remove_dirs,
-               .create_channel = ltt_chan_create,
-               .finish_channel = ltt_relay_finish_channel,
-               .remove_channel = ltt_chan_free,
-               .remove_channel_files = ltt_chan_remove_files,
-               .wakeup_channel = ltt_relay_async_wakeup_chan,
-               .user_blocking = ltt_relay_user_blocking,
-               .user_errors = ltt_relay_print_user_errors,
-               .start_switch_timer = ltt_chan_start_switch_timer,
-               .stop_switch_timer = ltt_chan_stop_switch_timer,
-       },
-};
-
-static struct notifier_block fn_ltt_chanbuf_hotcpu_callback = {
-       .notifier_call = ltt_chanbuf_hotcpu_callback,
-       .priority = 6,
-};
-
-int __init ltt_relay_init(void)
-{
-       printk(KERN_INFO "LTT : ltt-relay init\n");
-
-       ltt_transport_register(&ltt_relay_transport);
-       register_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
-       register_idle_notifier(&pm_idle_entry_notifier);
-
-       return 0;
-}
-
-void __exit ltt_relay_exit(void)
-{
-       printk(KERN_INFO "LTT : ltt-relay exit\n");
-
-       unregister_idle_notifier(&pm_idle_entry_notifier);
-       unregister_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
-       ltt_transport_unregister(&ltt_relay_transport);
-}
-
-MODULE_LICENSE("GPL and additional rights");
-MODULE_AUTHOR("Mathieu Desnoyers");
-MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");
diff --git a/ltt-relay-lockless.h b/ltt-relay-lockless.h
deleted file mode 100644 (file)
index 73c6a3d..0000000
+++ /dev/null
@@ -1,549 +0,0 @@
-#ifndef _LTT_LTT_RELAY_LOCKLESS_H
-#define _LTT_LTT_RELAY_LOCKLESS_H
-
-/*
- * ltt/ltt-relay-lockless.h
- *
- * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * LTTng lockless buffer space management (reader/writer).
- *
- * Author:
- *     Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * Inspired from LTT :
- *  Karim Yaghmour (karim@opersys.com)
- *  Tom Zanussi (zanussi@us.ibm.com)
- *  Bob Wisniewski (bob@watson.ibm.com)
- * And from K42 :
- *  Bob Wisniewski (bob@watson.ibm.com)
- *
- * Changelog:
- *  08/10/08, Cleanup.
- *  19/10/05, Complete lockless mechanism.
- *  27/05/05, Modular redesign and rewrite.
- *
- * Userspace reader semantic :
- * while (poll fd != POLLHUP) {
- *   - ioctl RELAY_GET_SUBBUF_SIZE
- *   while (1) {
- *     - ioctl GET_SUBBUF
- *     - splice 1 subbuffer worth of data to a pipe
- *     - splice the data from pipe to disk/network
- *     - ioctl PUT_SUBBUF, check error value
- *       if err val < 0, previous subbuffer was corrupted.
- *   }
- * }
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include <linux/cache.h>
-#include <linux/time.h>
-#include <linux/module.h>
-#include <linux/string.h>
-#include <linux/slab.h>
-#include <linux/init.h>
-#include <linux/rcupdate.h>
-#include <linux/timer.h>
-#include <linux/sched.h>
-#include <linux/bitops.h>
-#include <linux/fs.h>
-#include <linux/smp_lock.h>
-#include <linux/debugfs.h>
-#include <linux/stat.h>
-#include <linux/cpu.h>
-#include <linux/pipe_fs_i.h>
-#include <linux/splice.h>
-#include <asm/atomic.h>
-#include <asm/local.h>
-
-#include "ltt-tracer.h"
-#include "ltt-relay.h"
-
-#if 0
-#define printk_dbg(fmt, args...) printk(fmt, args)
-#else
-#define printk_dbg(fmt, args...)
-#endif
-
-struct commit_counters {
-       local_t cc;
-       local_t cc_sb;                  /* Incremented _once_ at sb switch */
-       local_t events;                 /* Event count */
-};
-
-/* LTTng lockless logging buffer info */
-struct ltt_chanbuf {
-       struct ltt_chanbuf_alloc a;     /* Parent. First field. */
-       /* First 32 bytes cache-hot cacheline */
-       local_t offset;                 /* Current offset in the buffer */
-       struct commit_counters *commit_count;
-                                       /* Commit count per sub-buffer */
-       atomic_long_t consumed;         /*
-                                        * Consumed offset in the buffer,
-                                        * standard atomic access (shared)
-                                        */
-       unsigned long last_tsc;         /*
-                                        * Last timestamp written in the buffer.
-                                        */
-       /* End of first 32 bytes cacheline */
-#ifdef LTT_VMCORE
-       local_t *commit_seq;            /* Consecutive commits */
-#endif
-       atomic_long_t active_readers;   /*
-                                        * Active readers count
-                                        * standard atomic access (shared)
-                                        */
-       local_t events_lost;
-       local_t corrupted_subbuffers;
-       spinlock_t full_lock;           /*
-                                        * buffer full condition spinlock, only
-                                        * for userspace tracing blocking mode
-                                        * synchronization with reader.
-                                        */
-       wait_queue_head_t write_wait;   /*
-                                        * Wait queue for blocking user space
-                                        * writers
-                                        */
-       wait_queue_head_t read_wait;    /* reader wait queue */
-       unsigned int finalized;         /* buffer has been finalized */
-       struct timer_list switch_timer; /* timer for periodical switch */
-};
-
-/*
- * A switch is done during tracing or as a final flush after tracing (so it
- * won't write in the new sub-buffer).
- */
-enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
-
-extern
-int ltt_reserve_slot_lockless_slow(struct ltt_chan *chan,
-                                  struct ltt_trace *trace, size_t data_size,
-                                  int largest_align, int cpu,
-                                  struct ltt_chanbuf **ret_buf,
-                                  size_t *slot_size, long *buf_offset,
-                                  u64 *tsc, unsigned int *rflags);
-
-extern void ltt_force_switch_lockless_slow(struct ltt_chanbuf *buf,
-                                          enum force_switch_mode mode);
-
-/*
- * Last TSC comparison functions. Check if the current TSC overflows
- * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
- * atomically.
- */
-
-#if (BITS_PER_LONG == 32)
-static __inline__ void save_last_tsc(struct ltt_chanbuf *buf, u64 tsc)
-{
-       buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
-}
-
-static __inline__ int last_tsc_overflow(struct ltt_chanbuf *buf, u64 tsc)
-{
-       unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);
-
-       if (unlikely((tsc_shifted - buf->last_tsc)))
-               return 1;
-       else
-               return 0;
-}
-#else
-static __inline__ void save_last_tsc(struct ltt_chanbuf *buf, u64 tsc)
-{
-       buf->last_tsc = (unsigned long)tsc;
-}
-
-static __inline__ int last_tsc_overflow(struct ltt_chanbuf *buf, u64 tsc)
-{
-       if (unlikely((tsc - buf->last_tsc) >> LTT_TSC_BITS))
-               return 1;
-       else
-               return 0;
-}
-#endif
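
As an example of what the overflow test buys (LTT_TSC_BITS is defined in
ltt-tracer.h; the value 27 below is assumed purely for illustration):

	/*
	 * On 64-bit, two events whose TSCs differ by less than
	 * 2^LTT_TSC_BITS (2^27 cycles in this example) satisfy
	 * (tsc - last_tsc) >> LTT_TSC_BITS == 0, so the event header can
	 * carry a truncated timestamp; a larger gap sets
	 * LTT_RFLAG_ID_SIZE_TSC and forces a full 64-bit TSC into the
	 * header.
	 */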
-
-extern
-int ltt_chanbuf_create(struct ltt_chanbuf *buf, struct ltt_chan_alloc *chana,
-                      int cpu);
-extern void ltt_chanbuf_free(struct ltt_chanbuf *buf);
-extern int ltt_chan_create(const char *base_filename, struct ltt_chan *chan,
-                          struct dentry *parent, size_t sb_size, size_t n_sb,
-                          int overwrite, struct ltt_trace *trace);
-extern void ltt_chan_free(struct kref *kref);
-extern void ltt_chan_remove_files(struct ltt_chan *chan);
-
-/* Buffer access operations */
-
-extern int ltt_chanbuf_open_read(struct ltt_chanbuf *buf);
-extern void ltt_chanbuf_release_read(struct ltt_chanbuf *buf);
-extern int ltt_chanbuf_get_subbuf(struct ltt_chanbuf *buf,
-                                 unsigned long *consumed);
-extern int ltt_chanbuf_put_subbuf(struct ltt_chanbuf *buf,
-                                 unsigned long consumed);
-extern void ltt_chan_start_switch_timer(struct ltt_chan *chan);
-extern void ltt_chan_stop_switch_timer(struct ltt_chan *chan);
-
-extern int ltt_relay_init(void);
-extern void ltt_relay_exit(void);
-
-static __inline__
-unsigned long ltt_chanbuf_get_offset(struct ltt_chanbuf *buf)
-{
-       return local_read(&buf->offset);
-}
-
-static __inline__
-unsigned long ltt_chanbuf_get_consumed(struct ltt_chanbuf *buf)
-{
-       return atomic_long_read(&buf->consumed);
-}
-
-static __inline__
-int ltt_chanbuf_is_finalized(struct ltt_chanbuf *buf)
-{
-       return buf->finalized;
-}
-
-static __inline__
-void ltt_reserve_push_reader(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                            long offset)
-{
-       long consumed_old, consumed_new;
-
-       do {
-               consumed_old = atomic_long_read(&buf->consumed);
-               /*
-                * If buffer is in overwrite mode, push the reader consumed
-                * count if the write position has reached it and we are not
-                * at the first iteration (don't push the reader farther than
-                * the writer). This operation can be done concurrently by many
-                * writers in the same buffer; the writer at the farthest write
-                * position sub-buffer index in the buffer is the one which will
-                * win this loop.
-                * If the buffer is not in overwrite mode, pushing the reader
-                * only happens if a sub-buffer is corrupted.
-                */
-               if (unlikely((SUBBUF_TRUNC(offset, chan)
-                             - SUBBUF_TRUNC(consumed_old, chan))
-                            >= chan->a.buf_size))
-                       consumed_new = SUBBUF_ALIGN(consumed_old, chan);
-               else
-                       return;
-       } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
-                                             consumed_new) != consumed_old));
-}
-
-#ifdef LTT_VMCORE
-static __inline__
-void ltt_vmcore_check_deliver(struct ltt_chanbuf *buf, long commit_count,
-                             long idx)
-{
-       local_set(&buf->commit_seq[idx], commit_count);
-}
-#else
-static __inline__
-void ltt_vmcore_check_deliver(struct ltt_chanbuf *buf, long commit_count,
-                             long idx)
-{
-}
-#endif
-
-static __inline__
-void ltt_check_deliver(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                      long offset, long commit_count, long idx)
-{
-       long old_commit_count = commit_count - chan->a.sb_size;
-
-       /* Check if all commits have been done */
-       if (unlikely((BUFFER_TRUNC(offset, chan) >> chan->a.n_sb_order)
-                    - (old_commit_count & chan->commit_count_mask) == 0)) {
-               /*
-                * If we succeeded in updating the cc_sb, we are delivering
-                * the subbuffer. Deals with concurrent updates of the "cc"
-                * value without adding an add_return atomic operation to the
-                * fast path.
-                */
-               if (likely(local_cmpxchg(&buf->commit_count[idx].cc_sb,
-                                        old_commit_count, commit_count)
-                          == old_commit_count)) {
-                       /*
-                        * Set noref flag for this subbuffer.
-                        */
-                       ltt_set_noref_flag(&buf->a, idx);
-                       ltt_vmcore_check_deliver(buf, commit_count, idx);
-               }
-       }
-}
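
Continuing the worked example used for the reader-side test (sb_size =
4096, assumed):

	/*
	 * When the final commit of sub-buffer 0 brings cc to 4096,
	 * old_commit_count == 0, which matches the buffer position term,
	 * and the cmpxchg moves cc_sb from 0 to 4096 exactly once even
	 * under concurrent committers; the test in
	 * ltt_chanbuf_get_subbuf() then succeeds for that sub-buffer.
	 */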
-
-static __inline__
-int ltt_poll_deliver(struct ltt_chanbuf *buf, struct ltt_chan *chan)
-{
-       long consumed_old, consumed_idx, commit_count, write_offset;
-
-       consumed_old = atomic_long_read(&buf->consumed);
-       consumed_idx = SUBBUF_INDEX(consumed_old, chan);
-       commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
-       /*
-        * No memory barrier here, since we are only interested
-        * in a statistically correct polling result. The next poll will
-        * get the data if we are racing. The mb() that ensures correct
-        * memory order is in get_subbuf.
-        */
-       write_offset = local_read(&buf->offset);
-
-       /*
-        * Check that the subbuffer we are trying to consume has been
-        * already fully committed.
-        */
-
-       if (((commit_count - chan->a.sb_size)
-            & chan->commit_count_mask)
-           - (BUFFER_TRUNC(consumed_old, chan)
-              >> chan->a.n_sb_order)
-           != 0)
-               return 0;
-
-       /*
-        * Check that we are not about to read the same subbuffer in
-        * which the writer head is.
-        */
-       if ((SUBBUF_TRUNC(write_offset, chan)
-          - SUBBUF_TRUNC(consumed_old, chan))
-          == 0)
-               return 0;
-
-       return 1;
-}
-
-static __inline__
-u32 get_read_sb_size(struct ltt_chanbuf *buf)
-{
-       struct ltt_subbuffer_header *header =
-               (struct ltt_subbuffer_header *)
-                       ltt_relay_read_offset_address(&buf->a, 0);
-       return header->sb_size;
-}
-
-/*
- * returns 0 if reserve ok, or 1 if the slow path must be taken.
- */
-static __inline__
-int ltt_relay_try_reserve(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                         size_t data_size, u64 *tsc, unsigned int *rflags,
-                         int largest_align, long *o_begin, long *o_end,
-                         long *o_old, size_t *before_hdr_pad, size_t *size)
-{
-       *o_begin = local_read(&buf->offset);
-       *o_old = *o_begin;
-
-       *tsc = trace_clock_read64();
-
-#ifdef LTT_VMCORE
-       prefetch(&buf->commit_count[SUBBUF_INDEX(*o_begin, chan)]);
-       prefetch(&buf->commit_seq[SUBBUF_INDEX(*o_begin, chan)]);
-#else
-       prefetchw(&buf->commit_count[SUBBUF_INDEX(*o_begin, chan)]);
-#endif
-       if (last_tsc_overflow(buf, *tsc))
-               *rflags = LTT_RFLAG_ID_SIZE_TSC;
-
-       if (unlikely(SUBBUF_OFFSET(*o_begin, chan) == 0))
-               return 1;
-
-       *size = ltt_get_header_size(chan, *o_begin, data_size, before_hdr_pad,
-                                   *rflags);
-       *size += ltt_align(*o_begin + *size, largest_align) + data_size;
-       if (unlikely((SUBBUF_OFFSET(*o_begin, chan) + *size) > chan->a.sb_size))
-               return 1;
-
-       /*
-        * Event fits in the current buffer and we are not on a switch
-        * boundary. It's safe to write.
-        */
-       *o_end = *o_begin + *size;
-
-       if (unlikely((SUBBUF_OFFSET(*o_end, chan)) == 0))
-               /*
-                * The offset_end will fall at the very beginning of the next
-                * subbuffer.
-                */
-               return 1;
-
-       return 0;
-}
-
-static __inline__
-int ltt_reserve_slot(struct ltt_chan *chan,
-                    struct ltt_trace *trace, size_t data_size,
-                    int largest_align, int cpu,
-                    struct ltt_chanbuf **ret_buf,
-                    size_t *slot_size, long *buf_offset, u64 *tsc,
-                    unsigned int *rflags)
-{
-       struct ltt_chanbuf *buf = *ret_buf = per_cpu_ptr(chan->a.buf, cpu);
-       long o_begin, o_end, o_old;
-       size_t before_hdr_pad;
-
-       /*
-        * Perform retryable operations.
-        */
-       if (unlikely(__get_cpu_var(ltt_nesting) > 4)) {
-               local_inc(&buf->events_lost);
-               return -EPERM;
-       }
-
-       if (unlikely(ltt_relay_try_reserve(buf, chan, data_size, tsc, rflags,
-                                          largest_align, &o_begin, &o_end,
-                                          &o_old, &before_hdr_pad, slot_size)))
-               goto slow_path;
-
-       if (unlikely(local_cmpxchg(&buf->offset, o_old, o_end) != o_old))
-               goto slow_path;
-
-       /*
-        * Atomically update last_tsc. This update races against concurrent
-        * atomic updates, but the race will always cause supplementary full TSC
-        * events, never the opposite (missing a full TSC event when it would be
-        * needed).
-        */
-       save_last_tsc(buf, *tsc);
-
-       /*
-        * Push the reader if necessary
-        */
-       ltt_reserve_push_reader(buf, chan, o_end - 1);
-
-       /*
-        * Clear noref flag for this subbuffer.
-        */
-       ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(o_end - 1, chan));
-
-       *buf_offset = o_begin + before_hdr_pad;
-       return 0;
-slow_path:
-       return ltt_reserve_slot_lockless_slow(chan, trace, data_size,
-                                             largest_align, cpu, ret_buf,
-                                             slot_size, buf_offset, tsc,
-                                             rflags);
-}
-
-/*
- * Force a sub-buffer switch for a per-cpu buffer. This operation is
- * completely reentrant : can be called while tracing is active with
- * absolutely no lock held.
- *
- * Note, however, that as a local_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for an ACTIVE flush.
- */
-static __inline__
-void ltt_force_switch(struct ltt_chanbuf *buf, enum force_switch_mode mode)
-{
-       return ltt_force_switch_lockless_slow(buf, mode);
-}
-
-/*
- * For flight recording. Must be called after relay_commit.
- * This function increments the subbuffer's commit_seq counter each time the
- * commit count reaches back the reserve offset (modulo subbuffer size). It is
- * useful for crash dump.
- */
-#ifdef LTT_VMCORE
-static __inline__
-void ltt_write_commit_counter(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                             long idx, long buf_offset, long commit_count,
-                             size_t data_size)
-{
-       long offset;
-       long commit_seq_old;
-
-       offset = buf_offset + data_size;
-
-       /*
-        * SUBBUF_OFFSET includes commit_count_mask. We can simply
-        * compare the offsets within the subbuffer without caring about
-        * buffer full/empty mismatch because offset is never zero here
-        * (subbuffer header and event headers have non-zero length).
-        */
-       if (unlikely(SUBBUF_OFFSET(offset - commit_count, chan)))
-               return;
-
-       commit_seq_old = local_read(&buf->commit_seq[idx]);
-       while (commit_seq_old < commit_count)
-               commit_seq_old = local_cmpxchg(&buf->commit_seq[idx],
-                                        commit_seq_old, commit_count);
-}
-#else
-static __inline__
-void ltt_write_commit_counter(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                             long idx, long buf_offset, long commit_count,
-                             size_t data_size)
-{
-}
-#endif
-
-/*
- * Atomic unordered slot commit. Increments the commit count in the
- * specified sub-buffer, and delivers it if necessary.
- *
- * Parameters:
- *
- * @buf: buffer.
- * @chan: channel.
- * @buf_offset : offset following the event header.
- * @data_size : size of the event data.
- * @slot_size : size of the reserved slot.
- */
-static __inline__
-void ltt_commit_slot(struct ltt_chanbuf *buf, struct ltt_chan *chan,
-                    long buf_offset, size_t data_size, size_t slot_size)
-{
-       long offset_end = buf_offset;
-       long endidx = SUBBUF_INDEX(offset_end - 1, chan);
-       long commit_count;
-
-#ifdef LTT_NO_IPI_BARRIER
-       smp_wmb();
-#else
-       /*
-        * Must write slot data before incrementing commit count.
-        * This compiler barrier is upgraded into a smp_mb() by the IPI
-        * sent by get_subbuf().
-        */
-       barrier();
-#endif
-       local_add(slot_size, &buf->commit_count[endidx].cc);
-       local_inc(&buf->commit_count[endidx].events);
-       /*
-        * commit count read can race with concurrent OOO commit count updates.
-        * This is only needed for ltt_check_deliver (for non-polling delivery
-        * only) and for ltt_write_commit_counter. The race can only cause the
-        * counter to be read with the same value more than once, which could
-        * cause :
-        * - Multiple delivery for the same sub-buffer (which is handled
-        *   gracefully by the reader code) if the value is for a full
-        *   sub-buffer. It's important that we can never miss a sub-buffer
-        *   delivery. Re-reading the value after the local_add ensures this.
-        * - Reading a commit_count with a higher value than what was actually
-        *   added to it for the ltt_write_commit_counter call (again caused by
-        *   a concurrent committer). It does not matter, because this function
-        *   is interested in the fact that the commit count reaches back the
-        *   reserve offset for a specific sub-buffer, which is completely
-        *   independent of the order.
-        */
-       commit_count = local_read(&buf->commit_count[endidx].cc);
-
-       ltt_check_deliver(buf, chan, offset_end - 1, commit_count, endidx);
-       /*
-        * Update data_size for each commit. It's needed only for extracting
-        * ltt buffers from vmcore, after crash.
-        */
-       ltt_write_commit_counter(buf, chan, endidx, buf_offset,
-                                commit_count, data_size);
-}
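
As a concrete illustration of the commit_seq update (numbers invented): with 4096-byte sub-buffers, suppose an event ends exactly at offset 8192 and its commit brings that sub-buffer's commit_count to 8192 as well. SUBBUF_OFFSET(8192 - 8192, chan) is 0, so ltt_write_commit_counter raises commit_seq[idx] monotonically to 8192 through its cmpxchg loop; a concurrent committer that read a stale value simply fails the cmpxchg, re-reads, and retries, so commit_seq never moves backward.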
-
-#endif //_LTT_LTT_RELAY_LOCKLESS_H
diff --git a/ltt-relay-splice.c b/ltt-relay-splice.c
deleted file mode 100644 (file)
index e4694c1..0000000
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright (C) 2002-2005 - Tom Zanussi (zanussi@us.ibm.com), IBM Corp
- * Copyright (C) 1999-2005 - Karim Yaghmour (karim@opersys.com)
- * Copyright (C) 2008-2009 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * Re-using content from kernel/relay.c
- *
- * This file is released under the GPL.
- */
-
-#include <linux/errno.h>
-#include <linux/stddef.h>
-#include <linux/slab.h>
-#include <linux/module.h>
-#include <linux/string.h>
-#include <linux/vmalloc.h>
-#include <linux/mm.h>
-#include <linux/cpu.h>
-#include <linux/splice.h>
-#include <linux/pipe_fs_i.h>
-#include <linux/bitops.h>
-
-#include "ltt-relay.h"
-#include "ltt-relay-lockless.h"
-
-loff_t ltt_relay_no_llseek(struct file *file, loff_t offset, int origin)
-{
-       return -ESPIPE;
-}
-
-static void ltt_relay_pipe_buf_release(struct pipe_inode_info *pipe,
-                                      struct pipe_buffer *pbuf)
-{
-}
-
-static struct pipe_buf_operations ltt_relay_pipe_buf_ops = {
-       .can_merge = 0,
-       .map = generic_pipe_buf_map,
-       .unmap = generic_pipe_buf_unmap,
-       .confirm = generic_pipe_buf_confirm,
-       .release = ltt_relay_pipe_buf_release,
-       .steal = generic_pipe_buf_steal,
-       .get = generic_pipe_buf_get,
-};
-
-static void ltt_relay_page_release(struct splice_pipe_desc *spd, unsigned int i)
-{
-}
-
-/*
- *     subbuf_splice_actor - splice up to one subbuf's worth of data
- */
-static int subbuf_splice_actor(struct file *in,
-                              loff_t *ppos,
-                              struct pipe_inode_info *pipe,
-                              size_t len,
-                              unsigned int flags)
-{
-       struct ltt_chanbuf *buf = in->private_data;
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-       unsigned int poff, subbuf_pages, nr_pages;
-       struct page *pages[PIPE_DEF_BUFFERS];
-       struct partial_page partial[PIPE_DEF_BUFFERS];
-       struct splice_pipe_desc spd = {
-               .pages = pages,
-               .nr_pages = 0,
-               .partial = partial,
-               .flags = flags,
-               .ops = &ltt_relay_pipe_buf_ops,
-               .spd_release = ltt_relay_page_release,
-       };
-       long consumed_old, consumed_idx, roffset;
-       unsigned long bytes_avail;
-
-       /*
-        * Check that a GET_SUBBUF ioctl has been done before.
-        */
-       WARN_ON(atomic_long_read(&buf->active_readers) != 1);
-       consumed_old = atomic_long_read(&buf->consumed);
-       consumed_old += *ppos;
-       consumed_idx = SUBBUF_INDEX(consumed_old, chan);
-
-       /*
-        * Adjust read len if longer than what is available.
-        * Max read size is one subbuffer, because get_subbuf/put_subbuf
-        * provide the protection.
-        */
-       bytes_avail = chan->a.sb_size;
-       WARN_ON(bytes_avail > chan->a.buf_size);
-       len = min_t(size_t, len, bytes_avail);
-       subbuf_pages = bytes_avail >> PAGE_SHIFT;
-       nr_pages = min_t(unsigned int, subbuf_pages, PIPE_DEF_BUFFERS);
-       roffset = consumed_old & PAGE_MASK;
-       poff = consumed_old & ~PAGE_MASK;
-       printk_dbg(KERN_DEBUG "SPLICE actor len %zu pos %zd write_pos %ld\n",
-                  len, (ssize_t)*ppos, local_read(&buf->offset));
-
-       for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
-               unsigned int this_len;
-               struct page *page;
-
-               if (!len)
-                       break;
-               printk_dbg(KERN_DEBUG "SPLICE actor loop len %zu roffset %ld\n",
-                          len, roffset);
-
-               this_len = PAGE_SIZE - poff;
-               page = ltt_relay_read_get_page(&buf->a, roffset);
-               spd.pages[spd.nr_pages] = page;
-               spd.partial[spd.nr_pages].offset = poff;
-               spd.partial[spd.nr_pages].len = this_len;
-
-               poff = 0;
-               roffset += PAGE_SIZE;
-               len -= this_len;
-       }
-
-       if (!spd.nr_pages)
-               return 0;
-
-       return splice_to_pipe(pipe, &spd);
-}
-
-ssize_t ltt_relay_file_splice_read(struct file *in, loff_t *ppos,
-                                  struct pipe_inode_info *pipe, size_t len,
-                                  unsigned int flags)
-{
-       ssize_t spliced;
-       int ret;
-
-       ret = 0;
-       spliced = 0;
-
-       printk_dbg(KERN_DEBUG "SPLICE read len %zu pos %zd\n", len,
-                  (ssize_t)*ppos);
-       while (len && !spliced) {
-               ret = subbuf_splice_actor(in, ppos, pipe, len, flags);
-               printk_dbg(KERN_DEBUG "SPLICE read loop ret %d\n", ret);
-               if (ret < 0)
-                       break;
-               else if (!ret) {
-                       if (flags & SPLICE_F_NONBLOCK)
-                               ret = -EAGAIN;
-                       break;
-               }
-
-               *ppos += ret;
-               if (ret > len)
-                       len = 0;
-               else
-                       len -= ret;
-               spliced += ret;
-       }
-
-       if (spliced)
-               return spliced;
-
-       return ret;
-}
diff --git a/ltt-relay-vfs.c b/ltt-relay-vfs.c
deleted file mode 100644 (file)
index defbe2d..0000000
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * ltt/ltt-relay-vfs.c
- *
- * (C) Copyright 2009 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * LTTng VFS interface.
- *
- * Author:
- *     Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/debugfs.h>
-#include <linux/ltt-channels.h>
-#include <asm/atomic.h>
-
-#include "ltt-tracer.h"
-#include "ltt-relay.h"
-#include "ltt-relay-lockless.h"
-
-/**
- *     ltt_open - open file op for ltt files
- *     @inode: opened inode
- *     @file: opened file
- *
- *     Open implementation. Makes sure only one open instance of a buffer
- *     exists at any given moment.
- */
-static int ltt_open(struct inode *inode, struct file *file)
-{
-       struct ltt_chanbuf *buf = inode->i_private;
-       int ret;
-
-       ret = ltt_chanbuf_open_read(buf);
-       if (ret)
-               goto end;
-
-       file->private_data = buf;
-       ret = nonseekable_open(inode, file);
-       /*
-        * The LTTng splice operation must believe that the file descriptor is
-        * seekable. This is a temporary fix to follow new checks added to
-        * splice.c. We should probably do the proper thing and implement an
-        * llseek function eventually, which involves modifying the lttng splice
-        * actors accordingly. TODO
-        */
-       file->f_mode |= FMODE_PREAD;
-end:
-       return ret;
-}
-
-/**
- *     ltt_release - release file op for ltt files
- *     @inode: opened inode
- *     @file: opened file
- *
- *     Release implementation.
- */
-static int ltt_release(struct inode *inode, struct file *file)
-{
-       struct ltt_chanbuf *buf = inode->i_private;
-
-       ltt_chanbuf_release_read(buf);
-
-       return 0;
-}
-
-/**
- *     ltt_poll - file op for ltt files
- *     @filp: the file
- *     @wait: poll table
- *
- *     Poll implementation.
- */
-static unsigned int ltt_poll(struct file *filp, poll_table *wait)
-{
-       unsigned int mask = 0;
-       struct inode *inode = filp->f_dentry->d_inode;
-       struct ltt_chanbuf *buf = inode->i_private;
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
-       if (filp->f_mode & FMODE_READ) {
-               poll_wait_set_exclusive(wait);
-               poll_wait(filp, &buf->read_wait, wait);
-
-               WARN_ON(atomic_long_read(&buf->active_readers) != 1);
-               if (SUBBUF_TRUNC(ltt_chanbuf_get_offset(buf), chan)
-                 - SUBBUF_TRUNC(ltt_chanbuf_get_consumed(buf), chan)
-                 == 0) {
-                       if (buf->finalized)
-                               return POLLHUP;
-                       else
-                               return 0;
-               } else {
-                       if (SUBBUF_TRUNC(ltt_chanbuf_get_offset(buf), chan)
-                         - SUBBUF_TRUNC(ltt_chanbuf_get_consumed(buf), chan)
-                         >= chan->a.buf_size)
-                               return POLLPRI | POLLRDBAND;
-                       else
-                               return POLLIN | POLLRDNORM;
-               }
-       }
-       return mask;
-}
-
-/**
- *     ltt_ioctl - control on the debugfs file
- *
- *     @inode: the inode
- *     @filp: the file
- *     @cmd: the command
- *     @arg: command arg
- *
- *     This ioctl implements five commands necessary for a minimal
- *     producer/consumer implementation :
- *     RELAY_GET_SB
- *             Get the next sub-buffer that can be read. It never blocks.
- *     RELAY_PUT_SB
- *             Release the currently read sub-buffer. Parameter is the last
- *             sub-buffer put (returned by RELAY_GET_SB).
- *     RELAY_GET_N_SB
- *             Returns the number of sub-buffers in the per-cpu channel.
- *     RELAY_GET_SB_SIZE
- *             Returns the size of the current sub-buffer.
- *     RELAY_GET_MAX_SB_SIZE
- *             Returns the maximum size for sub-buffers.
- */
-static
-int ltt_ioctl(struct inode *inode, struct file *filp, unsigned int cmd,
-             unsigned long arg)
-{
-       struct ltt_chanbuf *buf = inode->i_private;
-       u32 __user *argp = (u32 __user *)arg;
-
-       switch (cmd) {
-       case RELAY_GET_SB:
-       {
-               unsigned long consumed;
-               int ret;
-
-               ret = ltt_chanbuf_get_subbuf(buf, &consumed);
-               if (ret)
-                       return ret;
-               else
-                       return put_user((u32)consumed, argp);
-               break;
-       }
-       case RELAY_PUT_SB:
-       {
-               u32 uconsumed_old;
-               int ret;
-               long consumed_old;
-
-               ret = get_user(uconsumed_old, argp);
-               if (ret)
-                       return ret; /* will return -EFAULT */
-
-               consumed_old = ltt_chanbuf_get_consumed(buf);
-               consumed_old = consumed_old & (~0xFFFFFFFFL);
-               consumed_old = consumed_old | uconsumed_old;
-               ret = ltt_chanbuf_put_subbuf(buf, consumed_old);
-               if (ret)
-                       return ret;
-               break;
-       }
-       case RELAY_GET_N_SB:
-               return put_user((u32)buf->a.chan->n_sb, argp);
-               break;
-       case RELAY_GET_SB_SIZE:
-               return put_user(get_read_sb_size(buf), argp);
-               break;
-       case RELAY_GET_MAX_SB_SIZE:
-               return put_user((u32)buf->a.chan->sb_size, argp);
-               break;
-       default:
-               return -ENOIOCTLCMD;
-       }
-       return 0;
-}
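
A minimal user-space consumer built on these five commands could look like the sketch below (assumptions: the RELAY_* ioctl numbers come from LTTng's user-visible headers, outfd is a pipe as splice() requires, and error handling is elided):

        #define _GNU_SOURCE
        #include <fcntl.h>
        #include <unistd.h>
        #include <sys/ioctl.h>

        /* Illustrative only: drain one per-cpu buffer file into a pipe. */
        static int consume_buffer(const char *path, int outfd)
        {
                unsigned int consumed, sb_size;
                int fd = open(path, O_RDONLY);

                if (fd < 0)
                        return -1;
                while (ioctl(fd, RELAY_GET_SB, &consumed) == 0) {
                        if (ioctl(fd, RELAY_GET_SB_SIZE, &sb_size) < 0)
                                break;
                        /* Move one sub-buffer; outfd must be a pipe. */
                        splice(fd, NULL, outfd, NULL, sb_size, 0);
                        /* Release the sub-buffer we just read. */
                        ioctl(fd, RELAY_PUT_SB, &consumed);
                }
                close(fd);
                return 0;
        }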
-
-#ifdef CONFIG_COMPAT
-static
-long ltt_compat_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
-{
-       long ret = -ENOIOCTLCMD;
-
-       lock_kernel();
-       ret = ltt_ioctl(file->f_dentry->d_inode, file, cmd, arg);
-       unlock_kernel();
-
-       return ret;
-}
-#endif
-
-static const struct file_operations ltt_file_operations = {
-       .open = ltt_open,
-       .release = ltt_release,
-       .poll = ltt_poll,
-       .splice_read = ltt_relay_file_splice_read,
-       .ioctl = ltt_ioctl,
-       .llseek = ltt_relay_no_llseek,
-#ifdef CONFIG_COMPAT
-       .compat_ioctl = ltt_compat_ioctl,
-#endif
-};
-
-int ltt_chanbuf_create_file(const char *filename, struct dentry *parent,
-                           int mode, struct ltt_chanbuf *buf)
-{
-       struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-       char *tmpname;
-       int ret = 0;
-
-       tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
-       if (!tmpname) {
-               ret = -ENOMEM;
-               goto end;
-       }
-
-       snprintf(tmpname, NAME_MAX, "%s%s_%d",
-                chan->overwrite ? LTT_FLIGHT_PREFIX : "",
-                chan->a.filename, buf->a.cpu);
-
-       buf->a.dentry = debugfs_create_file(tmpname, mode, parent, buf,
-                                           &ltt_file_operations);
-       if (!buf->a.dentry) {
-               ret = -ENOMEM;
-               goto free_name;
-       }
-free_name:
-       kfree(tmpname);
-end:
-       return ret;
-}
-
-int ltt_chanbuf_remove_file(struct ltt_chanbuf *buf)
-{
-       debugfs_remove(buf->a.dentry);
-
-       return 0;
-}
diff --git a/ltt-relay.h b/ltt-relay.h
deleted file mode 100644 (file)
index 3c23a8e..0000000
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * include/linux/ltt-relay.h
- *
- * Copyright (C) 2008,2009 - Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
- *
- * Dual LGPL v2.1/GPL v2 license.
- *
- * Credits to Steven Rostedt for proposing to use an extra-subbuffer owned by
- * the reader in flight recorder mode.
- */
-
-#ifndef _LINUX_LTT_RELAY_H
-#define _LINUX_LTT_RELAY_H
-
-#include <linux/types.h>
-#include <linux/sched.h>
-#include <linux/timer.h>
-#include <linux/wait.h>
-#include <linux/fs.h>
-#include <linux/poll.h>
-#include <linux/kref.h>
-#include <linux/mm.h>
-#include <linux/ltt-channels.h>
-
-#include "ltt-tracer-core.h"
-
-/* Use lowest pointer bit to show the sub-buffer has no reference. */
-#define RCHAN_NOREF_FLAG       0x1UL
-
-#define RCHAN_SB_IS_NOREF(x)   ((unsigned long)(x) & RCHAN_NOREF_FLAG)
-#define RCHAN_SB_SET_NOREF(x)  \
-       (x = (struct chanbuf_page *)((unsigned long)(x) | RCHAN_NOREF_FLAG))
-#define RCHAN_SB_CLEAR_NOREF(x)        \
-       (x = (struct chanbuf_page *)((unsigned long)(x) & ~RCHAN_NOREF_FLAG))
-
-struct ltt_trace;
-
-struct chanbuf_page {
-       void *virt;                     /* page virtual address (cached) */
-       struct page *page;              /* pointer to page structure */
-};
-
-struct chanbuf_sb {
-       struct chanbuf_page *pages;     /* Pointer to rchan pages for subbuf */
-};
-
-struct ltt_chanbuf_alloc {
-       struct chanbuf_sb *buf_wsb;     /* Array of rchan_sb for writer */
-       struct chanbuf_sb buf_rsb;      /* chanbuf_sb for reader */
-       void **_virt;                   /* Array of pointers to page addr */
-       struct page **_pages;           /* Array of pointers to pages */
-       struct dentry *dentry;          /* Associated file dentry */
-       unsigned int nr_pages;          /* Number of pages in buffer */
-
-       struct ltt_chan_alloc *chan;    /* Associated channel */
-       unsigned int cpu;               /* This buffer's cpu */
-       unsigned int allocated:1;       /* Bool: is buffer allocated ? */
-};
-
-int ltt_chanbuf_alloc_create(struct ltt_chanbuf_alloc *buf,
-                            struct ltt_chan_alloc *chan, int cpu);
-void ltt_chanbuf_alloc_free(struct ltt_chanbuf_alloc *buf);
-int ltt_chan_alloc_init(struct ltt_chan_alloc *chan, struct ltt_trace *trace,
-                       const char *base_filename,
-                       struct dentry *parent, size_t sb_size,
-                       size_t n_sb, int extra_reader_sb, int overwrite);
-void ltt_chan_alloc_free(struct ltt_chan_alloc *chan);
-void ltt_chan_alloc_remove_files(struct ltt_chan_alloc *chan);
-int ltt_chanbuf_create_file(const char *filename, struct dentry *parent,
-                           int mode, struct ltt_chanbuf *buf);
-int ltt_chanbuf_remove_file(struct ltt_chanbuf *buf);
-
-void ltt_chan_for_each_channel(void (*cb) (struct ltt_chanbuf *buf), int cpu);
-
-extern void _ltt_relay_write(struct ltt_chanbuf_alloc *bufa,
-                            size_t offset, const void *src, size_t len,
-                            ssize_t pagecpy);
-
-extern void _ltt_relay_strncpy(struct ltt_chanbuf_alloc *bufa,
-                              size_t offset, const void *src, size_t len,
-                              ssize_t pagecpy);
-
-extern void _ltt_relay_strncpy_fixup(struct ltt_chanbuf_alloc *bufa,
-                                    size_t offset, size_t len, size_t copied,
-                                    int terminated);
-
-extern int ltt_relay_read(struct ltt_chanbuf_alloc *bufa,
-                         size_t offset, void *dest, size_t len);
-
-extern int ltt_relay_read_cstr(struct ltt_chanbuf_alloc *bufa,
-                              size_t offset, void *dest, size_t len);
-
-extern struct page *ltt_relay_read_get_page(struct ltt_chanbuf_alloc *bufa,
-                                           size_t offset);
-
-/*
- * Return the address where a given offset is located.
- * Should be used to get the current subbuffer header pointer. Given we know
- * it's never on a page boundary, it's safe to write directly to this address,
- * as long as the write is never bigger than a page size.
- */
-extern void *ltt_relay_offset_address(struct ltt_chanbuf_alloc *bufa,
-                                     size_t offset);
-extern void *ltt_relay_read_offset_address(struct ltt_chanbuf_alloc *bufa,
-                                          size_t offset);
-
-#ifdef CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS
-static __inline__
-void ltt_relay_do_copy(void *dest, const void *src, size_t len)
-{
-       switch (len) {
-       case 0:
-               break;
-       case 1:
-               *(u8 *)dest = *(const u8 *)src;
-               break;
-       case 2:
-               *(u16 *)dest = *(const u16 *)src;
-               break;
-       case 4:
-               *(u32 *)dest = *(const u32 *)src;
-               break;
-       case 8:
-               *(u64 *)dest = *(const u64 *)src;
-               break;
-       default:
-               /*
-                * What we really want here is an __inline__ memcpy, but we don't
-                * have constants, so gcc generally uses a function call.
-                */
-               for (; len > 0; len--)
-                       *(u8 *)dest++ = *(const u8 *)src++;
-       }
-}
-#else
-/*
- * Returns whether the dest and src addresses are aligned on
- * min(sizeof(void *), len). Call this with statically known len for efficiency.
- */
-static __inline__
-int addr_aligned(const void *dest, const void *src, size_t len)
-{
-       if (ltt_align((size_t)dest, len))
-               return 0;
-       if (ltt_align((size_t)src, len))
-               return 0;
-       return 1;
-}
-
-static __inline__
-void ltt_relay_do_copy(void *dest, const void *src, size_t len)
-{
-       switch (len) {
-       case 0:
-               break;
-       case 1:
-               *(u8 *)dest = *(const u8 *)src;
-               break;
-       case 2:
-               if (unlikely(!addr_aligned(dest, src, 2)))
-                       goto memcpy_fallback;
-               *(u16 *)dest = *(const u16 *)src;
-               break;
-       case 4:
-               if (unlikely(!addr_aligned(dest, src, 4)))
-                       goto memcpy_fallback;
-               *(u32 *)dest = *(const u32 *)src;
-               break;
-       case 8:
-               if (unlikely(!addr_aligned(dest, src, 8)))
-                       goto memcpy_fallback;
-               *(u64 *)dest = *(const u64 *)src;
-               break;
-       default:
-               goto memcpy_fallback;
-       }
-       return;
-
-memcpy_fallback:
-       /*
-        * What we really want here is an inline memcpy, but we don't
-        * have constants, so gcc generally uses a function call.
-        */
-       for (; len > 0; len--)
-               *(u8 *)dest++ = *(const u8 *)src++;
-}
-#endif
-
-/*
- * ltt_relay_do_memset - write character into dest.
- * @dest: destination
- * @src: source character
- * @len: length to write
- */
-static __inline__
-void ltt_relay_do_memset(void *dest, char src, size_t len)
-{
-       /*
-        * What we really want here is an __inline__ memset, but we
-        * don't have constants, so gcc generally uses a function call.
-        */
-       for (; len > 0; len--)
-               *(u8 *)dest++ = src;
-}
-
-
-/*
- * ltt_relay_do_strncpy - copy a string up to a certain number of bytes
- * @dest: destination
- * @src: source
- * @len: max. length to copy
- * @terminated: output string ends with \0 (output)
- *
- * Returns the number of bytes copied. Does not terminate with \0 if len is
- * reached.
- */
-static __inline__
-size_t ltt_relay_do_strncpy(void *dest, const void *src, size_t len,
-                           int *terminated)
-{
-       size_t orig_len = len;
-
-       *terminated = 0;
-       /*
-        * What we really want here is an __inline__ strncpy, but we
-        * don't have constants, so gcc generally uses a function call.
-        */
-       for (; len > 0; len--) {
-               *(u8 *)dest = ACCESS_ONCE(*(const u8 *)src);
-               /* Check with dest, because src may be modified concurrently */
-               if (*(const u8 *)dest == '\0') {
-                       len--;
-                       *terminated = 1;
-                       break;
-               }
-               dest++;
-               src++;
-       }
-       return orig_len - len;
-}
-
-static __inline__
-int ltt_relay_write(struct ltt_chanbuf_alloc *bufa,
-                   struct ltt_chan_alloc *chana, size_t offset,
-                   const void *src, size_t len)
-{
-       size_t sbidx, index;
-       ssize_t pagecpy;
-       struct chanbuf_page *rpages;
-
-       offset &= chana->buf_size - 1;
-       sbidx = offset >> chana->sb_size_order;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       pagecpy = min_t(size_t, len, (- offset) & ~PAGE_MASK);
-       rpages = bufa->buf_wsb[sbidx].pages;
-       WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-       ltt_relay_do_copy(rpages[index].virt + (offset & ~PAGE_MASK),
-                         src, pagecpy);
-
-       if (unlikely(len != pagecpy))
-               _ltt_relay_write(bufa, offset, src, len, pagecpy);
-       return len;
-}
-
-static __inline__
-int ltt_relay_strncpy(struct ltt_chanbuf_alloc *bufa,
-                     struct ltt_chan_alloc *chana, size_t offset,
-                     const void *src, size_t len)
-{
-       size_t sbidx, index;
-       ssize_t pagecpy, copied;
-       struct chanbuf_page *rpages;
-       int terminated;
-
-       offset &= chana->buf_size - 1;
-       sbidx = offset >> chana->sb_size_order;
-       index = (offset & (chana->sb_size - 1)) >> PAGE_SHIFT;
-       pagecpy = min_t(size_t, len, (- offset) & ~PAGE_MASK);
-       rpages = bufa->buf_wsb[sbidx].pages;
-       WARN_ON_ONCE(RCHAN_SB_IS_NOREF(rpages));
-       copied = ltt_relay_do_strncpy(rpages[index].virt
-                                     + (offset & ~PAGE_MASK),
-                                     src, pagecpy, &terminated);
-       if (unlikely(copied < pagecpy || ((len == pagecpy) && !terminated)))
-               _ltt_relay_strncpy_fixup(bufa, offset, len, copied,
-                                        terminated);
-       else {
-               if (unlikely(len != pagecpy))
-                       _ltt_relay_strncpy(bufa, offset, src, len, pagecpy);
-       }
-       return len;
-}
-
-/**
- * ltt_clear_noref_flag - Clear the noref subbuffer flag, for writer.
- */
-static __inline__
-void ltt_clear_noref_flag(struct ltt_chanbuf_alloc *bufa, long idx)
-{
-       struct chanbuf_page *sb_pages, *new_sb_pages;
-
-       sb_pages = bufa->buf_wsb[idx].pages;
-       for (;;) {
-               if (!RCHAN_SB_IS_NOREF(sb_pages))
-                       return; /* Already writing to this buffer */
-               new_sb_pages = sb_pages;
-               RCHAN_SB_CLEAR_NOREF(new_sb_pages);
-               new_sb_pages = cmpxchg(&bufa->buf_wsb[idx].pages,
-                       sb_pages, new_sb_pages);
-               if (likely(new_sb_pages == sb_pages))
-                       break;
-               sb_pages = new_sb_pages;
-       }
-}
-
-/**
- * ltt_set_noref_flag - Set the noref subbuffer flag, for writer.
- */
-static __inline__
-void ltt_set_noref_flag(struct ltt_chanbuf_alloc *bufa, long idx)
-{
-       struct chanbuf_page *sb_pages, *new_sb_pages;
-
-       sb_pages = bufa->buf_wsb[idx].pages;
-       for (;;) {
-               if (RCHAN_SB_IS_NOREF(sb_pages))
-                       return; /* Already set */
-               new_sb_pages = sb_pages;
-               RCHAN_SB_SET_NOREF(new_sb_pages);
-               new_sb_pages = cmpxchg(&bufa->buf_wsb[idx].pages,
-                       sb_pages, new_sb_pages);
-               if (likely(new_sb_pages == sb_pages))
-                       break;
-               sb_pages = new_sb_pages;
-       }
-}
-
-/**
- * update_read_sb_index - Read-side subbuffer index update.
- */
-static __inline__
-int update_read_sb_index(struct ltt_chanbuf_alloc *bufa,
-                        struct ltt_chan_alloc *chana,
-                        long consumed_idx)
-{
-       struct chanbuf_page *old_wpage, *new_wpage;
-
-       if (unlikely(chana->extra_reader_sb)) {
-               /*
-                * Exchange the target writer subbuffer with our own unused
-                * subbuffer.
-                */
-               old_wpage = bufa->buf_wsb[consumed_idx].pages;
-               if (unlikely(!RCHAN_SB_IS_NOREF(old_wpage)))
-                       return -EAGAIN;
-               WARN_ON_ONCE(!RCHAN_SB_IS_NOREF(bufa->buf_rsb.pages));
-               new_wpage = cmpxchg(&bufa->buf_wsb[consumed_idx].pages,
-                               old_wpage,
-                               bufa->buf_rsb.pages);
-               if (unlikely(old_wpage != new_wpage))
-                       return -EAGAIN;
-               bufa->buf_rsb.pages = new_wpage;
-               RCHAN_SB_CLEAR_NOREF(bufa->buf_rsb.pages);
-       } else {
-               /* No page exchange, use the writer page directly */
-               bufa->buf_rsb.pages = bufa->buf_wsb[consumed_idx].pages;
-               RCHAN_SB_CLEAR_NOREF(bufa->buf_rsb.pages);
-       }
-       return 0;
-}
-
-ssize_t ltt_relay_file_splice_read(struct file *in, loff_t *ppos,
-                                  struct pipe_inode_info *pipe, size_t len,
-                                  unsigned int flags);
-loff_t ltt_relay_no_llseek(struct file *file, loff_t offset, int origin);
-
-extern int ltt_ascii_init(void);
-extern void ltt_ascii_exit(void);
-
-#endif /* _LINUX_LTT_RELAY_H */
diff --git a/ltt-ring-buffer-client.c b/ltt-ring-buffer-client.c
new file mode 100644 (file)
index 0000000..509fc5e
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * ltt-ring-buffer-client.c
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * LTTng lib ring buffer client.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/module.h>
+#include "ltt-tracer.h"
+
+struct ring_buffer_priv {
+       struct dentry *dentry;
+};
+
+struct channel_priv {
+       struct ltt_trace *trace;
+       struct ring_buffer_priv *buf;
+};
+
+static const struct lib_ring_buffer_config client_config;
+
+static u64 client_ring_buffer_clock_read(struct channel *chan)
+{
+       return lib_ring_buffer_clock_read(chan);
+}
+
+static
+size_t client_record_header_size(const struct lib_ring_buffer_config *config,
+                                struct channel *chan, size_t offset,
+                                size_t data_size,
+                                size_t *pre_header_padding,
+                                unsigned int rflags,
+                                struct lib_ring_buffer_ctx *ctx)
+{
+       return record_header_size(config, chan, offset, data_size,
+                                 pre_header_padding, rflags, ctx);
+}
+
+/**
+ * client_subbuffer_header_size - called on buffer-switch to a new sub-buffer
+ *
+ * Return header size without padding after the structure. Don't use packed
+ * structure because gcc generates inefficient code on some architectures
+ * (powerpc, mips..)
+ */
+static size_t client_subbuffer_header_size(void)
+{
+       return offsetof(struct subbuffer_header, header_end);
+}
+
+static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
+                               unsigned int subbuf_idx)
+{
+       struct channel *chan = buf->backend.chan;
+       struct subbuffer_header *header =
+               (struct subbuffer_header *)
+                       lib_ring_buffer_offset_address(&buf->backend,
+                               subbuf_idx * chan->backend.subbuf_size);
+
+       header->cycle_count_begin = tsc;
+       header->data_size = 0xFFFFFFFF; /* for debugging */
+       write_trace_header(chan->backend.priv, header);
+}
+
+/*
+ * offset is assumed to never be 0 here : never deliver a completely empty
+ * subbuffer. data_size is between 1 and subbuf_size.
+ */
+static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
+                             unsigned int subbuf_idx, unsigned long data_size)
+{
+       struct channel *chan = buf->backend.chan;
+       struct subbuffer_header *header =
+               (struct subbuffer_header *)
+                       lib_ring_buffer_offset_address(&buf->backend,
+                               subbuf_idx * chan->backend.subbuf_size);
+       unsigned long records_lost = 0;
+
+       header->data_size = data_size;
+       header->subbuf_size = PAGE_ALIGN(data_size);
+       header->cycle_count_end = tsc;
+       records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf);
+       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
+       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
+       header->events_lost = records_lost;
+       header->subbuf_corrupt = 0;     /* deprecated */
+}
+
+static int client_buffer_create(struct lib_ring_buffer *buf, void *priv,
+                               int cpu, const char *name)
+{
+       struct channel_priv *chan_priv = priv;
+       struct ring_buffer_priv *buf_priv;
+       struct dentry *trace_dentry;
+       char *tmpname;
+       int ret = 0;
+
+       if (client_config.alloc == RING_BUFFER_ALLOC_PER_CPU)
+               buf_priv = per_cpu_ptr(chan_priv->buf, cpu);
+       else
+               buf_priv = chan_priv->buf;
+
+       tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
+       if (!tmpname) {
+               ret = -ENOMEM;
+               goto end;
+       }
+
+       snprintf(tmpname, NAME_MAX, "%s%s_%d",
+                (client_config.mode == RING_BUFFER_OVERWRITE) ?
+                LTT_FLIGHT_PREFIX : "",
+                name, cpu);
+
+       trace_dentry = chan_priv->trace->dentry.trace_root;
+       buf_priv->dentry = debugfs_create_file(tmpname, S_IRUSR, trace_dentry,
+                                              buf,
+                                              &lib_ring_buffer_file_operations);
+       if (!buf_priv->dentry) {
+               ret = -ENOMEM;
+               goto free_name;
+       }
+free_name:
+       kfree(tmpname);
+end:
+       return ret;
+}
+
+static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu)
+{
+       struct channel_priv *chan_priv = priv;
+       struct ring_buffer_priv *buf_priv;
+
+       if (client_config.alloc == RING_BUFFER_ALLOC_PER_CPU)
+               buf_priv = per_cpu_ptr(chan_priv->buf, cpu);
+       else
+               buf_priv = chan_priv->buf;
+
+       debugfs_remove(buf_priv->dentry);
+}
+
+static const struct lib_ring_buffer_config client_config = {
+       .cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
+       .cb.record_header_size = client_record_header_size,
+       .cb.subbuffer_header_size = client_subbuffer_header_size,
+       .cb.buffer_begin = client_buffer_begin,
+       .cb.buffer_end = client_buffer_end,
+       .cb.buffer_create = client_buffer_create,
+       .cb.buffer_finalize = client_buffer_finalize,
+
+       .tsc_bits = 32,
+       .alloc = RING_BUFFER_ALLOC_PER_CPU,
+       .sync = RING_BUFFER_SYNC_PER_CPU,
+       .mode = RING_BUFFER_OVERWRITE,
+#ifdef RING_BUFFER_ALIGN
+       .align = RING_BUFFER_NATURAL,
+#else
+       .align = RING_BUFFER_PACKED,
+#endif
+       .backend = RING_BUFFER_PAGE,
+       .output = RING_BUFFER_SPLICE,
+       .oops = RING_BUFFER_OOPS_CONSISTENCY,
+       .ipi = RING_BUFFER_IPI_BARRIER,
+       .wakeup = RING_BUFFER_WAKEUP_BY_TIMER,
+};
+
+struct channel *ltt_channel_create(const char *name, struct ltt_trace *trace,
+                                  void *buf_addr,
+                                  size_t subbuf_size, size_t num_subbuf,
+                                  unsigned int switch_timer_interval,
+                                  unsigned int read_timer_interval)
+{
+       struct channel *chan;
+       struct channel *chan;
+       struct channel_priv *chan_priv;
+
+       chan_priv = kzalloc(sizeof(struct channel_priv), GFP_KERNEL);
+       if (!chan_priv)
+               return NULL;
+       if (client_config.alloc == RING_BUFFER_ALLOC_PER_CPU)
+               chan_priv->buf = alloc_percpu(struct ring_buffer_priv);
+       else
+               chan_priv->buf = kzalloc(sizeof(*chan_priv->buf), GFP_KERNEL);
+       if (!chan_priv->buf)
+               goto free_chan_priv;
+       chan_priv->trace = trace;
+       chan = channel_create(&client_config, name, chan_priv, buf_addr,
+                             subbuf_size, num_subbuf, switch_timer_interval,
+                             read_timer_interval);
+       if (!chan)
+               goto free_buf_priv;
+       return chan;
+
+free_buf_priv:
+       if (client_config.alloc == RING_BUFFER_ALLOC_PER_CPU)
+               free_percpu(chan_priv->buf);
+       else
+               kfree(chan_priv->buf);
+free_chan_priv:
+       kfree(chan_priv);
+       return NULL;
+}
+
+void ltt_channel_destroy(struct channel *chan)
+{
+       struct channel_priv *chan_priv = channel_get_private(chan);
+
+       channel_destroy(chan);
+       if (client_config.alloc == RING_BUFFER_ALLOC_PER_CPU)
+               free_percpu(chan_priv->buf);
+       else
+               kfree(chan_priv->buf);
+       kfree(chan_priv);
+}
+
+static void ltt_relay_remove_dirs(struct ltt_trace *trace)
+{
+#if 0
+       ltt_ascii_remove_dir(trace);
+#endif //0
+       debugfs_remove(trace->dentry.trace_root);
+}
+
+static int ltt_relay_create_dirs(struct ltt_trace *new_trace)
+{
+       struct dentry *ltt_root_dentry;
+       int ret;
+
+       ltt_root_dentry = get_ltt_root();
+       if (!ltt_root_dentry)
+               return -ENOENT;
+
+       new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
+                                                         ltt_root_dentry);
+       put_ltt_root();
+       if (new_trace->dentry.trace_root == NULL) {
+               printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
+                      new_trace->trace_name);
+               return -EEXIST;
+       }
+#if 0
+       ret = ltt_ascii_create_dir(new_trace);
+       if (ret)
+               printk(KERN_WARNING "LTT : Unable to create ascii output file "
+                                   "for trace %s\n", new_trace->trace_name);
+#endif //0
+       return 0;
+}
+
+static struct ltt_transport ltt_relay_transport = {
+       .name = "relay",
+       .owner = THIS_MODULE,
+       .ops = {
+               .create_dirs = ltt_relay_create_dirs,
+               .remove_dirs = ltt_relay_remove_dirs,
+       },
+};
+
+int __init ltt_ring_buffer_client_init(void)
+{
+       printk(KERN_INFO "LTT : ltt ring buffer client init\n");
+       ltt_transport_register(&ltt_relay_transport);
+       return 0;
+}
+
+void __exit ltt_ring_buffer_client_exit(void)
+{
+       printk(KERN_INFO "LTT : ltt ring buffer client exit\n");
+       ltt_transport_unregister(&ltt_relay_transport);
+}
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("LTTng Ring Buffer Client");
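
The __init/__exit entry points above still need the usual module wiring; assuming nothing else registers them, the conventional form would be:

        module_init(ltt_ring_buffer_client_init);
        module_exit(ltt_ring_buffer_client_exit);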
index 9d6d239b49c23cc8c1920aa65b74deecf359c697..0a02549d50e311fc78c508985642622d6fc68a50 100644 (file)
@@ -364,8 +364,8 @@ ssize_t channel_switch_timer_write(struct file *file,
        channel_name = file->f_dentry->d_parent->d_name.name;
        trace_name = file->f_dentry->d_parent->d_parent->d_parent->d_name.name;
 
-       /* Convert from ms to jiffies */
-       num = msecs_to_jiffies(num);
+       /* Convert from ms to us */
+       num *= 1000;
 
        err = ltt_trace_set_channel_switch_timer(trace_name, channel_name, num);
        if (IS_ERR_VALUE(err)) {
index 8eae966e386bb6e6f8cb9a09e5b291bb0d341e5d..5cdea9320bdfee6bcb5c96a6800c685b4c8ffab5 100644 (file)
@@ -1,14 +1,13 @@
 /*
  * ltt/ltt-tracer.c
  *
- * (C) Copyright       2005-2008 -
- *             Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
+ * Copyright (c) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
  * Tracing management internal kernel API. Trace buffer allocation/free, tracing
  * start/stop.
  *
  * Author:
- *     Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
+ *     Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
  * Inspired from LTT :
  *  Karim Yaghmour (karim@opersys.com)
@@ -244,9 +243,7 @@ int ltt_module_register(enum ltt_module_function name, void *function,
                ltt_statedump_owner = owner;
                break;
        }
-
 end:
-
        return ret;
 }
 EXPORT_SYMBOL_GPL(ltt_module_register);
@@ -348,48 +345,6 @@ int is_channel_overwrite(enum ltt_channels chan, enum trace_mode mode)
        }
 }
 
-static void trace_async_wakeup(struct ltt_trace *trace)
-{
-       int i;
-       struct ltt_chan *chan;
-
-       /* Must check each channel for pending read wakeup */
-       for (i = 0; i < trace->nr_channels; i++) {
-               chan = &trace->channels[i];
-               if (chan->active)
-                       trace->ops->wakeup_channel(chan);
-       }
-}
-
-/* Timer to send async wakeups to the readers */
-static void async_wakeup(unsigned long data)
-{
-       struct ltt_trace *trace;
-
-       /*
-        * PREEMPT_RT does not allow spinlocks to be taken within preempt
-        * disable sections (spinlock taken in wake_up). However, mainline won't
-        * allow mutex to be taken in interrupt context. Ugly.
-        * Take a standard RCU read lock for RT kernels, which imply that we
-        * also have to synchronize_rcu() upon updates.
-        */
-#ifndef CONFIG_PREEMPT_RT
-       rcu_read_lock_sched();
-#else
-       rcu_read_lock();
-#endif
-       list_for_each_entry_rcu(trace, &ltt_traces.head, list) {
-               trace_async_wakeup(trace);
-       }
-#ifndef CONFIG_PREEMPT_RT
-       rcu_read_unlock_sched();
-#else
-       rcu_read_unlock();
-#endif
-
-       mod_timer(&ltt_async_wakeup_timer, jiffies + LTT_PERCPU_TIMER_INTERVAL);
-}
-
 /**
  * _ltt_trace_find - find a trace by given name.
  * trace_name: trace name
@@ -434,7 +389,7 @@ void ltt_release_trace(struct kref *kref)
 
        trace->ops->remove_dirs(trace);
        module_put(trace->transport->owner);
-       ltt_channels_trace_free(trace->channels, trace->nr_channels);
+       ltt_channels_trace_free(trace);
        kfree(trace);
 }
 EXPORT_SYMBOL_GPL(ltt_release_trace);
@@ -485,9 +440,7 @@ int _ltt_trace_setup(const char *trace_name)
                goto traces_error;
        }
        strncpy(new_trace->trace_name, trace_name, NAME_MAX);
-       new_trace->channels = ltt_channels_trace_alloc(&new_trace->nr_channels,
-                                                      0, 1);
-       if (!new_trace->channels) {
+       if (ltt_channels_trace_alloc(&new_trace->nr_channels, 0)) {
                printk(KERN_ERR
                        "LTT : Unable to allocate memory for chaninfo  %s\n",
                        trace_name);
@@ -496,25 +449,21 @@ int _ltt_trace_setup(const char *trace_name)
        }
 
        /*
-        * Force metadata channel to active, no overwrite.
+        * Force the metadata channel to non-overwrite mode.
         */
        metadata_index = ltt_channels_get_index_from_name("metadata");
        WARN_ON(metadata_index < 0);
-       new_trace->channels[metadata_index].overwrite = 0;
-       new_trace->channels[metadata_index].active = 1;
+       new_trace->settings[metadata_index].overwrite = 0;
 
        /*
         * Set hardcoded tracer defaults for some channels
         */
        for (chan = 0; chan < new_trace->nr_channels; chan++) {
-               if (!(new_trace->channels[chan].active))
-                       continue;
-
                chantype = get_channel_type_from_name(
                        ltt_channels_get_name_from_index(chan));
-               new_trace->channels[chan].a.sb_size =
+               new_trace->settings[chan].sb_size =
                        chan_infos[chantype].def_sb_size;
-               new_trace->channels[chan].a.n_sb =
+               new_trace->settings[chan].n_sb =
                        chan_infos[chantype].def_n_sb;
        }
 
@@ -605,7 +554,7 @@ int ltt_trace_set_channel_subbufsize(const char *trace_name,
                err = -ENOENT;
                goto traces_error;
        }
-       trace->channels[index].a.sb_size = size;
+       trace->settings[index].sb_size = size;
 
 traces_error:
        ltt_unlock_traces();
@@ -636,7 +585,7 @@ int ltt_trace_set_channel_subbufcount(const char *trace_name,
                err = -ENOENT;
                goto traces_error;
        }
-       trace->channels[index].a.n_sb = cnt;
+       trace->settings[index].n_sb = cnt;
 
 traces_error:
        ltt_unlock_traces();
@@ -667,7 +616,7 @@ int ltt_trace_set_channel_switch_timer(const char *trace_name,
                err = -ENOENT;
                goto traces_error;
        }
-       ltt_channels_trace_set_timer(&trace->channels[index], interval);
+       ltt_channels_trace_set_timer(&trace->settings[index], interval);
 
 traces_error:
        ltt_unlock_traces();
@@ -675,47 +624,6 @@ traces_error:
 }
 EXPORT_SYMBOL_GPL(ltt_trace_set_channel_switch_timer);
 
-int ltt_trace_set_channel_enable(const char *trace_name,
-                                const char *channel_name, unsigned int enable)
-{
-       int err = 0;
-       struct ltt_trace *trace;
-       int index;
-
-       ltt_lock_traces();
-
-       trace = _ltt_trace_find_setup(trace_name);
-       if (!trace) {
-               printk(KERN_ERR "LTT : Trace not found %s\n", trace_name);
-               err = -ENOENT;
-               goto traces_error;
-       }
-
-       /*
-        * Datas in metadata channel(marker info) is necessary to be able to
-        * read the trace, we always enable this channel.
-        */
-       if (!enable && !strcmp(channel_name, "metadata")) {
-               printk(KERN_ERR "LTT : Trying to disable metadata channel\n");
-               err = -EINVAL;
-               goto traces_error;
-       }
-
-       index = ltt_channels_get_index_from_name(channel_name);
-       if (index < 0) {
-               printk(KERN_ERR "LTT : Channel %s not found\n", channel_name);
-               err = -ENOENT;
-               goto traces_error;
-       }
-
-       trace->channels[index].active = enable;
-
-traces_error:
-       ltt_unlock_traces();
-       return err;
-}
-EXPORT_SYMBOL_GPL(ltt_trace_set_channel_enable);
-
 int ltt_trace_set_channel_overwrite(const char *trace_name,
                                    const char *channel_name,
                                    unsigned int overwrite)
@@ -753,7 +661,7 @@ int ltt_trace_set_channel_overwrite(const char *trace_name,
                goto traces_error;
        }
 
-       trace->channels[index].overwrite = overwrite;
+       trace->settings[index].overwrite = overwrite;
 
 traces_error:
        ltt_unlock_traces();
@@ -811,23 +719,20 @@ int ltt_trace_alloc(const char *trace_name)
        local_irq_restore(flags);
 
        for (chan = 0; chan < trace->nr_channels; chan++) {
-               if (!(trace->channels[chan].active))
-                       continue;
-
                channel_name = ltt_channels_get_name_from_index(chan);
                WARN_ON(!channel_name);
                /*
                 * note: sb_size and n_sb will be overwritten with updated
                 * values by channel creation.
                 */
-               sb_size = trace->channels[chan].a.sb_size;
-               n_sb = trace->channels[chan].a.n_sb;
+               sb_size = trace->settings[chan].sb_size;
+               n_sb = trace->settings[chan].n_sb;
                prepare_chan_size_num(&sb_size, &n_sb);
-               err = trace->ops->create_channel(channel_name,
-                                     &trace->channels[chan],
-                                     trace->dentry.trace_root,
-                                     sb_size, n_sb,
-                                     trace->channels[chan].overwrite, trace);
+               trace->channels[chan] = ltt_channel_create(channel_name,
+                                             trace, NULL, sb_size, n_sb,
+                                             trace->settings[chan].switch_timer_interval,
+                                             trace->settings[chan].read_timer_interval);
+               err = trace->channels[chan] ? 0 : -ENOMEM;
                if (err != 0) {
                        printk(KERN_ERR "LTT : Can't create channel %s.\n",
                                channel_name);
@@ -836,11 +741,8 @@ int ltt_trace_alloc(const char *trace_name)
        }
 
        list_del(&trace->list);
-       if (list_empty(&ltt_traces.head)) {
-               mod_timer(&ltt_async_wakeup_timer,
-                               jiffies + LTT_PERCPU_TIMER_INTERVAL);
+       if (list_empty(&ltt_traces.head))
                set_kernel_trace_flag_all_tasks();
-       }
        list_add_rcu(&trace->list, &ltt_traces.head);
        synchronize_trace();
 
@@ -849,13 +751,8 @@ int ltt_trace_alloc(const char *trace_name)
        return 0;
 
 create_channel_error:
-       for (chan--; chan >= 0; chan--) {
-               if (trace->channels[chan].active) {
-                       struct ltt_chan *chanp = &trace->channels[chan];
-                       trace->ops->remove_channel_files(chanp);
-                       kref_put(&chanp->a.kref, trace->ops->remove_channel);
-               }
-       }
+       for (chan--; chan >= 0; chan--)
+               ltt_channel_destroy(trace->channels[chan]);
        trace->ops->remove_dirs(trace);
 
 dirs_error:
@@ -918,12 +815,6 @@ static int _ltt_trace_destroy(struct ltt_trace *trace)
        synchronize_trace();
        if (list_empty(&ltt_traces.head)) {
                clear_kernel_trace_flag_all_tasks();
-               /*
-                * We stop the asynchronous delivery of reader wakeup, but
-                * we must make one last check for reader wakeups pending
-                * later in __ltt_trace_destroy.
-                */
-               del_timer_sync(&ltt_async_wakeup_timer);
        }
        return 0;
 
@@ -937,48 +828,9 @@ traces_error:
 static void __ltt_trace_destroy(struct ltt_trace *trace)
 {
        int i;
-       struct ltt_chan *chan;
-
-       for (i = 0; i < trace->nr_channels; i++) {
-               chan = &trace->channels[i];
-               if (chan->active)
-                       trace->ops->finish_channel(chan);
-       }
-
-       flush_scheduled_work();
-
-       /*
-        * The currently destroyed trace is not in the trace list anymore,
-        * so it's safe to call the async wakeup ourself. It will deliver
-        * the last subbuffers.
-        */
-       trace_async_wakeup(trace);
-
-       for (i = 0; i < trace->nr_channels; i++) {
-               chan = &trace->channels[i];
-               if (chan->active) {
-                       trace->ops->remove_channel_files(chan);
-                       kref_put(&chan->a.kref,
-                                trace->ops->remove_channel);
-               }
-       }
-
-       /*
-        * Wait for lttd readers to release the files, therefore making sure
-        * the last subbuffers have been read.
-        */
-       if (atomic_read(&trace->kref.refcount) > 1) {
-               int ret = 0;
-               /*
-                * Unlock traces and CPU hotplug while we wait for lttd to
-                * release the files.
-                */
-               ltt_unlock_traces();
-               __wait_event_interruptible(trace->kref_wq,
-                       (atomic_read(&trace->kref.refcount) == 1), ret);
-               ltt_lock_traces();
-       }
 
+       for (i = 0; i < trace->nr_channels; i++)
+               ltt_channel_destroy(trace->channels[i]);
        kref_put(&trace->kref, ltt_release_trace);
 }
 
@@ -1018,36 +870,6 @@ error:
 }
 EXPORT_SYMBOL_GPL(ltt_trace_destroy);
 
-/*
- * called with trace lock held.
- */
-static
-void ltt_channels_trace_start_timer(struct ltt_chan *channels,
-                                   unsigned int nr_channels)
-{
-       int i;
-
-       for (i = 0; i < nr_channels; i++) {
-               struct ltt_chan *chan = &channels[i];
-               chan->a.trace->ops->start_switch_timer(chan);
-       }
-}
-
-/*
- * called with trace lock held.
- */
-static
-void ltt_channels_trace_stop_timer(struct ltt_chan *channels,
-                                  unsigned int nr_channels)
-{
-       int i;
-
-       for (i = 0; i < nr_channels; i++) {
-               struct ltt_chan *chan = &channels[i];
-               chan->a.trace->ops->stop_switch_timer(chan);
-       }
-}
-
 /* must be called from within a traces lock. */
 static int _ltt_trace_start(struct ltt_trace *trace)
 {
@@ -1065,7 +887,6 @@ static int _ltt_trace_start(struct ltt_trace *trace)
                printk(KERN_ERR "LTT : Can't lock filter module.\n");
                goto get_ltt_run_filter_error;
        }
-       ltt_channels_trace_start_timer(trace->channels, trace->nr_channels);
        trace->active = 1;
        /* Read by trace points without protection : be careful */
        ltt_traces.num_active_traces++;
@@ -1132,8 +953,6 @@ static int _ltt_trace_stop(struct ltt_trace *trace)
                printk(KERN_INFO "LTT : Tracing not active for trace %s\n",
                                trace->trace_name);
        if (trace->active) {
-               ltt_channels_trace_stop_timer(trace->channels,
-                       trace->nr_channels);
                trace->active = 0;
                ltt_traces.num_active_traces--;
                synchronize_trace(); /* Wait for each tracing to be finished */
index 8d979b5b536ace8f142110f56fafdfd06ff2c6eb..ac90ee73c953cbe7965b7250782af6ca4524972e 100644 (file)
@@ -9,6 +9,13 @@
 #ifndef _LTT_TRACER_H
 #define _LTT_TRACER_H
 
+#ifndef CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS
+/* Align data on its natural alignment */
+#define RING_BUFFER_ALIGN
+#endif
+
+#include <linux/ringbuffer/config.h>
+
 #include <stdarg.h>
 #include <linux/types.h>
 #include <linux/limits.h>
 #include <linux/wait.h>
 #include <linux/marker.h>
 #include <linux/trace-clock.h>
-#include <linux/ltt-channels.h>
 #include <asm/atomic.h>
 #include <asm/local.h>
 
 #include "ltt-tracer-core.h"
-#include "ltt-relay.h"
+#include "ltt-channels.h"
 
 /* Number of bytes to log with a read/write event */
 #define LTT_LOG_RW_SIZE                        32L
 
-/* Interval (in jiffies) at which the LTT per-CPU timer fires */
-#define LTT_PERCPU_TIMER_INTERVAL      1
-
-#ifndef LTT_ARCH_TYPE
-#define LTT_ARCH_TYPE                  LTT_ARCH_TYPE_UNDEFINED
-#endif
-
-#ifndef LTT_ARCH_VARIANT
-#define LTT_ARCH_VARIANT               LTT_ARCH_VARIANT_NONE
-#endif
-
 struct ltt_active_marker;
 
 /* Maximum number of callbacks per marker */
@@ -129,29 +124,8 @@ struct user_dbg_data {
 };
 
 struct ltt_trace_ops {
-       /* First 32 bytes cache-hot cacheline */
-       void (*wakeup_channel) (struct ltt_chan *chan);
-       int (*user_blocking) (struct ltt_trace *trace, unsigned int index,
-                             size_t data_size, struct user_dbg_data *dbg);
-       /* End of first 32 bytes cacheline */
        int (*create_dirs) (struct ltt_trace *new_trace);
        void (*remove_dirs) (struct ltt_trace *new_trace);
-       int (*create_channel) (const char *channel_name, struct ltt_chan *chan,
-                              struct dentry *parent, size_t sb_size,
-                              size_t n_sb, int overwrite,
-                              struct ltt_trace *trace);
-       void (*finish_channel) (struct ltt_chan *chan);
-       void (*remove_channel) (struct kref *kref);
-       void (*remove_channel_files) (struct ltt_chan *chan);
-       void (*user_errors) (struct ltt_trace *trace, unsigned int index,
-                            size_t data_size, struct user_dbg_data *dbg,
-                            int cpu);
-       void (*start_switch_timer) (struct ltt_chan *chan);
-       void (*stop_switch_timer) (struct ltt_chan *chan);
-#ifdef CONFIG_HOTPLUG_CPU
-       int (*handle_cpuhp) (struct notifier_block *nb, unsigned long action,
-                            void *hcpu, struct ltt_trace *trace);
-#endif
 };
 
 struct ltt_transport {
@@ -170,7 +144,7 @@ enum trace_mode { LTT_TRACE_NORMAL, LTT_TRACE_FLIGHT, LTT_TRACE_HYBRID };
 struct ltt_trace {
        /* First 32 bytes cache-hot cacheline */
        struct list_head list;
-       struct ltt_chan *channels;
+       struct ltt_chan **channels;
        unsigned int nr_channels;
        int active;
        /* Second 32 bytes cache-hot cacheline */
@@ -192,7 +166,8 @@ struct ltt_trace {
        char trace_name[NAME_MAX];
 } ____cacheline_aligned;
 
-/* Hardcoded event headers
+/*
+ * Hardcoded event headers
  *
  * event header for a trace with active heartbeat : 27 bits timestamps
  *
@@ -217,7 +192,7 @@ struct ltt_trace {
 #define LTT_TSC_BITS           27
 #define LTT_TSC_MASK           ((1 << LTT_TSC_BITS) - 1)
 
-struct ltt_event_header {
+struct event_header {
        u32 id_time;            /* 5 bits event id (MSB); 27 bits time (LSB) */
 };
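
Both fields packed in id_time can be recovered with the constants above; a small illustrative decoder (not part of this patch):

        static inline void event_header_decode(const struct event_header *hdr,
                                               unsigned int *eID, u32 *tsc)
        {
                *eID = hdr->id_time >> LTT_TSC_BITS; /* top 5 bits: event ID */
                *tsc = hdr->id_time & LTT_TSC_MASK;  /* low 27 bits: time */
        }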
 
@@ -240,7 +215,7 @@ struct ltt_event_header {
  * because gcc generates poor code on at least powerpc and mips. Don't ever
  * let gcc add padding between the structure elements.
  */
-struct ltt_subbuffer_header {
+struct subbuffer_header {
        uint64_t cycle_count_begin;     /* Cycle count at subbuffer start */
        uint64_t cycle_count_end;       /* Cycle count at subbuffer end */
        uint32_t magic_number;          /*
@@ -273,23 +248,22 @@ struct ltt_subbuffer_header {
        uint8_t header_end[0];          /* End of header */
 };
 
-/**
- * ltt_sb_header_size - called on buffer-switch to a new sub-buffer
- *
- * Return header size without padding after the structure. Don't use packed
- * structure because gcc generates inefficient code on some architectures
- * (powerpc, mips..)
- */
-static __inline__ size_t ltt_sb_header_size(void)
+static inline notrace u64 lib_ring_buffer_clock_read(struct channel *chan)
 {
-       return offsetof(struct ltt_subbuffer_header, header_end);
+       return trace_clock_read64();
 }
 
 /*
- * ltt_get_header_size
+ * record_header_size - Calculate the record header size and required padding.
+ * @config: ring buffer instance configuration
+ * @chan: channel
+ * @offset: offset in the write buffer
+ * @data_size: size of the payload
+ * @pre_header_padding: padding to add before the header (output)
+ * @rflags: reservation flags
+ * @ctx: reservation context
  *
- * Calculate alignment offset to 32-bits. This is the alignment offset of the
- * event header.
+ * Returns the event header size (including padding).
  *
  * Important note:
  * The event header must be 32 bits. The total offset calculated here:
@@ -304,20 +278,23 @@ static __inline__ size_t ltt_sb_header_size(void)
  *
  * The payload must itself determine its own alignment from the biggest type it
  * contains.
- * */
+ */
 static __inline__
-unsigned char ltt_get_header_size(struct ltt_chan *chan, size_t offset,
-                                 size_t data_size, size_t *before_hdr_pad,
-                                 unsigned int rflags)
+unsigned char record_header_size(const struct lib_ring_buffer_config *config,
+                                struct channel *chan, size_t offset,
+                                size_t data_size, size_t *pre_header_padding,
+                                unsigned int rflags,
+                                struct lib_ring_buffer_ctx *ctx)
 {
        size_t orig_offset = offset;
        size_t padding;
 
-       BUILD_BUG_ON(sizeof(struct ltt_event_header) != sizeof(u32));
+       BUILD_BUG_ON(sizeof(struct event_header) != sizeof(u32));
 
-       padding = ltt_align(offset, sizeof(struct ltt_event_header));
+       padding = lib_ring_buffer_align(config, offset,
+                                       sizeof(struct event_header));
        offset += padding;
-       offset += sizeof(struct ltt_event_header);
+       offset += sizeof(struct event_header);
 
        if (unlikely(rflags)) {
                switch (rflags) {
@@ -339,10 +316,12 @@ unsigned char ltt_get_header_size(struct ltt_chan *chan, size_t offset,
                }
        }
 
-       *before_hdr_pad = padding;
+       *pre_header_padding = padding;
        return offset - orig_offset;
 }
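
As a worked example of the fast path above (rflags == 0), and assuming the
ring buffer configuration enables natural alignment: reserving at offset 5
aligns up to 8 for the 4-byte event header, so 3 bytes of pre-header padding
are reported and 7 bytes are consumed overall. A hypothetical caller:

	size_t pre_pad;
	unsigned char hdr_sz;

	hdr_sz = record_header_size(config, chan, 5 /* offset */, data_size,
				    &pre_pad, 0 /* rflags */, ctx);
	/* pre_pad == 3, hdr_sz == 3 + sizeof(struct event_header) == 7 */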
 
+#include <linux/ringbuffer/api.h>
+
 extern
 size_t ltt_write_event_header_slow(struct ltt_chanbuf_alloc *bufa,
                                   struct ltt_chan_alloc *chana,
@@ -354,39 +333,30 @@ size_t ltt_write_event_header_slow(struct ltt_chanbuf_alloc *bufa,
  *
  * Writes the event header to the offset (already aligned on a 32-bit boundary).
  *
- * @buf : buffer to write to.
- * @chan : pointer to the channel structure..
- * @buf_offset : buffer offset to write to (aligned on 32 bits).
+ * @config: ring buffer instance configuration
+ * @ctx: reservation context
  * @eID: event ID
  * @event_size: size of the event, excluding the event header.
- * @tsc : time stamp counter.
- * @rflags : reservation flags.
- *
- * returns : offset where the event data must be written.
  */
 static __inline__
-size_t ltt_write_event_header(struct ltt_chanbuf_alloc *bufa,
-                             struct ltt_chan_alloc *chana,
-                             long buf_offset, u16 eID, u32 event_size, u64 tsc,
-                             unsigned int rflags)
+void ltt_write_event_header(const struct lib_ring_buffer_config *config,
+                           struct lib_ring_buffer_ctx *ctx,
+                           u16 eID, u32 event_size)
 {
-       struct ltt_event_header header;
+       struct event_header header;
 
-       if (unlikely(rflags))
+       if (unlikely(ctx->rflags))
                goto slow_path;
 
        header.id_time = eID << LTT_TSC_BITS;
-       header.id_time |= (u32)tsc & LTT_TSC_MASK;
-       ltt_relay_write(bufa, chana, buf_offset, &header, sizeof(header));
-       buf_offset += sizeof(header);
-
-       return buf_offset;
+       header.id_time |= (u32)ctx->tsc & LTT_TSC_MASK;
+       lib_ring_buffer_write(config, ctx, &header, sizeof(header));
+       return;
 
 slow_path:
-       return ltt_write_event_header_slow(bufa, chana, buf_offset,
-                                          eID, event_size, tsc, rflags);
+       ltt_write_event_header_slow(config, ctx, eID, event_size);
 }
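
For context, the intended sequence on the generic ring buffer is: reserve,
write the event header with the helper above, write the payload, commit. A
sketch only; the ctx_init arguments (priv pointer, largest alignment, cpu) are
assumptions at this stage of the transition:

	struct lib_ring_buffer_ctx ctx;
	int ret;

	lib_ring_buffer_ctx_init(&ctx, chan, NULL /* priv */, data_size,
				 sizeof(long) /* largest align */, cpu);
	ret = lib_ring_buffer_reserve(config, &ctx);
	if (ret)
		return ret;
	ltt_write_event_header(config, &ctx, eID, data_size);
	lib_ring_buffer_write(config, &ctx, data, data_size);
	lib_ring_buffer_commit(config, &ctx);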
 
+#if 0	/* read-side code not yet converted to the ring buffer library */
 /*
  * ltt_read_event_header
  * buf_offset must be aligned on 32 bits
@@ -448,25 +418,7 @@ size_t ltt_read_event_header(struct ltt_chanbuf_alloc *bufa, long buf_offset,
 
        return buf_offset;
 }
-
-/* Lockless LTTng */
-
-/* Buffer offset macros */
-
-/*
- * BUFFER_TRUNC zeroes the subbuffer offset and the subbuffer number parts of
- * the offset, which leaves only the buffer number.
- */
-#define BUFFER_TRUNC(offset, chan) \
-       ((offset) & (~((chan)->a.buf_size - 1)))
-#define BUFFER_OFFSET(offset, chan) ((offset) & ((chan)->a.buf_size - 1))
-#define SUBBUF_OFFSET(offset, chan) ((offset) & ((chan)->a.sb_size - 1))
-#define SUBBUF_ALIGN(offset, chan) \
-       (((offset) + (chan)->a.sb_size) & (~((chan)->a.sb_size - 1)))
-#define SUBBUF_TRUNC(offset, chan) \
-       ((offset) & (~((chan)->a.sb_size - 1)))
-#define SUBBUF_INDEX(offset, chan) \
-       (BUFFER_OFFSET((offset), chan) >> (chan)->a.sb_size_order)
+#endif /* #if 0 */
 
 /*
  * Control channels:
@@ -478,7 +430,6 @@ size_t ltt_read_event_header(struct ltt_chanbuf_alloc *bufa, long buf_offset,
  * cpu
  */
 #define LTT_RELAY_ROOT                 "ltt"
-#define LTT_RELAY_LOCKED_ROOT          "ltt-locked"
 
 #define LTT_METADATA_CHANNEL           "metadata_state"
 #define LTT_FD_STATE_CHANNEL           "fd_state"
@@ -515,18 +466,20 @@ size_t ltt_read_event_header(struct ltt_chanbuf_alloc *bufa, long buf_offset,
 
 /**
  * ltt_write_trace_header - Write trace header
- * @trace: Trace information
+ * @priv: Private data (struct ltt_trace)
  * @header: Memory address where the information must be written to
  */
 static __inline__
-void ltt_write_trace_header(struct ltt_trace *trace,
-                           struct ltt_subbuffer_header *header)
+void ltt_write_trace_header(void *priv,
+                           struct subbuffer_header *header)
 {
+       struct ltt_trace *trace = priv;
+
        header->magic_number = LTT_TRACER_MAGIC_NUMBER;
        header->major_version = LTT_TRACER_VERSION_MAJOR;
        header->minor_version = LTT_TRACER_VERSION_MINOR;
        header->arch_size = sizeof(void *);
-       header->alignment = ltt_get_alignment();
+       header->alignment = lib_ring_buffer_get_alignment();
        header->start_time_sec = trace->start_time.tv_sec;
        header->start_time_usec = trace->start_time.tv_usec;
        header->start_freq = trace->start_freq;
@@ -590,9 +543,6 @@ int ltt_trace_set_channel_subbufcount(const char *trace_name,
 int ltt_trace_set_channel_switch_timer(const char *trace_name,
                                       const char *channel_name,
                                       unsigned long interval);
-int ltt_trace_set_channel_enable(const char *trace_name,
-                                const char *channel_name,
-                                unsigned int enable);
 int ltt_trace_set_channel_overwrite(const char *trace_name,
                                    const char *channel_name,
                                    unsigned int overwrite);
@@ -657,17 +607,4 @@ static inline void ltt_dump_idt_table(void *call_data)
 }
 #endif
 
-/* Relay IOCTL */
-
-/* Get the next sub-buffer that can be read. */
-#define RELAY_GET_SB                   _IOR(0xF5, 0x00, __u32)
-/* Release the oldest reserved (by "get") sub-buffer. */
-#define RELAY_PUT_SB                   _IOW(0xF5, 0x01, __u32)
-/* returns the number of sub-buffers in the per cpu channel. */
-#define RELAY_GET_N_SB                 _IOR(0xF5, 0x02, __u32)
-/* returns the size of the current sub-buffer. */
-#define RELAY_GET_SB_SIZE              _IOR(0xF5, 0x03, __u32)
-/* returns the maximum size for sub-buffers. */
-#define RELAY_GET_MAX_SB_SIZE          _IOR(0xF5, 0x04, __u32)
-
 #endif /* _LTT_TRACER_H */
index 3e5cd3c3dc1a71d360aa5175bbacedc9fb4c3830..49712c829dc6ce3f1cb6dc350d6b588f0b8173d2 100644 (file)
--- a/ltt-type-serializer.h
+++ b/ltt-type-serializer.h
@@ -42,35 +42,35 @@ struct serialize_long_int {
        unsigned long f1;
        unsigned int f2;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_int_int_long {
        unsigned int f1;
        unsigned int f2;
        unsigned long f3;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_int_int_short {
        unsigned int f1;
        unsigned int f2;
        unsigned short f3;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_long_long {
        unsigned long f1;
        unsigned long f2;
        unsigned long f3;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_long_int {
        unsigned long f1;
        unsigned long f2;
        unsigned int f3;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_long_short_char {
        unsigned long f1;
@@ -78,45 +78,45 @@ struct serialize_long_long_short_char {
        unsigned short f3;
        unsigned char f4;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_long_short {
        unsigned long f1;
        unsigned long f2;
        unsigned short f3;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_short_char {
        unsigned long f1;
        unsigned short f2;
        unsigned char f3;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_short {
        unsigned long f1;
        unsigned short f2;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_char {
        unsigned long f1;
        unsigned char f2;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_ifname {
        unsigned long f1;
        unsigned char f2[IFNAMSIZ];
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_sizet_int {
        size_t f1;
        unsigned int f2;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_long_sizet_int {
        unsigned long f1;
@@ -124,7 +124,7 @@ struct serialize_long_long_sizet_int {
        size_t f3;
        unsigned int f4;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_long_long_sizet_int_int {
        unsigned long f1;
@@ -133,7 +133,7 @@ struct serialize_long_long_sizet_int_int {
        unsigned int f4;
        unsigned int f5;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_l4421224411111 {
        unsigned long f1;
@@ -151,7 +151,7 @@ struct serialize_l4421224411111 {
        uint8_t f13;
        uint8_t f14;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_l214421224411111 {
        unsigned long f1;
@@ -171,7 +171,7 @@ struct serialize_l214421224411111 {
        uint8_t f15;
        uint8_t f16;
        uint8_t end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 
 struct serialize_l4412228 {
        unsigned long f1;
@@ -183,5 +183,5 @@ struct serialize_l4412228 {
        uint16_t f7;
        uint64_t f8;
        unsigned char end_field[0];
-} LTT_ALIGN;
+} RING_BUFFER_ALIGN_ATTR;
 #endif /* _LTT_TYPE_SERIALIZER_H */
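
The zero-length end_field member closing each structure above lets callers
compute the serialized payload size without the tail padding that
RING_BUFFER_ALIGN_ATTR may otherwise force. A usage sketch (the field values
and the surrounding event-recording call are hypothetical):

	struct serialize_long_int payload;
	size_t payload_size;

	payload.f1 = 1UL;	/* example field values */
	payload.f2 = 2;
	/* serialized size stops at end_field: tail padding is excluded */
	payload_size = offsetof(struct serialize_long_int, end_field);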