lttng-modules v0.19-stable: setup_trace_write: Fix recursive locking
[lttng-modules.git] / ltt-relay-lockless.c
CommitLineData
1c8284eb
MD
1/*
2 * ltt/ltt-relay-lockless.c
3 *
4 * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
5 *
6 * LTTng lockless buffer space management (reader/writer).
7 *
8 * Author:
9 * Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
10 *
11 * Inspired from LTT :
12 * Karim Yaghmour (karim@opersys.com)
13 * Tom Zanussi (zanussi@us.ibm.com)
14 * Bob Wisniewski (bob@watson.ibm.com)
15 * And from K42 :
16 * Bob Wisniewski (bob@watson.ibm.com)
17 *
18 * Changelog:
19 * 08/10/08, Cleanup.
20 * 19/10/05, Complete lockless mechanism.
21 * 27/05/05, Modular redesign and rewrite.
22 *
23 * Userspace reader semantic :
24 * while (poll fd != POLLHUP) {
25 * - ioctl RELAY_GET_SUBBUF_SIZE
26 * while (1) {
27 * - ioctl GET_SUBBUF
28 * - splice 1 subbuffer worth of data to a pipe
29 * - splice the data from pipe to disk/network
30 * - ioctl PUT_SUBBUF, check error value
31 * if err val < 0, previous subbuffer was corrupted.
32 * }
33 * }
34 *
35 * Dual LGPL v2.1/GPL v2 license.
36 */
37
38#include <linux/time.h>
39#include <linux/module.h>
40#include <linux/string.h>
41#include <linux/slab.h>
42#include <linux/init.h>
43#include <linux/rcupdate.h>
44#include <linux/timer.h>
45#include <linux/sched.h>
46#include <linux/bitops.h>
47#include <linux/smp_lock.h>
48#include <linux/stat.h>
49#include <linux/cpu.h>
50#include <linux/idle.h>
51#include <linux/delay.h>
52#include <linux/notifier.h>
53#include <asm/atomic.h>
54#include <asm/local.h>
55
56#include "ltt-tracer.h"
57#include "ltt-relay.h"
58#include "ltt-relay-lockless.h"
59
60#if 0
61#define printk_dbg(fmt, args...) printk(fmt, args)
62#else
63#define printk_dbg(fmt, args...)
64#endif
65
66struct ltt_reserve_switch_offsets {
67 long begin, end, old;
68 long begin_switch, end_switch_current, end_switch_old;
69 size_t before_hdr_pad, size;
70};
71
72static
73void ltt_force_switch(struct ltt_chanbuf *buf, enum force_switch_mode mode);
74
75static
76void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu);
77
78static const struct file_operations ltt_file_operations;
79
80static
81void ltt_buffer_begin(struct ltt_chanbuf *buf, u64 tsc, unsigned int subbuf_idx)
82{
83 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
84 struct ltt_subbuffer_header *header =
85 (struct ltt_subbuffer_header *)
86 ltt_relay_offset_address(&buf->a,
87 subbuf_idx * chan->a.sb_size);
88
89 header->cycle_count_begin = tsc;
90 header->data_size = 0xFFFFFFFF; /* for debugging */
91 ltt_write_trace_header(chan->a.trace, header);
92}
93
94/*
95 * offset is assumed to never be 0 here : never deliver a completely empty
96 * subbuffer. The lost size is between 0 and subbuf_size-1.
97 */
98static
99void ltt_buffer_end(struct ltt_chanbuf *buf, u64 tsc, unsigned int offset,
100 unsigned int subbuf_idx)
101{
102 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
103 struct ltt_subbuffer_header *header =
104 (struct ltt_subbuffer_header *)
105 ltt_relay_offset_address(&buf->a,
106 subbuf_idx * chan->a.sb_size);
107 u32 data_size = SUBBUF_OFFSET(offset - 1, chan) + 1;
108
109 header->data_size = data_size;
110 header->sb_size = PAGE_ALIGN(data_size);
111 header->cycle_count_end = tsc;
112 header->events_lost = local_read(&buf->events_lost);
113 header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers);
114}
115
116/*
117 * Must be called under trace lock or cpu hotplug protection.
118 */
119void ltt_chanbuf_free(struct ltt_chanbuf *buf)
120{
121 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
122
123 ltt_relay_print_buffer_errors(chan, buf->a.cpu);
2e6246b4 124#ifdef LTT_VMCORE
1c8284eb
MD
125 kfree(buf->commit_seq);
126#endif
127 kfree(buf->commit_count);
128
129 ltt_chanbuf_alloc_free(&buf->a);
130}
131
132/*
133 * Must be called under trace lock or cpu hotplug protection.
134 */
135int ltt_chanbuf_create(struct ltt_chanbuf *buf, struct ltt_chan_alloc *chana,
136 int cpu)
137{
138 struct ltt_chan *chan = container_of(chana, struct ltt_chan, a);
139 struct ltt_trace *trace = chana->trace;
140 unsigned int j, n_sb;
141 int ret;
142
143 /* Test for cpu hotplug */
144 if (buf->a.allocated)
145 return 0;
146
147 ret = ltt_chanbuf_alloc_create(&buf->a, &chan->a, cpu);
148 if (ret)
149 return ret;
150
151 buf->commit_count =
152 kzalloc_node(ALIGN(sizeof(*buf->commit_count) * chan->a.n_sb,
153 1 << INTERNODE_CACHE_SHIFT),
154 GFP_KERNEL, cpu_to_node(cpu));
155 if (!buf->commit_count) {
156 ret = -ENOMEM;
157 goto free_chanbuf;
158 }
159
2e6246b4 160#ifdef LTT_VMCORE
1c8284eb
MD
161 buf->commit_seq =
162 kzalloc_node(ALIGN(sizeof(*buf->commit_seq) * chan->a.n_sb,
163 1 << INTERNODE_CACHE_SHIFT),
164 GFP_KERNEL, cpu_to_node(cpu));
165 if (!buf->commit_seq) {
166 kfree(buf->commit_count);
167 ret = -ENOMEM;
168 goto free_commit;
169 }
170#endif
171
172 local_set(&buf->offset, ltt_sb_header_size());
173 atomic_long_set(&buf->consumed, 0);
174 atomic_long_set(&buf->active_readers, 0);
175 n_sb = chan->a.n_sb;
176 for (j = 0; j < n_sb; j++) {
177 local_set(&buf->commit_count[j].cc, 0);
178 local_set(&buf->commit_count[j].cc_sb, 0);
179 local_set(&buf->commit_count[j].events, 0);
180 }
181 init_waitqueue_head(&buf->write_wait);
182 init_waitqueue_head(&buf->read_wait);
183 spin_lock_init(&buf->full_lock);
184
185 RCHAN_SB_CLEAR_NOREF(buf->a.buf_wsb[0].pages);
186 ltt_buffer_begin(buf, trace->start_tsc, 0);
187 /* atomic_add made on local variable on data that belongs to
188 * various CPUs : ok because tracing not started (for this cpu). */
189 local_add(ltt_sb_header_size(), &buf->commit_count[0].cc);
190
191 local_set(&buf->events_lost, 0);
192 local_set(&buf->corrupted_subbuffers, 0);
193 buf->finalized = 0;
194
195 ret = ltt_chanbuf_create_file(chan->a.filename, chan->a.parent,
196 S_IRUSR, buf);
197 if (ret)
198 goto free_init;
199
200 /*
201 * Ensure the buffer is ready before setting it to allocated.
202 * Used for cpu hotplug vs async wakeup.
203 */
204 smp_wmb();
205 buf->a.allocated = 1;
206
207 return 0;
208
209 /* Error handling */
210free_init:
2e6246b4 211#ifdef LTT_VMCORE
1c8284eb
MD
212 kfree(buf->commit_seq);
213free_commit:
214#endif
215 kfree(buf->commit_count);
216free_chanbuf:
217 ltt_chanbuf_alloc_free(&buf->a);
218 return ret;
219}
220
221void ltt_chan_remove_files(struct ltt_chan *chan)
222{
223 ltt_ascii_remove(chan);
224 ltt_chan_alloc_remove_files(&chan->a);
225}
226EXPORT_SYMBOL_GPL(ltt_chan_remove_files);
227
228
229void ltt_chan_free(struct kref *kref)
230{
231 struct ltt_chan *chan = container_of(kref, struct ltt_chan, a.kref);
232
233 ltt_chan_alloc_free(&chan->a);
234}
235EXPORT_SYMBOL_GPL(ltt_chan_free);
236
237/**
238 * ltt_chan_create - Create channel.
239 */
240int ltt_chan_create(const char *base_filename,
241 struct ltt_chan *chan, struct dentry *parent,
242 size_t sb_size, size_t n_sb,
243 int overwrite, struct ltt_trace *trace)
244{
245 int ret;
246
247 chan->overwrite = overwrite;
248
249 ret = ltt_chan_alloc_init(&chan->a, trace, base_filename, parent,
250 sb_size, n_sb, overwrite, overwrite);
251 if (ret)
252 goto error;
253
254 chan->commit_count_mask = (~0UL >> chan->a.n_sb_order);
255
256 ret = ltt_ascii_create(chan);
257 if (ret)
258 goto error_chan_alloc_free;
259
260 return ret;
261
262error_chan_alloc_free:
263 ltt_chan_alloc_free(&chan->a);
264error:
265 return ret;
266}
267EXPORT_SYMBOL_GPL(ltt_chan_create);
268
269int ltt_chanbuf_open_read(struct ltt_chanbuf *buf)
270{
271 kref_get(&buf->a.chan->kref);
272 if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) {
273 kref_put(&buf->a.chan->kref, ltt_chan_free);
274 return -EBUSY;
275 }
276
277 return 0;
278}
279EXPORT_SYMBOL_GPL(ltt_chanbuf_open_read);
280
281void ltt_chanbuf_release_read(struct ltt_chanbuf *buf)
282{
283 //ltt_relay_destroy_buffer(&buf->a.chan->a, buf->a.cpu);
284 WARN_ON(atomic_long_read(&buf->active_readers) != 1);
285 atomic_long_dec(&buf->active_readers);
286 kref_put(&buf->a.chan->kref, ltt_chan_free);
287}
288EXPORT_SYMBOL_GPL(ltt_chanbuf_release_read);
289
290/*
291 * Wake writers :
292 *
293 * This must be done after the trace is removed from the RCU list so that there
294 * are no stalled writers.
295 */
296static void ltt_relay_wake_writers(struct ltt_chanbuf *buf)
297{
298
299 if (waitqueue_active(&buf->write_wait))
300 wake_up_interruptible(&buf->write_wait);
301}
302
303/*
304 * This function should not be called from NMI interrupt context
305 */
306static void ltt_buf_unfull(struct ltt_chanbuf *buf)
307{
308 ltt_relay_wake_writers(buf);
309}
310
311/*
312 * Promote compiler barrier to a smp_mb().
313 * For the specific LTTng case, this IPI call should be removed if the
314 * architecture does not reorder writes. This should eventually be provided by
315 * a separate architecture-specific infrastructure.
316 */
317static void remote_mb(void *info)
318{
319 smp_mb();
320}
321
322int ltt_chanbuf_get_subbuf(struct ltt_chanbuf *buf, unsigned long *consumed)
323{
324 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
325 long consumed_old, consumed_idx, commit_count, write_offset;
326 int ret;
327
328 consumed_old = atomic_long_read(&buf->consumed);
329 consumed_idx = SUBBUF_INDEX(consumed_old, chan);
330 commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
331 /*
332 * Make sure we read the commit count before reading the buffer
333 * data and the write offset. Correct consumed offset ordering
334 * wrt commit count is insured by the use of cmpxchg to update
335 * the consumed offset.
336 * smp_call_function_single can fail if the remote CPU is offline,
337 * this is OK because then there is no wmb to execute there.
338 * If our thread is executing on the same CPU as the on the buffers
339 * belongs to, we don't have to synchronize it at all. If we are
340 * migrated, the scheduler will take care of the memory barriers.
341 * Normally, smp_call_function_single() should ensure program order when
342 * executing the remote function, which implies that it surrounds the
343 * function execution with :
344 * smp_mb()
345 * send IPI
346 * csd_lock_wait
347 * recv IPI
348 * smp_mb()
349 * exec. function
350 * smp_mb()
351 * csd unlock
352 * smp_mb()
353 *
354 * However, smp_call_function_single() does not seem to clearly execute
355 * such barriers. It depends on spinlock semantic to provide the barrier
356 * before executing the IPI and, when busy-looping, csd_lock_wait only
357 * executes smp_mb() when it has to wait for the other CPU.
358 *
359 * I don't trust this code. Therefore, let's add the smp_mb() sequence
360 * required ourself, even if duplicated. It has no performance impact
361 * anyway.
362 *
363 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
364 * read and write vs write. They do not ensure core synchronization. We
365 * really have to ensure total order between the 3 barriers running on
366 * the 2 CPUs.
367 */
368#ifdef LTT_NO_IPI_BARRIER
369 /*
370 * Local rmb to match the remote wmb to read the commit count before the
371 * buffer data and the write offset.
372 */
373 smp_rmb();
374#else
375 if (raw_smp_processor_id() != buf->a.cpu) {
376 smp_mb(); /* Total order with IPI handler smp_mb() */
377 smp_call_function_single(buf->a.cpu, remote_mb, NULL, 1);
378 smp_mb(); /* Total order with IPI handler smp_mb() */
379 }
380#endif
381 write_offset = local_read(&buf->offset);
382 /*
383 * Check that the subbuffer we are trying to consume has been
384 * already fully committed.
385 */
386 if (((commit_count - chan->a.sb_size)
387 & chan->commit_count_mask)
388 - (BUFFER_TRUNC(consumed_old, chan)
389 >> chan->a.n_sb_order)
390 != 0) {
391 return -EAGAIN;
392 }
393 /*
394 * Check that we are not about to read the same subbuffer in
395 * which the writer head is.
396 */
397 if ((SUBBUF_TRUNC(write_offset, chan)
398 - SUBBUF_TRUNC(consumed_old, chan))
399 == 0) {
400 return -EAGAIN;
401 }
402
403 ret = update_read_sb_index(&buf->a, &chan->a, consumed_idx);
404 if (ret)
405 return ret;
406
407 *consumed = consumed_old;
408 return 0;
409}
410EXPORT_SYMBOL_GPL(ltt_chanbuf_get_subbuf);
411
412int ltt_chanbuf_put_subbuf(struct ltt_chanbuf *buf, unsigned long consumed)
413{
414 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
415 long consumed_new, consumed_old;
416
417 WARN_ON(atomic_long_read(&buf->active_readers) != 1);
418
419 consumed_old = consumed;
420 consumed_new = SUBBUF_ALIGN(consumed_old, chan);
421 WARN_ON_ONCE(RCHAN_SB_IS_NOREF(buf->a.buf_rsb.pages));
422 RCHAN_SB_SET_NOREF(buf->a.buf_rsb.pages);
423
424 spin_lock(&buf->full_lock);
425 if (atomic_long_cmpxchg(&buf->consumed, consumed_old, consumed_new)
426 != consumed_old) {
427 /* We have been pushed by the writer. */
428 spin_unlock(&buf->full_lock);
429 /*
430 * We exchanged the subbuffer pages. No corruption possible
431 * even if the writer did push us. No more -EIO possible.
432 */
433 return 0;
434 } else {
435 /* tell the client that buffer is now unfull */
436 int index;
437 long data;
438 index = SUBBUF_INDEX(consumed_old, chan);
439 data = BUFFER_OFFSET(consumed_old, chan);
440 ltt_buf_unfull(buf);
441 spin_unlock(&buf->full_lock);
442 }
443 return 0;
444}
445EXPORT_SYMBOL_GPL(ltt_chanbuf_put_subbuf);
446
447static void switch_buffer(unsigned long data)
448{
449 struct ltt_chanbuf *buf = (struct ltt_chanbuf *)data;
450 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
451
452 /*
453 * Only flush buffers periodically if readers are active.
454 */
455 if (atomic_long_read(&buf->active_readers))
456 ltt_force_switch(buf, FORCE_ACTIVE);
457
458 mod_timer_pinned(&buf->switch_timer,
459 jiffies + chan->switch_timer_interval);
460}
461
462static void ltt_chanbuf_start_switch_timer(struct ltt_chanbuf *buf)
463{
464 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
465
466 if (!chan->switch_timer_interval)
467 return;
468
469 init_timer_deferrable(&buf->switch_timer);
470 buf->switch_timer.function = switch_buffer;
471 buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
472 buf->switch_timer.data = (unsigned long)buf;
473 add_timer_on(&buf->switch_timer, buf->a.cpu);
474}
475
476/*
477 * called with ltt traces lock held.
478 */
479void ltt_chan_start_switch_timer(struct ltt_chan *chan)
480{
481 int cpu;
482
483 if (!chan->switch_timer_interval)
484 return;
485
486 for_each_online_cpu(cpu) {
487 struct ltt_chanbuf *buf;
488
489 buf = per_cpu_ptr(chan->a.buf, cpu);
490 ltt_chanbuf_start_switch_timer(buf);
491 }
492}
493
494static void ltt_chanbuf_stop_switch_timer(struct ltt_chanbuf *buf)
495{
496 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
497
498 if (!chan->switch_timer_interval)
499 return;
500
501 del_timer_sync(&buf->switch_timer);
502}
503
504/*
505 * called with ltt traces lock held.
506 */
507void ltt_chan_stop_switch_timer(struct ltt_chan *chan)
508{
509 int cpu;
510
511 if (!chan->switch_timer_interval)
512 return;
513
514 for_each_online_cpu(cpu) {
515 struct ltt_chanbuf *buf;
516
517 buf = per_cpu_ptr(chan->a.buf, cpu);
518 ltt_chanbuf_stop_switch_timer(buf);
519 }
520}
521
522static void ltt_chanbuf_idle_switch(struct ltt_chanbuf *buf)
523{
524 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
525
526 if (chan->switch_timer_interval)
527 ltt_force_switch(buf, FORCE_ACTIVE);
528}
529
530/*
531 * ltt_chanbuf_switch is called from a remote CPU to ensure that the buffers of
532 * a cpu which went down are flushed. Note that if we execute concurrently
533 * with trace allocation, a buffer might appear be unallocated (because it
534 * detects that the target CPU is offline).
535 */
536static void ltt_chanbuf_switch(struct ltt_chanbuf *buf)
537{
538 if (buf->a.allocated)
539 ltt_force_switch(buf, FORCE_ACTIVE);
540}
541
542/**
543 * ltt_chanbuf_hotcpu_callback - CPU hotplug callback
544 * @nb: notifier block
545 * @action: hotplug action to take
546 * @hcpu: CPU number
547 *
548 * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
549 */
550static
551int ltt_chanbuf_hotcpu_callback(struct notifier_block *nb,
552 unsigned long action,
553 void *hcpu)
554{
555 unsigned int cpu = (unsigned long)hcpu;
556
557 switch (action) {
558 case CPU_DOWN_FAILED:
559 case CPU_DOWN_FAILED_FROZEN:
560 case CPU_ONLINE:
561 case CPU_ONLINE_FROZEN:
562 /*
563 * CPU hotplug lock protects trace lock from this callback.
564 */
565 ltt_chan_for_each_channel(ltt_chanbuf_start_switch_timer, cpu);
566 return NOTIFY_OK;
567
568 case CPU_DOWN_PREPARE:
569 case CPU_DOWN_PREPARE_FROZEN:
570 /*
571 * Performs an IPI to delete the timer locally on the target
572 * CPU. CPU hotplug lock protects trace lock from this
573 * callback.
574 */
575 ltt_chan_for_each_channel(ltt_chanbuf_stop_switch_timer, cpu);
576 return NOTIFY_OK;
577
578 case CPU_DEAD:
579 case CPU_DEAD_FROZEN:
580 /*
581 * Performing a buffer switch on a remote CPU. Performed by
582 * the CPU responsible for doing the hotunplug after the target
583 * CPU stopped running completely. Ensures that all data
584 * from that remote CPU is flushed. CPU hotplug lock protects
585 * trace lock from this callback.
586 */
587 ltt_chan_for_each_channel(ltt_chanbuf_switch, cpu);
588 return NOTIFY_OK;
589
590 default:
591 return NOTIFY_DONE;
592 }
593}
594
595static int pm_idle_entry_callback(struct notifier_block *self,
596 unsigned long val, void *data)
597{
598 if (val == IDLE_START) {
599 rcu_read_lock_sched_notrace();
600 ltt_chan_for_each_channel(ltt_chanbuf_idle_switch,
601 smp_processor_id());
602 rcu_read_unlock_sched_notrace();
603 }
604 return 0;
605}
606
607struct notifier_block pm_idle_entry_notifier = {
608 .notifier_call = pm_idle_entry_callback,
609 .priority = ~0U, /* smallest prio, run after tracing events */
610};
611
612static
613void ltt_relay_print_written(struct ltt_chan *chan, long cons_off,
614 unsigned int cpu)
615{
616 struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
617 long cons_idx, events_count;
618
619 cons_idx = SUBBUF_INDEX(cons_off, chan);
620 events_count = local_read(&buf->commit_count[cons_idx].events);
621
622 if (events_count)
623 printk(KERN_INFO
624 "LTT: %lu events written in channel %s "
625 "(cpu %u, index %lu)\n",
626 events_count, chan->a.filename, cpu, cons_idx);
627}
628
629static
630void ltt_relay_print_subbuffer_errors(struct ltt_chanbuf *buf,
631 struct ltt_chan *chan, long cons_off,
632 unsigned int cpu)
633{
634 long cons_idx, commit_count, commit_count_sb, write_offset;
635
636 cons_idx = SUBBUF_INDEX(cons_off, chan);
637 commit_count = local_read(&buf->commit_count[cons_idx].cc);
638 commit_count_sb = local_read(&buf->commit_count[cons_idx].cc_sb);
639 /*
640 * No need to order commit_count and write_offset reads because we
641 * execute after trace is stopped when there are no readers left.
642 */
643 write_offset = local_read(&buf->offset);
644 printk(KERN_WARNING
645 "LTT : unread channel %s offset is %ld "
646 "and cons_off : %ld (cpu %u)\n",
647 chan->a.filename, write_offset, cons_off, cpu);
648 /* Check each sub-buffer for non filled commit count */
649 if (((commit_count - chan->a.sb_size) & chan->commit_count_mask)
650 - (BUFFER_TRUNC(cons_off, chan) >> chan->a.n_sb_order)
651 != 0)
652 printk(KERN_ALERT
653 "LTT : %s : subbuffer %lu has non filled "
654 "commit count [cc, cc_sb] [%lu,%lu].\n",
655 chan->a.filename, cons_idx, commit_count,
656 commit_count_sb);
657 printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %lu\n",
658 chan->a.filename, commit_count, chan->a.sb_size);
659}
660
661static
662void ltt_relay_print_errors(struct ltt_chanbuf *buf, struct ltt_chan *chan,
663 struct ltt_trace *trace, int cpu)
664{
665 long cons_off;
666
667 /*
668 * Can be called in the error path of allocation when
669 * trans_channel_data is not yet set.
670 */
671 if (!chan)
672 return;
673 for (cons_off = 0; cons_off < chan->a.buf_size;
674 cons_off = SUBBUF_ALIGN(cons_off, chan))
675 ltt_relay_print_written(chan, cons_off, cpu);
676 for (cons_off = atomic_long_read(&buf->consumed);
677 (SUBBUF_TRUNC(local_read(&buf->offset), chan)
678 - cons_off) > 0;
679 cons_off = SUBBUF_ALIGN(cons_off, chan))
680 ltt_relay_print_subbuffer_errors(buf, chan, cons_off, cpu);
681}
682
683static
684void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu)
685{
686 struct ltt_trace *trace = chan->a.trace;
687 struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
688
689 if (local_read(&buf->events_lost))
690 printk(KERN_ALERT
691 "LTT : %s : %ld events lost "
692 "in %s channel (cpu %u).\n",
693 chan->a.filename, local_read(&buf->events_lost),
694 chan->a.filename, cpu);
695 if (local_read(&buf->corrupted_subbuffers))
696 printk(KERN_ALERT
697 "LTT : %s : %ld corrupted subbuffers "
698 "in %s channel (cpu %u).\n",
699 chan->a.filename,
700 local_read(&buf->corrupted_subbuffers),
701 chan->a.filename, cpu);
702
703 ltt_relay_print_errors(buf, chan, trace, cpu);
704}
705
706static void ltt_relay_remove_dirs(struct ltt_trace *trace)
707{
708 ltt_ascii_remove_dir(trace);
709 debugfs_remove(trace->dentry.trace_root);
710}
711
712static int ltt_relay_create_dirs(struct ltt_trace *new_trace)
713{
714 struct dentry *ltt_root_dentry;
715 int ret;
716
717 ltt_root_dentry = get_ltt_root();
718 if (!ltt_root_dentry)
719 return ENOENT;
720
721 new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
722 ltt_root_dentry);
723 put_ltt_root();
724 if (new_trace->dentry.trace_root == NULL) {
725 printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
726 new_trace->trace_name);
727 return EEXIST;
728 }
729 ret = ltt_ascii_create_dir(new_trace);
730 if (ret)
731 printk(KERN_WARNING "LTT : Unable to create ascii output file "
732 "for trace %s\n", new_trace->trace_name);
733
734 return 0;
735}
736
737/*
738 * LTTng channel flush function.
739 *
740 * Must be called when no tracing is active in the channel, because of
741 * accesses across CPUs.
742 */
743static notrace void ltt_relay_buffer_flush(struct ltt_chanbuf *buf)
744{
745 buf->finalized = 1;
746 ltt_force_switch(buf, FORCE_FLUSH);
747}
748
749static void ltt_relay_async_wakeup_chan(struct ltt_chan *chan)
750{
751 unsigned int i;
752
753 for_each_possible_cpu(i) {
754 struct ltt_chanbuf *buf;
755
756 buf = per_cpu_ptr(chan->a.buf, i);
757 if (!buf->a.allocated)
758 continue;
759 /*
760 * Ensure the buffer has been allocated before reading its
761 * content. Sync cpu hotplug vs async wakeup.
762 */
763 smp_rmb();
764 if (ltt_poll_deliver(buf, chan))
765 wake_up_interruptible(&buf->read_wait);
766 }
767}
768
769static void ltt_relay_finish_buffer(struct ltt_chan *chan, unsigned int cpu)
770{
771 struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
772
773 if (buf->a.allocated) {
774 ltt_relay_buffer_flush(buf);
775 ltt_relay_wake_writers(buf);
776 }
777}
778
779
780static void ltt_relay_finish_channel(struct ltt_chan *chan)
781{
782 unsigned int i;
783
784 for_each_possible_cpu(i)
785 ltt_relay_finish_buffer(chan, i);
786}
787
788/*
789 * This is called with preemption disabled when user space has requested
790 * blocking mode. If one of the active traces has free space below a
791 * specific threshold value, we reenable preemption and block.
792 */
793static
794int ltt_relay_user_blocking(struct ltt_trace *trace, unsigned int chan_index,
795 size_t data_size, struct user_dbg_data *dbg)
796{
797 struct ltt_chanbuf *buf;
798 struct ltt_chan *chan;
799 int cpu;
800 DECLARE_WAITQUEUE(wait, current);
801
802 chan = &trace->channels[chan_index];
803 cpu = smp_processor_id();
804 buf = per_cpu_ptr(chan->a.buf, cpu);
805
806 /*
807 * Check if data is too big for the channel : do not
808 * block for it.
809 */
810 if (LTT_RESERVE_CRITICAL + data_size > chan->a.sb_size)
811 return 0;
812
813 /*
814 * If free space too low, we block. We restart from the
815 * beginning after we resume (cpu id may have changed
816 * while preemption is active).
817 */
818 spin_lock(&buf->full_lock);
819 if (!chan->overwrite) {
820 dbg->write = local_read(&buf->offset);
821 dbg->read = atomic_long_read(&buf->consumed);
822 dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
823 - SUBBUF_TRUNC(dbg->read, chan);
824 if (dbg->avail_size > chan->a.buf_size) {
825 __set_current_state(TASK_INTERRUPTIBLE);
826 add_wait_queue(&buf->write_wait, &wait);
827 spin_unlock(&buf->full_lock);
828 preempt_enable();
829 schedule();
830 __set_current_state(TASK_RUNNING);
831 remove_wait_queue(&buf->write_wait, &wait);
832 if (signal_pending(current))
833 return -ERESTARTSYS;
834 preempt_disable();
835 return 1;
836 }
837 }
838 spin_unlock(&buf->full_lock);
839 return 0;
840}
841
842static
843void ltt_relay_print_user_errors(struct ltt_trace *trace,
844 unsigned int chan_index, size_t data_size,
845 struct user_dbg_data *dbg, int cpu)
846{
847 struct ltt_chanbuf *buf;
848 struct ltt_chan *chan;
849
850 chan = &trace->channels[chan_index];
851 buf = per_cpu_ptr(chan->a.buf, cpu);
852
853 printk(KERN_ERR "Error in LTT usertrace : "
854 "buffer full : event lost in blocking "
855 "mode. Increase LTT_RESERVE_CRITICAL.\n");
856 printk(KERN_ERR "LTT nesting level is %u.\n",
857 per_cpu(ltt_nesting, cpu));
858 printk(KERN_ERR "LTT available size %lu.\n",
859 dbg->avail_size);
860 printk(KERN_ERR "available write : %lu, read : %lu\n",
861 dbg->write, dbg->read);
862
863 dbg->write = local_read(&buf->offset);
864 dbg->read = atomic_long_read(&buf->consumed);
865
866 printk(KERN_ERR "LTT current size %lu.\n",
867 dbg->write + LTT_RESERVE_CRITICAL + data_size
868 - SUBBUF_TRUNC(dbg->read, chan));
869 printk(KERN_ERR "current write : %lu, read : %lu\n",
870 dbg->write, dbg->read);
871}
872
873/*
874 * ltt_reserve_switch_old_subbuf: switch old subbuffer
875 *
876 * Concurrency safe because we are the last and only thread to alter this
877 * sub-buffer. As long as it is not delivered and read, no other thread can
878 * alter the offset, alter the reserve_count or call the
879 * client_buffer_end_callback on this sub-buffer.
880 *
881 * The only remaining threads could be the ones with pending commits. They will
882 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
883 * We detect corrupted subbuffers with commit and reserve counts. We keep a
884 * corrupted sub-buffers count and push the readers across these sub-buffers.
885 *
886 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
887 * switches in, finding out it's corrupted. The result will be than the old
888 * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer
889 * will be declared corrupted too because of the commit count adjustment.
890 *
891 * Note : offset_old should never be 0 here.
892 */
893static
894void ltt_reserve_switch_old_subbuf(struct ltt_chanbuf *buf,
895 struct ltt_chan *chan,
896 struct ltt_reserve_switch_offsets *offsets,
897 u64 *tsc)
898{
899 long oldidx = SUBBUF_INDEX(offsets->old - 1, chan);
900 long commit_count, padding_size;
901
902 padding_size = chan->a.sb_size
903 - (SUBBUF_OFFSET(offsets->old - 1, chan) + 1);
904 ltt_buffer_end(buf, *tsc, offsets->old, oldidx);
905
906 /*
907 * Must write slot data before incrementing commit count.
908 * This compiler barrier is upgraded into a smp_wmb() by the IPI
909 * sent by get_subbuf() when it does its smp_rmb().
910 */
911 barrier();
912 local_add(padding_size, &buf->commit_count[oldidx].cc);
913 commit_count = local_read(&buf->commit_count[oldidx].cc);
914 ltt_check_deliver(buf, chan, offsets->old - 1, commit_count, oldidx);
915 ltt_write_commit_counter(buf, chan, oldidx, offsets->old, commit_count,
916 padding_size);
917}
918
919/*
920 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
921 *
922 * This code can be executed unordered : writers may already have written to the
923 * sub-buffer before this code gets executed, caution. The commit makes sure
924 * that this code is executed before the deliver of this sub-buffer.
925 */
926static
927void ltt_reserve_switch_new_subbuf(struct ltt_chanbuf *buf,
928 struct ltt_chan *chan,
929 struct ltt_reserve_switch_offsets *offsets,
930 u64 *tsc)
931{
932 long beginidx = SUBBUF_INDEX(offsets->begin, chan);
933 long commit_count;
934
935 ltt_buffer_begin(buf, *tsc, beginidx);
936
937 /*
938 * Must write slot data before incrementing commit count.
939 * This compiler barrier is upgraded into a smp_wmb() by the IPI
940 * sent by get_subbuf() when it does its smp_rmb().
941 */
942 barrier();
943 local_add(ltt_sb_header_size(), &buf->commit_count[beginidx].cc);
944 commit_count = local_read(&buf->commit_count[beginidx].cc);
945 /* Check if the written buffer has to be delivered */
946 ltt_check_deliver(buf, chan, offsets->begin, commit_count, beginidx);
947 ltt_write_commit_counter(buf, chan, beginidx, offsets->begin,
948 commit_count, ltt_sb_header_size());
949}
950
951
952/*
953 * ltt_reserve_end_switch_current: finish switching current subbuffer
954 *
955 * Concurrency safe because we are the last and only thread to alter this
956 * sub-buffer. As long as it is not delivered and read, no other thread can
957 * alter the offset, alter the reserve_count or call the
958 * client_buffer_end_callback on this sub-buffer.
959 *
960 * The only remaining threads could be the ones with pending commits. They will
961 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
962 * We detect corrupted subbuffers with commit and reserve counts. We keep a
963 * corrupted sub-buffers count and push the readers across these sub-buffers.
964 *
965 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
966 * switches in, finding out it's corrupted. The result will be than the old
967 * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer
968 * will be declared corrupted too because of the commit count adjustment.
969 */
970static
971void ltt_reserve_end_switch_current(struct ltt_chanbuf *buf,
972 struct ltt_chan *chan,
973 struct ltt_reserve_switch_offsets *offsets,
974 u64 *tsc)
975{
976 long endidx = SUBBUF_INDEX(offsets->end - 1, chan);
977 long commit_count, padding_size;
978
979 padding_size = chan->a.sb_size
980 - (SUBBUF_OFFSET(offsets->end - 1, chan) + 1);
981
982 ltt_buffer_end(buf, *tsc, offsets->end, endidx);
983
984 /*
985 * Must write slot data before incrementing commit count.
986 * This compiler barrier is upgraded into a smp_wmb() by the IPI
987 * sent by get_subbuf() when it does its smp_rmb().
988 */
989 barrier();
990 local_add(padding_size, &buf->commit_count[endidx].cc);
991 commit_count = local_read(&buf->commit_count[endidx].cc);
992 ltt_check_deliver(buf, chan, offsets->end - 1, commit_count, endidx);
993 ltt_write_commit_counter(buf, chan, endidx, offsets->end, commit_count,
994 padding_size);
995}
996
997/*
998 * Returns :
999 * 0 if ok
1000 * !0 if execution must be aborted.
1001 */
1002static
1003int ltt_relay_try_switch_slow(enum force_switch_mode mode,
1004 struct ltt_chanbuf *buf, struct ltt_chan *chan,
1005 struct ltt_reserve_switch_offsets *offsets,
1006 u64 *tsc)
1007{
1008 long sb_index;
1009 long reserve_commit_diff;
1010 long off;
1011
1012 offsets->begin = local_read(&buf->offset);
1013 offsets->old = offsets->begin;
1014 offsets->begin_switch = 0;
1015 offsets->end_switch_old = 0;
1016
1017 *tsc = trace_clock_read64();
1018
1019 off = SUBBUF_OFFSET(offsets->begin, chan);
1020 if ((mode != FORCE_ACTIVE && off > 0) || off > ltt_sb_header_size()) {
1021 offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
1022 offsets->end_switch_old = 1;
1023 } else {
1024 /* we do not have to switch : buffer is empty */
1025 return -1;
1026 }
1027 if (mode == FORCE_ACTIVE)
1028 offsets->begin += ltt_sb_header_size();
1029 /*
1030 * Always begin_switch in FORCE_ACTIVE mode.
1031 * Test new buffer integrity
1032 */
1033 sb_index = SUBBUF_INDEX(offsets->begin, chan);
1034 reserve_commit_diff =
1035 (BUFFER_TRUNC(offsets->begin, chan)
1036 >> chan->a.n_sb_order)
1037 - (local_read(&buf->commit_count[sb_index].cc_sb)
1038 & chan->commit_count_mask);
1039 if (reserve_commit_diff == 0) {
1040 /* Next buffer not corrupted. */
1041 if (mode == FORCE_ACTIVE
1042 && !chan->overwrite
1043 && offsets->begin - atomic_long_read(&buf->consumed)
1044 >= chan->a.buf_size) {
1045 /*
1046 * We do not overwrite non consumed buffers and we are
1047 * full : ignore switch while tracing is active.
1048 */
1049 return -1;
1050 }
1051 } else {
1052 /*
1053 * Next subbuffer corrupted. Force pushing reader even in normal
1054 * mode
1055 */
1056 }
1057 offsets->end = offsets->begin;
1058 return 0;
1059}
1060
1061/*
1062 * Force a sub-buffer switch for a per-cpu buffer. This operation is
1063 * completely reentrant : can be called while tracing is active with
1064 * absolutely no lock held.
1065 *
1066 * Note, however, that as a local_cmpxchg is used for some atomic
1067 * operations, this function must be called from the CPU which owns the buffer
1068 * for a ACTIVE flush.
1069 */
1070void ltt_force_switch_lockless_slow(struct ltt_chanbuf *buf,
1071 enum force_switch_mode mode)
1072{
1073 struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
1074 struct ltt_reserve_switch_offsets offsets;
1075 u64 tsc;
1076
1077 offsets.size = 0;
1078
1079 /*
1080 * Perform retryable operations.
1081 */
1082 do {
1083 if (ltt_relay_try_switch_slow(mode, buf, chan, &offsets, &tsc))
1084 return;
1085 } while (local_cmpxchg(&buf->offset, offsets.old, offsets.end)
1086 != offsets.old);
1087
1088 /*
1089 * Atomically update last_tsc. This update races against concurrent
1090 * atomic updates, but the race will always cause supplementary full TSC
1091 * events, never the opposite (missing a full TSC event when it would be
1092 * needed).
1093 */
1094 save_last_tsc(buf, tsc);
1095
1096 /*
1097 * Push the reader if necessary
1098 */
1099 if (mode == FORCE_ACTIVE) {
1100 ltt_reserve_push_reader(buf, chan, offsets.end - 1);
1101 ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1,
1102 chan));
1103 }
1104
1105 /*
1106 * Switch old subbuffer if needed.
1107 */
1108 if (offsets.end_switch_old) {
1109 ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
1110 chan));
1111 ltt_reserve_switch_old_subbuf(buf, chan, &offsets, &tsc);
1112 }
1113
1114 /*
1115 * Populate new subbuffer.
1116 */
1117 if (mode == FORCE_ACTIVE)
1118 ltt_reserve_switch_new_subbuf(buf, chan, &offsets, &tsc);
1119}
1120EXPORT_SYMBOL_GPL(ltt_force_switch_lockless_slow);
1121
1122/*
1123 * Returns :
1124 * 0 if ok
1125 * !0 if execution must be aborted.
1126 */
1127static
1128int ltt_relay_try_reserve_slow(struct ltt_chanbuf *buf, struct ltt_chan *chan,
1129 struct ltt_reserve_switch_offsets *offsets,
1130 size_t data_size, u64 *tsc, unsigned int *rflags,
1131 int largest_align)
1132{
1133 long reserve_commit_diff;
1134
1135 offsets->begin = local_read(&buf->offset);
1136 offsets->old = offsets->begin;
1137 offsets->begin_switch = 0;
1138 offsets->end_switch_current = 0;
1139 offsets->end_switch_old = 0;
1140
1141 *tsc = trace_clock_read64();
1142 if (last_tsc_overflow(buf, *tsc))
1143 *rflags = LTT_RFLAG_ID_SIZE_TSC;
1144
1145 if (unlikely(SUBBUF_OFFSET(offsets->begin, chan) == 0)) {
1146 offsets->begin_switch = 1; /* For offsets->begin */
1147 } else {
1148 offsets->size = ltt_get_header_size(chan, offsets->begin,
1149 data_size,
1150 &offsets->before_hdr_pad,
1151 *rflags);
1152 offsets->size += ltt_align(offsets->begin + offsets->size,
1153 largest_align)
1154 + data_size;
1155 if (unlikely((SUBBUF_OFFSET(offsets->begin, chan) +
1156 offsets->size) > chan->a.sb_size)) {
1157 offsets->end_switch_old = 1; /* For offsets->old */
1158 offsets->begin_switch = 1; /* For offsets->begin */
1159 }
1160 }
1161 if (unlikely(offsets->begin_switch)) {
1162 long sb_index;
1163
1164 /*
1165 * We are typically not filling the previous buffer completely.
1166 */
1167 if (likely(offsets->end_switch_old))
1168 offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
1169 offsets->begin = offsets->begin + ltt_sb_header_size();
1170 /* Test new buffer integrity */
1171 sb_index = SUBBUF_INDEX(offsets->begin, chan);
1172 reserve_commit_diff =
1173 (BUFFER_TRUNC(offsets->begin, chan)
1174 >> chan->a.n_sb_order)
1175 - (local_read(&buf->commit_count[sb_index].cc_sb)
1176 & chan->commit_count_mask);
1177 if (likely(reserve_commit_diff == 0)) {
1178 /* Next buffer not corrupted. */
1179 if (unlikely(!chan->overwrite &&
1180 (SUBBUF_TRUNC(offsets->begin, chan)
1181 - SUBBUF_TRUNC(atomic_long_read(&buf->consumed),
1182 chan))
1183 >= chan->a.buf_size)) {
1184 /*
1185 * We do not overwrite non consumed buffers
1186 * and we are full : event is lost.
1187 */
1188 local_inc(&buf->events_lost);
1189 return -1;
1190 } else {
1191 /*
1192 * next buffer not corrupted, we are either in
1193 * overwrite mode or the buffer is not full.
1194 * It's safe to write in this new subbuffer.
1195 */
1196 }
1197 } else {
1198 /*
1199 * Next subbuffer corrupted. Drop event in normal and
1200 * overwrite mode. Caused by either a writer OOPS or
1201 * too many nested writes over a reserve/commit pair.
1202 */
1203 local_inc(&buf->events_lost);
1204 return -1;
1205 }
1206 offsets->size = ltt_get_header_size(chan, offsets->begin,
1207 data_size,
1208 &offsets->before_hdr_pad,
1209 *rflags);
1210 offsets->size += ltt_align(offsets->begin + offsets->size,
1211 largest_align)
1212 + data_size;
1213 if (unlikely((SUBBUF_OFFSET(offsets->begin, chan)
1214 + offsets->size) > chan->a.sb_size)) {
1215 /*
1216 * Event too big for subbuffers, report error, don't
1217 * complete the sub-buffer switch.
1218 */
1219 local_inc(&buf->events_lost);
1220 return -1;
1221 } else {
1222 /*
1223 * We just made a successful buffer switch and the event
1224 * fits in the new subbuffer. Let's write.
1225 */
1226 }
1227 } else {
1228 /*
1229 * Event fits in the current buffer and we are not on a switch
1230 * boundary. It's safe to write.
1231 */
1232 }
1233 offsets->end = offsets->begin + offsets->size;
1234
1235 if (unlikely((SUBBUF_OFFSET(offsets->end, chan)) == 0)) {
1236 /*
1237 * The offset_end will fall at the very beginning of the next
1238 * subbuffer.
1239 */
1240 offsets->end_switch_current = 1; /* For offsets->begin */
1241 }
1242 return 0;
1243}
1244
1245/**
1246 * ltt_relay_reserve_slot_lockless_slow - Atomic slot reservation in a buffer.
1247 * @trace: the trace structure to log to.
1248 * @ltt_channel: channel structure
1249 * @transport_data: data structure specific to ltt relay
1250 * @data_size: size of the variable length data to log.
1251 * @slot_size: pointer to total size of the slot (out)
1252 * @buf_offset : pointer to reserved buffer offset (out)
1253 * @tsc: pointer to the tsc at the slot reservation (out)
1254 * @cpu: cpuid
1255 *
1256 * Return : -ENOSPC if not enough space, else returns 0.
1257 * It will take care of sub-buffer switching.
1258 */
1259int ltt_reserve_slot_lockless_slow(struct ltt_chan *chan,
1260 struct ltt_trace *trace, size_t data_size,
1261 int largest_align, int cpu,
1262 struct ltt_chanbuf **ret_buf,
1263 size_t *slot_size, long *buf_offset,
1264 u64 *tsc, unsigned int *rflags)
1265{
1266 struct ltt_chanbuf *buf = *ret_buf = per_cpu_ptr(chan->a.buf, cpu);
1267 struct ltt_reserve_switch_offsets offsets;
1268
1269 offsets.size = 0;
1270
1271 do {
1272 if (unlikely(ltt_relay_try_reserve_slow(buf, chan, &offsets,
1273 data_size, tsc, rflags,
1274 largest_align)))
1275 return -ENOSPC;
1276 } while (unlikely(local_cmpxchg(&buf->offset, offsets.old, offsets.end)
1277 != offsets.old));
1278
1279 /*
1280 * Atomically update last_tsc. This update races against concurrent
1281 * atomic updates, but the race will always cause supplementary full TSC
1282 * events, never the opposite (missing a full TSC event when it would be
1283 * needed).
1284 */
1285 save_last_tsc(buf, *tsc);
1286
1287 /*
1288 * Push the reader if necessary
1289 */
1290 ltt_reserve_push_reader(buf, chan, offsets.end - 1);
1291
1292 /*
1293 * Clear noref flag for this subbuffer.
1294 */
1295 ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1, chan));
1296
1297 /*
1298 * Switch old subbuffer if needed.
1299 */
1300 if (unlikely(offsets.end_switch_old)) {
1301 ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
1302 chan));
1303 ltt_reserve_switch_old_subbuf(buf, chan, &offsets, tsc);
1304 }
1305
1306 /*
1307 * Populate new subbuffer.
1308 */
1309 if (unlikely(offsets.begin_switch))
1310 ltt_reserve_switch_new_subbuf(buf, chan, &offsets, tsc);
1311
1312 if (unlikely(offsets.end_switch_current))
1313 ltt_reserve_end_switch_current(buf, chan, &offsets, tsc);
1314
1315 *slot_size = offsets.size;
1316 *buf_offset = offsets.begin + offsets.before_hdr_pad;
1317 return 0;
1318}
1319EXPORT_SYMBOL_GPL(ltt_reserve_slot_lockless_slow);
1320
1321static struct ltt_transport ltt_relay_transport = {
1322 .name = "relay",
1323 .owner = THIS_MODULE,
1324 .ops = {
1325 .create_dirs = ltt_relay_create_dirs,
1326 .remove_dirs = ltt_relay_remove_dirs,
1327 .create_channel = ltt_chan_create,
1328 .finish_channel = ltt_relay_finish_channel,
1329 .remove_channel = ltt_chan_free,
1330 .remove_channel_files = ltt_chan_remove_files,
1331 .wakeup_channel = ltt_relay_async_wakeup_chan,
1332 .user_blocking = ltt_relay_user_blocking,
1333 .user_errors = ltt_relay_print_user_errors,
1334 .start_switch_timer = ltt_chan_start_switch_timer,
1335 .stop_switch_timer = ltt_chan_stop_switch_timer,
1336 },
1337};
1338
1339static struct notifier_block fn_ltt_chanbuf_hotcpu_callback = {
1340 .notifier_call = ltt_chanbuf_hotcpu_callback,
1341 .priority = 6,
1342};
1343
1344int __init ltt_relay_init(void)
1345{
1346 printk(KERN_INFO "LTT : ltt-relay init\n");
1347
1348 ltt_transport_register(&ltt_relay_transport);
1349 register_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
1350 register_idle_notifier(&pm_idle_entry_notifier);
1351
1352 return 0;
1353}
1354
1355void __exit ltt_relay_exit(void)
1356{
1357 printk(KERN_INFO "LTT : ltt-relay exit\n");
1358
1359 unregister_idle_notifier(&pm_idle_entry_notifier);
1360 unregister_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
1361 ltt_transport_unregister(&ltt_relay_transport);
1362}
1363
1364MODULE_LICENSE("GPL and additional rights");
1365MODULE_AUTHOR("Mathieu Desnoyers");
1366MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");
This page took 0.071523 seconds and 4 git commands to generate.