1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.async;
19
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.TimeUnit;
22
23 import com.lmax.disruptor.EventTranslatorVararg;
24 import org.apache.logging.log4j.Level;
25 import org.apache.logging.log4j.Marker;
26 import org.apache.logging.log4j.core.AbstractLifeCycle;
27 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
28 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
29 import org.apache.logging.log4j.core.util.Throwables;
30
31 import com.lmax.disruptor.ExceptionHandler;
32 import com.lmax.disruptor.RingBuffer;
33 import com.lmax.disruptor.TimeoutException;
34 import com.lmax.disruptor.WaitStrategy;
35 import com.lmax.disruptor.dsl.Disruptor;
36 import com.lmax.disruptor.dsl.ProducerType;
37 import org.apache.logging.log4j.message.Message;
38
39
40
41
42
43
44
45 class AsyncLoggerDisruptor extends AbstractLifeCycle {
46 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
47 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
48
49 private final Object queueFullEnqueueLock = new Object();
50
51 private volatile Disruptor<RingBufferLogEvent> disruptor;
52 private String contextName;
53
54 private boolean useThreadLocalTranslator = true;
55 private long backgroundThreadId;
56 private AsyncQueueFullPolicy asyncQueueFullPolicy;
57 private int ringBufferSize;
58
59 AsyncLoggerDisruptor(final String contextName) {
60 this.contextName = contextName;
61 }
62
63 public String getContextName() {
64 return contextName;
65 }
66
67 public void setContextName(final String name) {
68 contextName = name;
69 }
70
71 Disruptor<RingBufferLogEvent> getDisruptor() {
72 return disruptor;
73 }
74
75
76
77
78
79
80 @Override
81 public synchronized void start() {
82 if (disruptor != null) {
83 LOGGER.trace(
84 "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
85 contextName);
86 return;
87 }
88 LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
89 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
90 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
91
92 final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLogger[" + contextName + "]", true, Thread.NORM_PRIORITY) {
93 @Override
94 public Thread newThread(final Runnable r) {
95 final Thread result = super.newThread(r);
96 backgroundThreadId = result.getId();
97 return result;
98 }
99 };
100 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
101
102 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, threadFactory, ProducerType.MULTI,
103 waitStrategy);
104
105 final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
106 disruptor.setDefaultExceptionHandler(errorHandler);
107
108 final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
109 disruptor.handleEventsWith(handlers);
110
111 LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
112 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
113 .getClass().getSimpleName(), errorHandler);
114 disruptor.start();
115
116 LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
117 : "vararg");
118 super.start();
119 }
120
121
122
123
124
125 @Override
126 public boolean stop(final long timeout, final TimeUnit timeUnit) {
127 final Disruptor<RingBufferLogEvent> temp = getDisruptor();
128 if (temp == null) {
129 LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
130 return true;
131 }
132 setStopping();
133 LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
134
135
136 disruptor = null;
137
138
139
140
141 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
142 try {
143 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
144 } catch (final InterruptedException e) {
145 }
146 }
147 try {
148
149 temp.shutdown(timeout, timeUnit);
150 } catch (final TimeoutException e) {
151 LOGGER.warn("[{}] AsyncLoggerDisruptor: shutdown timed out after {} {}", contextName, timeout, timeUnit);
152 temp.halt();
153 }
154
155 LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor has been shut down.", contextName);
156
157 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
158 LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
159 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
160 }
161 setStopped();
162 return true;
163 }
164
165
166
167
168 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
169 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
170 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
171 }
172
173
174
175
176
177
178
179 public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
180 final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
181 return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
182 }
183
184 EventRoute getEventRoute(final Level logLevel) {
185 final int remainingCapacity = remainingDisruptorCapacity();
186 if (remainingCapacity < 0) {
187 return EventRoute.DISCARD;
188 }
189 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
190 }
191
192 private int remainingDisruptorCapacity() {
193 final Disruptor<RingBufferLogEvent> temp = disruptor;
194 if (hasLog4jBeenShutDown(temp)) {
195 return -1;
196 }
197 return (int) temp.getRingBuffer().remainingCapacity();
198 }
199
200
201
202 private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
203 if (aDisruptor == null) {
204 LOGGER.warn("Ignoring log event after log4j was shut down");
205 return true;
206 }
207 return false;
208 }
209
210 boolean tryPublish(final RingBufferLogEventTranslator translator) {
211 try {
212
213
214
215 return disruptor.getRingBuffer().tryPublishEvent(translator);
216 } catch (final NullPointerException npe) {
217
218 logWarningOnNpeFromDisruptorPublish(translator);
219 return false;
220 }
221 }
222
223 void enqueueLogMessageWhenQueueFull(final RingBufferLogEventTranslator translator) {
224 try {
225
226
227
228 if (synchronizeEnqueueWhenQueueFull()) {
229 synchronized (queueFullEnqueueLock) {
230 disruptor.publishEvent(translator);
231 }
232 } else {
233 disruptor.publishEvent(translator);
234 }
235 } catch (final NullPointerException npe) {
236
237 logWarningOnNpeFromDisruptorPublish(translator);
238 }
239 }
240
241 void enqueueLogMessageWhenQueueFull(
242 final EventTranslatorVararg<RingBufferLogEvent> translator,
243 final AsyncLogger asyncLogger,
244 final StackTraceElement location,
245 final String fqcn,
246 final Level level,
247 final Marker marker,
248 final Message msg,
249 final Throwable thrown) {
250 try {
251
252
253
254 if (synchronizeEnqueueWhenQueueFull()) {
255 synchronized (queueFullEnqueueLock) {
256 disruptor.getRingBuffer().publishEvent(translator,
257 asyncLogger,
258 location,
259 fqcn,
260 level,
261 marker,
262 msg,
263 thrown);
264 }
265 } else {
266 disruptor.getRingBuffer().publishEvent(translator,
267 asyncLogger,
268 location,
269 fqcn,
270 level,
271 marker,
272 msg,
273 thrown);
274 }
275 } catch (final NullPointerException npe) {
276
277 logWarningOnNpeFromDisruptorPublish(level, fqcn, msg, thrown);
278 }
279 }
280
281 private boolean synchronizeEnqueueWhenQueueFull() {
282 return DisruptorUtil.ASYNC_LOGGER_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
283
284 && backgroundThreadId != Thread.currentThread().getId();
285 }
286
287 private void logWarningOnNpeFromDisruptorPublish(final RingBufferLogEventTranslator translator) {
288 logWarningOnNpeFromDisruptorPublish(
289 translator.level, translator.loggerName, translator.message, translator.thrown);
290 }
291
292 private void logWarningOnNpeFromDisruptorPublish(
293 final Level level, final String fqcn, final Message msg, final Throwable thrown) {
294 LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}{}", contextName,
295 level, fqcn, msg.getFormattedMessage(), thrown == null ? "" : Throwables.toStringList(thrown));
296 }
297
298
299
300
301
302
303
304
305 public boolean isUseThreadLocals() {
306 return useThreadLocalTranslator;
307 }
308
309
310
311
312
313
314
315
316
317
318
319
320 public void setUseThreadLocals(final boolean allow) {
321 useThreadLocalTranslator = allow;
322 LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
323 useThreadLocalTranslator ? "threadlocal" : "vararg");
324 }
325 }