@@ -8,6 +8,7 @@
 #include <linux/trace_clock.h>
 #include <linux/trace_seq.h>
 #include <linux/spinlock.h>
+#include <linux/irq_work.h>
 #include <linux/debugfs.h>
 #include <linux/uaccess.h>
 #include <linux/hardirq.h>
@@ -442,6 +443,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)
 	return ret;
 }
 
+struct rb_irq_work {
+	struct irq_work			work;
+	wait_queue_head_t		waiters;
+	bool				waiters_pending;
+};
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
@@ -476,6 +483,8 @@ struct ring_buffer_per_cpu {
 	struct list_head		new_pages; /* new pages to add */
 	struct work_struct		update_pages_work;
 	struct completion		update_done;
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer {
@@ -495,6 +504,8 @@ struct ring_buffer {
 	struct notifier_block		cpu_notify;
 #endif
 	u64				(*clock)(void);
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer_iter {
@@ -506,6 +517,118 @@ struct ring_buffer_iter {
 	u64				read_stamp;
 };
 
+/*
+ * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
+ *
+ * This is the irq_work callback; it wakes up any task that is blocked
+ * on the ring buffer waiters queue.
+ */
+static void rb_wake_up_waiters(struct irq_work *work)
+{
+	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
+
+	wake_up_all(&rbwork->waiters);
+}
+
+/**
+ * ring_buffer_wait - wait for input to the ring buffer
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ */
+void ring_buffer_wait(struct ring_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	DEFINE_WAIT(wait);
+	struct rb_irq_work *work;
+
+	/*
+	 * Depending on what the caller is waiting for, either any
+	 * data in any cpu buffer, or a specific buffer, put the
+	 * caller on the appropriate wait queue.
+	 */
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+
+	prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
+
+	/*
+	 * Events can be recorded from critical sections where
+	 * taking the wait queue lock could cause deadlocks.
+	 * So after adding a task to the queue, this flag is set
+	 * only to tell the next event to queue an irq_work that
+	 * does the wake up for us.
+	 *
+	 * We don't clear it even if the buffer is no longer
+	 * empty. The flag only causes the next event to run
+	 * irq_work to do the wait queue wake up. The worst
+	 * that can happen if we race with !trace_empty() is that
+	 * an event will cause an irq_work to try to wake up
+	 * an empty queue.
+	 *
+	 * There's no reason to protect this flag either, as
+	 * the wait queue and irq_work logic will do the necessary
+	 * synchronization for the wake ups. The only thing
+	 * that is necessary is that the wake up happens after
+	 * a task has been queued. Spurious wake ups are fine.
+	 */
+	work->waiters_pending = true;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu)))
+		schedule();
+
+	finish_wait(&work->waiters, &wait);
+}
+
+/**
+ * ring_buffer_poll_wait - poll on buffer input
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ * @filp: the file descriptor
+ * @poll_table: the poll descriptor
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ *
+ * Returns POLLIN | POLLRDNORM if data exists in the buffers,
+ * zero otherwise.
+ */
+int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
+			  struct file *filp, poll_table *poll_table)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	struct rb_irq_work *work;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+	work->waiters_pending = true;
+	poll_wait(filp, &work->waiters, poll_table);
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+	return 0;
+}
+
 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
 #define RB_WARN_ON(b, cond)						\
 	({								\
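
The comment in ring_buffer_wait() above is the heart of the change: a waiter announces itself with waiters_pending, re-checks that the buffer is empty, and only then sleeps; the write side never touches the wait queue directly, it merely queues an irq_work when it sees the flag. The sketch below is a minimal userspace analogue of that handshake, not part of the patch: pthreads stand in for the wait queue, a plain mutex replaces the lock-free flag handling, and the deferred irq_work step is reduced to a comment. All names in it are made up for illustration.

/*
 * Hypothetical userspace analogue of the waiters_pending handshake.
 * Build with: cc -pthread example.c
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t waiters = PTHREAD_COND_INITIALIZER;
static int nr_events;			/* stand-in for "buffer not empty" */
static bool waiters_pending;		/* set by the waiter, cleared by the writer */

static void *consumer(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&lock);
	/* Announce the waiter first, then re-check emptiness, then sleep. */
	waiters_pending = true;
	while (nr_events == 0)
		pthread_cond_wait(&waiters, &lock);
	printf("consumer: woke up with %d event(s)\n", nr_events);
	pthread_mutex_unlock(&lock);
	return NULL;
}

static void *producer(void *arg)
{
	(void)arg;
	sleep(1);			/* let the consumer block first */
	pthread_mutex_lock(&lock);
	nr_events++;			/* "commit" an event */
	if (waiters_pending) {
		waiters_pending = false;
		/* The kernel defers this step to irq_work; here we just signal. */
		pthread_cond_broadcast(&waiters);
	}
	pthread_mutex_unlock(&lock);
	return NULL;
}

int main(void)
{
	pthread_t c, p;

	pthread_create(&c, NULL, consumer, NULL);
	pthread_create(&p, NULL, producer, NULL);
	pthread_join(c, NULL);
	pthread_join(p, NULL);
	return 0;
}

The ordering mirrors the kernel code: set waiters_pending before the emptiness check on the waiting side, and clear it before issuing the wake up on the writing side, so a race can at worst produce a spurious wake up.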
@@ -1061,6 +1184,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
 	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
 	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
 	init_completion(&cpu_buffer->update_done);
+	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -1156,6 +1280,8 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
 	buffer->clock = trace_clock_local;
 	buffer->reader_lock_key = key;
 
+	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
+
 	/* need at least two pages */
 	if (nr_pages < 2)
 		nr_pages = 2;
@@ -2610,6 +2736,22 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
 	rb_end_commit(cpu_buffer);
 }
 
+static __always_inline void
+rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
+{
+	if (buffer->irq_work.waiters_pending) {
+		buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&buffer->irq_work.work);
+	}
+
+	if (cpu_buffer->irq_work.waiters_pending) {
+		cpu_buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&cpu_buffer->irq_work.work);
+	}
+}
+
 /**
  * ring_buffer_unlock_commit - commit a reserved
  * @buffer: The buffer to commit to
@@ -2629,6 +2771,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer,
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	trace_recursive_unlock();
 
 	preempt_enable_notrace();
@@ -2801,6 +2945,8 @@ int ring_buffer_write(struct ring_buffer *buffer,
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	ret = 0;
  out:
 	preempt_enable_notrace();
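
For context, the two new entry points are intended to be called from the tracing code's read and poll paths. The fragment below is only an illustrative sketch of such a caller, not code from this commit: it assumes kernel context (it is not a standalone program), example_drain_cpu(), example_poll() and handle_event() are invented names, and keeping the ring_buffer pointer in filp->private_data is an assumption of the example.

/* Hypothetical blocking reader built on ring_buffer_wait(). */
static void example_drain_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	u64 ts;

	/* Block until rb_wakeups() queues the irq_work for this buffer. */
	ring_buffer_wait(buffer, cpu);

	/* Consume whatever is available after the wake up. */
	while ((event = ring_buffer_consume(buffer, cpu, &ts, NULL)))
		handle_event(event);		/* invented placeholder */
}

/* Hypothetical ->poll() handler built on ring_buffer_poll_wait(). */
static unsigned int example_poll(struct file *filp, poll_table *pt)
{
	struct ring_buffer *buffer = filp->private_data;	/* assumption */

	return ring_buffer_poll_wait(buffer, RING_BUFFER_ALL_CPUS, filp, pt);
}

ring_buffer_poll_wait() checks for data both before and after registering on the wait queue, so a ->poll() handler can simply return its result.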