//ust// #include <linux/cpu.h>
//ust// #include <linux/splice.h>
//ust// #include <linux/bitops.h>
-#include <sys/mman.h>
#include "kernelcompat.h"
+#include <sys/mman.h>
+#include <sys/ipc.h>
+#include <sys/shm.h>
#include "list.h"
#include "relay.h"
#include "channels.h"
static DEFINE_MUTEX(relay_channels_mutex);
static LIST_HEAD(relay_channels);
+
+static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf);
+
/**
* relay_alloc_buf - allocate a channel buffer
* @buf: the buffer struct
+/* Userspace port: back the channel buffer with a SysV shared memory
+ * segment (shmget/shmat) instead of anonymous mmap -- presumably so a
+ * separate consumer process can attach to the same buffer; confirm
+ * against the consumer side.
+ */
unsigned int n_pages;
struct buf_page *buf_page, *n;
- void *result;
+ void *ptr;
+ int result;
*size = PAGE_ALIGN(*size);
- /* Maybe do read-ahead */
- result = mmap(NULL, *size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS, -1, 0);
- if(result == MAP_FAILED) {
- PERROR("mmap");
+ /* Segment key is our pid; IPC_EXCL makes a key collision fail loudly. */
+ result = buf->shmid = shmget(getpid(), *size, IPC_CREAT | IPC_EXCL | 0700);
+ if(buf->shmid == -1) {
+ PERROR("shmget");
+ return -1;
+ }
+
+ ptr = shmat(buf->shmid, NULL, 0);
+ if(ptr == (void *) -1) {
+ perror("shmat");
+ goto destroy_shmem;
+ }
+
+ /* Already mark the shared memory for destruction. This will occur only
+ * when all users have detached.
+ */
+ result = shmctl(buf->shmid, IPC_RMID, NULL);
+ if(result == -1) {
+ perror("shmctl");
+ /* NOTE(review): on this failure path the segment stays attached (no
+ * shmdt) and is never marked for destruction; also note the mix of
+ * PERROR and bare perror in this function -- worth unifying.
+ */
return -1;
}
- buf->buf_data = result;
+ buf->buf_data = ptr;
buf->buf_size = *size;
return 0;
+
+ destroy_shmem:
+ result = shmctl(buf->shmid, IPC_RMID, NULL);
+ if(result == -1) {
+ perror("shmctl");
+ }
+
+ return -1;
}
/**
+/* kref release callback: tears down the buffer once the last reference is
+ * dropped. The file-removal callback is disabled in the userspace (ust)
+ * port -- there is no filesystem entry to remove.
+ */
static void relay_remove_buf(struct kref *kref)
{
struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
- buf->chan->cb->remove_buf_file(buf);
+//ust// buf->chan->cb->remove_buf_file(buf);
relay_destroy_buf(buf);
}
/* Create file in fs */
//ust// dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR,
//ust// buf);
+
+ ltt_create_buf_file_callback(buf); // ust //
+
//ust// if (!dentry)
//ust// goto free_buf;
//ust//
//ust// buf->hpage[odd] = page = buf->wpage;
//ust// page = ltt_relay_cache_page(buf, &buf->hpage[odd], page, offset);
//ust// return page_address(page->page) + (offset & ~PAGE_MASK);
+ return ((char *)buf->buf_data)+offset;
return NULL;
}
//ust// EXPORT_SYMBOL_GPL(ltt_relay_offset_address);
#define printk_dbg(fmt, args...)
#endif
-/* LTTng lockless logging buffer info */
-struct ltt_channel_buf_struct {
- /* First 32 bytes cache-hot cacheline */
- local_t offset; /* Current offset in the buffer */
- local_t *commit_count; /* Commit count per sub-buffer */
- atomic_long_t consumed; /*
- * Current offset in the buffer
- * standard atomic access (shared)
- */
- unsigned long last_tsc; /*
- * Last timestamp written in the buffer.
- */
- /* End of first 32 bytes cacheline */
- atomic_long_t active_readers; /*
- * Active readers count
- * standard atomic access (shared)
- */
- local_t events_lost;
- local_t corrupted_subbuffers;
- spinlock_t full_lock; /*
- * buffer full condition spinlock, only
- * for userspace tracing blocking mode
- * synchronization with reader.
- */
-//ust// wait_queue_head_t write_wait; /*
-//ust// * Wait queue for blocking user space
-//ust// * writers
-//ust// */
- atomic_t wakeup_readers; /* Boolean : wakeup readers waiting ? */
-} ____cacheline_aligned;
-
/*
* Last TSC comparison functions. Check if the current TSC overflows
* LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
buf->chan);
header->cycle_count_end = tsc;
- header->events_lost = ltt_buf->events_lost;
- header->subbuf_corrupt = ltt_buf->corrupted_subbuffers;
+ header->events_lost = local_read(<t_buf->events_lost);
+ header->subbuf_corrupt = local_read(<t_buf->corrupted_subbuffers);
+
+}
+
+/* Optional hook letting the tracer wake a consumer when data is ready;
+ * "finished" flags end-of-buffer. Plain (non-atomic) global pointer --
+ * registration is presumably done once before tracing starts; confirm
+ * there are no concurrent relay_set_wake_consumer calls.
+ */
+void (*wake_consumer)(void *, int) = NULL;
+
+/* Register (or clear, with NULL) the consumer wakeup callback. */
+void relay_set_wake_consumer(void (*wake)(void *, int))
+{
+ wake_consumer = wake;
+}
+
+/* Invoke the registered callback, if any; no-op otherwise. */
+void relay_wake_consumer(void *arg, int finished)
+{
+ if(wake_consumer)
+ wake_consumer(arg, finished);
}
+/* Delivery notification: the userspace port signals the consumer by
+ * writing one byte to the data-ready pipe instead of setting an
+ * in-process wakeup flag.
+ * NOTE(review): "<t_buf" below looks like an encoding artifact of
+ * "&ltt_buf" -- verify against the original patch.
+ */
static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
struct ltt_channel_struct *channel =
(struct ltt_channel_struct *)buf->chan->private_data;
struct ltt_channel_buf_struct *ltt_buf = channel->buf;
+ int result;
- atomic_set(<t_buf->wakeup_readers, 1);
+ result = write(ltt_buf->data_ready_fd_write, "1", 1);
+ if(result == -1) {
+ /* NOTE(review): the message names ltt_relay_buffer_flush but this
+ * is ltt_deliver -- copy/paste error in the string; fix separately
+ * (a doc pass must not touch runtime strings).
+ */
+ PERROR("write (in ltt_relay_buffer_flush)");
+ ERR("this should never happen!");
+ }
+//ust// atomic_set(<t_buf->wakeup_readers, 1);
}
+/* Userspace port: no debugfs, so the create-buf-file callback keeps only
+ * the buffer argument (matching the forward declaration near the top of
+ * the file) and always returns NULL.
+ */
-static struct dentry *ltt_create_buf_file_callback(const char *filename,
- struct dentry *parent, int mode,
- struct rchan_buf *buf)
+static struct dentry *ltt_create_buf_file_callback(struct rchan_buf *buf)
{
struct ltt_channel_struct *ltt_chan;
int err;
//ust// if (!dentry)
//ust// goto error;
//ust// return dentry;
+ return NULL; //ust//
+ /* NOTE(review): everything below the return above is now unreachable,
+ * and ltt_chan would be uninitialized if it were reached -- dead code;
+ * consider removing it.
+ */
//ust//error:
ltt_relay_destroy_buffer(ltt_chan);
return NULL;
//ust// return mask;
//ust// }
+/* "static" dropped so the subbuffer get/put pair can be called by the
+ * external consumer -- presumably declared in a shared header; confirm,
+ * otherwise this will trip -Wmissing-prototypes.
+ */
-static int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
+int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
{
struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
long consumed_old, consumed_idx, commit_count, write_offset;
return 0;
}
+/* Release a sub-buffer back to the writer; exported (static dropped) for
+ * the external consumer. NOTE(review): "<t_buf" below looks like an
+ * encoding artifact of "&ltt_buf" -- verify against the original patch.
+ */
-static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
+int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
{
long consumed_new, consumed_old;
consumed_old = consumed_old | uconsumed_old;
consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
+ /* NOTE(review): the full_lock spinlock is removed in this port; the
+ * cmpxchg on "consumed" is now the only writer/reader synchronization.
+ * Confirm the blocking-mode full-buffer path no longer relies on it.
+ */
- spin_lock(<t_buf->full_lock);
+//ust// spin_lock(<t_buf->full_lock);
if (atomic_long_cmpxchg(<t_buf->consumed, consumed_old,
consumed_new)
!= consumed_old) {
/* We have been pushed by the writer : the last
* buffer read _is_ corrupted! It can also
* happen if this is a buffer we never got. */
- spin_unlock(<t_buf->full_lock);
+//ust// spin_unlock(<t_buf->full_lock);
return -EIO;
} else {
/* tell the client that buffer is now unfull */
index = SUBBUF_INDEX(consumed_old, buf->chan);
data = BUFFER_OFFSET(consumed_old, buf->chan);
ltt_buf_unfull(buf, index, data);
- spin_unlock(<t_buf->full_lock);
+//ust// spin_unlock(<t_buf->full_lock);
}
return 0;
}
+/* Per-buffer bookkeeping init for the userspace port: counters move to
+ * local_t/atomic accessors and a pipe replaces the reader-wakeup
+ * machinery. NOTE(review): "<t_buf"/"<t_chan" below look like encoding
+ * artifacts of "&ltt_buf"/"&ltt_chan" -- verify against the original.
+ */
{
struct ltt_channel_buf_struct *ltt_buf = ltt_chan->buf;
unsigned int j;
+ int fds[2];
+ int result;
+ /* NOTE(review): sizeof(ltt_buf->commit_count) is the size of the
+ * pointer, not of a local_t element; this only works if the two sizes
+ * happen to match. Should presumably be sizeof(*ltt_buf->commit_count).
+ */
ltt_buf->commit_count =
- malloc(sizeof(ltt_buf->commit_count) * n_subbufs);
+ zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs);
if (!ltt_buf->commit_count)
return -ENOMEM;
kref_get(&trace->kref);
kref_get(&trace->ltt_transport_kref);
kref_get(<t_chan->kref);
- ltt_buf->offset = ltt_subbuffer_header_size();
+ local_set(<t_buf->offset, ltt_subbuffer_header_size());
atomic_long_set(<t_buf->consumed, 0);
atomic_long_set(<t_buf->active_readers, 0);
for (j = 0; j < n_subbufs; j++)
local_set(<t_buf->commit_count[j], 0);
//ust// init_waitqueue_head(<t_buf->write_wait);
- atomic_set(<t_buf->wakeup_readers, 0);
- spin_lock_init(<t_buf->full_lock);
+//ust// atomic_set(<t_buf->wakeup_readers, 0);
+//ust// spin_lock_init(<t_buf->full_lock);
ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
- ltt_buf->commit_count[0] += ltt_subbuffer_header_size();
+ local_add(ltt_subbuffer_header_size(), <t_buf->commit_count[0]);
+
+ local_set(<t_buf->events_lost, 0);
+ local_set(<t_buf->corrupted_subbuffers, 0);
- ltt_buf->events_lost = 0;
- ltt_buf->corrupted_subbuffers = 0;
+ /* Data-ready pipe: the writer signals on fds[1], the consumer reads
+ * fds[0]. NOTE(review): on pipe() failure the commit_count allocation
+ * and the kref references taken above are not unwound before returning
+ * -1 -- confirm callers clean up on error.
+ */
+ result = pipe(fds);
+ if(result == -1) {
+ PERROR("pipe");
+ return -1;
+ }
+ ltt_buf->data_ready_fd_read = fds[0];
+ ltt_buf->data_ready_fd_write = fds[1];
return 0;
}
*/
+/* Finalize the buffer, force a sub-buffer switch, then wake the consumer
+ * by writing one byte to the data-ready pipe (userspace replacement for
+ * the kernel wakeup path).
+ */
static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
{
+ struct ltt_channel_struct *channel =
+ (struct ltt_channel_struct *)buf->chan->private_data;
+ struct ltt_channel_buf_struct *ltt_buf = channel->buf;
+ int result;
+
buf->finalized = 1;
ltt_force_switch(buf, FORCE_FLUSH);
+
+ result = write(ltt_buf->data_ready_fd_write, "1", 1);
+ if(result == -1) {
+ PERROR("write (in ltt_relay_buffer_flush)");
+ ERR("this should never happen!");
+ }
}
static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
+/* Final flush for a channel's buffer; closing the pipe's write end is the
+ * end-of-buffer signal (the consumer sees EOF on the read end).
+ */
static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel)
{
struct rchan *rchan = ltt_channel->trans_channel_data;
+ int result; /* NOTE(review): now unused (the write below is commented out) -- expect an unused-variable warning. */
if (rchan->buf) {
struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf;
ltt_relay_buffer_flush(rchan->buf);
//ust// ltt_relay_wake_writers(ltt_buf);
+ /* closing the pipe tells the consumer the buffer is finished */
+
+ //result = write(ltt_buf->data_ready_fd_write, "D", 1);
+ //if(result == -1) {
+ // PERROR("write (in ltt_relay_finish_buffer)");
+ // ERR("this should never happen!");
+ //}
+ close(ltt_buf->data_ready_fd_write);
}
}
static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
struct ltt_channel_struct *ltt_channel, void **transport_data,
size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
- unsigned int *rflags, int largest_align, int cpu)
+ unsigned int *rflags, int largest_align)
{
struct rchan *rchan = ltt_channel->trans_channel_data;
struct rchan_buf *buf = *transport_data = rchan->buf;
static void ltt_relay_print_user_errors(struct ltt_trace_struct *trace,
unsigned int chan_index, size_t data_size,
- struct user_dbg_data *dbg, int cpu)
+ struct user_dbg_data *dbg)
{
struct rchan *rchan;
struct ltt_channel_buf_struct *ltt_buf;
//ust// return 0;
//ust// }
+/* Registered as an ELF constructor so the transport is available as soon
+ * as the library is loaded; the guard keeps a second (explicit) call from
+ * registering twice. Presumably single-threaded at load time -- confirm
+ * if init_ustrelay_transport can also be called after threads start.
+ */
-void init_ustrelay_transport(void)
+static char initialized = 0;
+
+void __attribute__((constructor)) init_ustrelay_transport(void)
{
- ltt_transport_register(&ust_relay_transport);
+ if(!initialized) {
+ ltt_transport_register(&ust_relay_transport);
+ initialized = 1;
+ }
}
static void __exit ltt_relay_exit(void)