1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Collection;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.LockSupport;
23
24 import org.apache.logging.log4j.core.config.Node;
25 import org.apache.logging.log4j.core.config.plugins.Plugin;
26 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
27 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28 import org.jctools.queues.MpscArrayQueue;
29
30
31
32
33
34
35 @Plugin(name = "JCToolsBlockingQueue", category = Node.CATEGORY, elementType = BlockingQueueFactory.ELEMENT_TYPE)
36 public class JCToolsBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
37
38 private final WaitStrategy waitStrategy;
39
40 private JCToolsBlockingQueueFactory(final WaitStrategy waitStrategy) {
41 this.waitStrategy = waitStrategy;
42 }
43
44 @Override
45 public BlockingQueue<E> create(final int capacity) {
46 return new MpscBlockingQueue<>(capacity, waitStrategy);
47 }
48
49 @PluginFactory
50 public static <E> JCToolsBlockingQueueFactory<E> createFactory(
51 @PluginAttribute(value = "WaitStrategy", defaultString = "PARK") final WaitStrategy waitStrategy) {
52 return new JCToolsBlockingQueueFactory<>(waitStrategy);
53 }
54
55
56
57
58 private static final class MpscBlockingQueue<E> extends MpscArrayQueue<E> implements BlockingQueue<E> {
59
60 private final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy;
61
62 MpscBlockingQueue(final int capacity, final JCToolsBlockingQueueFactory.WaitStrategy waitStrategy) {
63 super(capacity);
64 this.waitStrategy = waitStrategy;
65 }
66
67 @Override
68 public int drainTo(final Collection<? super E> c) {
69 return drainTo(c, capacity());
70 }
71
72 @Override
73 public int drainTo(final Collection<? super E> c, final int maxElements) {
74 return drain(new Consumer<E>() {
75 @Override
76 public void accept(final E e) {
77 c.add(e);
78 }
79 }, maxElements);
80 }
81
82 @Override
83 public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
84 int idleCounter = 0;
85 final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
86 do {
87 if (offer(e)) {
88 return true;
89 } else if (System.nanoTime() - timeoutNanos > 0) {
90 return false;
91 }
92 idleCounter = waitStrategy.idle(idleCounter);
93 } while (!Thread.interrupted());
94 throw new InterruptedException();
95 }
96
97 @Override
98 public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
99 int idleCounter = 0;
100 final long timeoutNanos = System.nanoTime() + unit.toNanos(timeout);
101 do {
102 final E result = poll();
103 if (result != null) {
104 return result;
105 } else if (System.nanoTime() - timeoutNanos > 0) {
106 return null;
107 }
108 idleCounter = waitStrategy.idle(idleCounter);
109 } while (!Thread.interrupted());
110 throw new InterruptedException();
111 }
112
113 @Override
114 public void put(final E e) throws InterruptedException {
115 int idleCounter = 0;
116 do {
117 if (offer(e)) {
118 return;
119 }
120 idleCounter = waitStrategy.idle(idleCounter);
121 } while (!Thread.interrupted());
122 throw new InterruptedException();
123 }
124
125 @Override
126 public boolean offer(final E e) {
127
128 return offerIfBelowThreshold(e, capacity() - 32);
129 }
130
131 @Override
132 public int remainingCapacity() {
133 return capacity() - size();
134 }
135
136 @Override
137 public E take() throws InterruptedException {
138 int idleCounter = 100;
139 do {
140 final E result = relaxedPoll();
141 if (result != null) {
142 return result;
143 }
144 idleCounter = waitStrategy.idle(idleCounter);
145 } while (!Thread.interrupted());
146 throw new InterruptedException();
147 }
148 }
149
150 public enum WaitStrategy {
151 SPIN(new Idle() {
152 @Override
153 public int idle(final int idleCounter) {
154 return idleCounter + 1;
155 }
156 }),
157 YIELD(new Idle() {
158 @Override
159 public int idle(final int idleCounter) {
160 Thread.yield();
161 return idleCounter + 1;
162 }
163 }),
164 PARK(new Idle() {
165 @Override
166 public int idle(final int idleCounter) {
167 LockSupport.parkNanos(1L);
168 return idleCounter + 1;
169 }
170 }),
171 PROGRESSIVE(new Idle() {
172 @Override
173 public int idle(final int idleCounter) {
174 if (idleCounter > 200) {
175 LockSupport.parkNanos(1L);
176 } else if (idleCounter > 100) {
177 Thread.yield();
178 }
179 return idleCounter + 1;
180 }
181 });
182
183 private final Idle idle;
184
185 private int idle(final int idleCounter) {
186 return idle.idle(idleCounter);
187 }
188
189 WaitStrategy(final Idle idle) {
190 this.idle = idle;
191 }
192 }
193
194 private interface Idle {
195 int idle(int idleCounter);
196 }
197
198 }