1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22
23 import org.apache.logging.log4j.Logger;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26 import org.apache.logging.log4j.core.util.Integers;
27 import org.apache.logging.log4j.status.StatusLogger;
28 import org.apache.logging.log4j.util.PropertiesUtil;
29
30 import com.lmax.disruptor.BlockingWaitStrategy;
31 import com.lmax.disruptor.EventFactory;
32 import com.lmax.disruptor.EventHandler;
33 import com.lmax.disruptor.EventTranslatorTwoArg;
34 import com.lmax.disruptor.ExceptionHandler;
35 import com.lmax.disruptor.RingBuffer;
36 import com.lmax.disruptor.Sequence;
37 import com.lmax.disruptor.SequenceReportingEventHandler;
38 import com.lmax.disruptor.SleepingWaitStrategy;
39 import com.lmax.disruptor.WaitStrategy;
40 import com.lmax.disruptor.YieldingWaitStrategy;
41 import com.lmax.disruptor.dsl.Disruptor;
42 import com.lmax.disruptor.dsl.ProducerType;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 class AsyncLoggerConfigHelper {
61
62 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
63 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
64 private static final int RINGBUFFER_MIN_SIZE = 128;
65 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
66 private static final Logger LOGGER = StatusLogger.getLogger();
67
68 private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
69 private static volatile Disruptor<Log4jEventWrapper> disruptor;
70 private static ExecutorService executor;
71
72 private static volatile int count = 0;
73 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
74
75
76
77
78
79 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
80 @Override
81 public Log4jEventWrapper newInstance() {
82 return new Log4jEventWrapper();
83 }
84 };
85
86
87
88
89 private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator
90 = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
91
92 @Override
93 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
94 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
95 ringBufferElement.event = logEvent;
96 ringBufferElement.loggerConfig = loggerConfig;
97 }
98 };
99
100 private final AsyncLoggerConfig asyncLoggerConfig;
101
102 public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
103 this.asyncLoggerConfig = asyncLoggerConfig;
104 claim();
105 }
106
107 private static synchronized void initDisruptor() {
108 if (disruptor != null) {
109 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
110 return;
111 }
112 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
113 final int ringBufferSize = calculateRingBufferSize();
114 final WaitStrategy waitStrategy = createWaitStrategy();
115 executor = Executors.newSingleThreadExecutor(threadFactory);
116 initThreadLocalForExecutorThread();
117 disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
118 executor, ProducerType.MULTI, waitStrategy);
119 final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {
120 new Log4jEventWrapperHandler() };
121 final ExceptionHandler<Log4jEventWrapper> errorHandler = getExceptionHandler();
122 disruptor.handleExceptionsWith(errorHandler);
123 disruptor.handleEventsWith(handlers);
124
125 LOGGER.debug(
126 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
127 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
128 disruptor.start();
129 }
130
131 private static WaitStrategy createWaitStrategy() {
132 final String strategy = System
133 .getProperty("AsyncLoggerConfig.WaitStrategy");
134 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
135 if ("Sleep".equals(strategy)) {
136 return new SleepingWaitStrategy();
137 } else if ("Yield".equals(strategy)) {
138 return new YieldingWaitStrategy();
139 } else if ("Block".equals(strategy)) {
140 return new BlockingWaitStrategy();
141 }
142 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
143 return new BlockingWaitStrategy();
144 }
145
146 private static int calculateRingBufferSize() {
147 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
148 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
149 "AsyncLoggerConfig.RingBufferSize",
150 String.valueOf(ringBufferSize));
151 try {
152 int size = Integer.parseInt(userPreferredRBSize);
153 if (size < RINGBUFFER_MIN_SIZE) {
154 size = RINGBUFFER_MIN_SIZE;
155 LOGGER.warn(
156 "Invalid RingBufferSize {}, using minimum size {}.",
157 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
158 }
159 ringBufferSize = size;
160 } catch (final Exception ex) {
161 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
162 userPreferredRBSize, ringBufferSize);
163 }
164 return Integers.ceilingNextPowerOfTwo(ringBufferSize);
165 }
166
167 private static ExceptionHandler<Log4jEventWrapper> getExceptionHandler() {
168 final String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
169 if (cls == null) {
170 return null;
171 }
172 try {
173 @SuppressWarnings("unchecked")
174 final Class<? extends ExceptionHandler<Log4jEventWrapper>> klass = (Class<? extends ExceptionHandler<Log4jEventWrapper>>) Class
175 .forName(cls);
176 return klass.newInstance();
177 } catch (final Exception ignored) {
178 LOGGER.debug("AsyncLoggerConfig.ExceptionHandler not set: error creating " + cls + ": ", ignored);
179 return null;
180 }
181 }
182
183
184
185
186
187 private static class Log4jEventWrapper {
188 private AsyncLoggerConfig loggerConfig;
189 private LogEvent event;
190
191
192
193
194
195 public void clear() {
196 loggerConfig = null;
197 event = null;
198 }
199 }
200
201
202
203
204 private static class Log4jEventWrapperHandler implements
205 SequenceReportingEventHandler<Log4jEventWrapper> {
206 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
207 private Sequence sequenceCallback;
208 private int counter;
209
210 @Override
211 public void setSequenceCallback(final Sequence sequenceCallback) {
212 this.sequenceCallback = sequenceCallback;
213 }
214
215 @Override
216 public void onEvent(final Log4jEventWrapper event, final long sequence,
217 final boolean endOfBatch) throws Exception {
218 event.event.setEndOfBatch(endOfBatch);
219 event.loggerConfig.asyncCallAppenders(event.event);
220 event.clear();
221
222
223
224
225 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
226 sequenceCallback.set(sequence);
227 counter = 0;
228 }
229 }
230 }
231
232
233
234
235
236
237
238 synchronized static void claim() {
239 count++;
240 initDisruptor();
241 }
242
243
244
245
246
247
248 synchronized static void release() {
249 if (--count > 0) {
250 LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
251 return;
252 }
253 final Disruptor<Log4jEventWrapper> temp = disruptor;
254 if (temp == null) {
255 LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
256 count);
257 count = 0;
258 return;
259 }
260 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
261
262
263
264 disruptor = null;
265
266
267
268
269 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
270 try {
271 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
272 } catch (final InterruptedException e) {
273 }
274 }
275 temp.shutdown();
276 executor.shutdown();
277 executor = null;
278 }
279
280
281
282
283 private static boolean hasBacklog(final Disruptor<?> disruptor) {
284 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
285 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
286 }
287
288
289
290
291
292
293 private static void initThreadLocalForExecutorThread() {
294 executor.submit(new Runnable() {
295 @Override
296 public void run() {
297 isAppenderThread.set(Boolean.TRUE);
298 }
299 });
300 }
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315 public boolean callAppendersFromAnotherThread(final LogEvent event) {
316
317 final Disruptor<Log4jEventWrapper> temp = disruptor;
318 if (temp == null) {
319 LOGGER.fatal("Ignoring log event after log4j was shut down");
320 return true;
321 }
322
323
324
325 if (isAppenderThread.get() == Boolean.TRUE
326 && temp.getRingBuffer().remainingCapacity() == 0) {
327
328
329 return false;
330 }
331
332 try {
333 LogEvent logEvent = event;
334 if (event instanceof RingBufferLogEvent) {
335 logEvent = ((RingBufferLogEvent) event).createMemento();
336 }
337 logEvent.getMessage().getFormattedMessage();
338
339
340
341
342 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
343 } catch (final NullPointerException npe) {
344 LOGGER.fatal("Ignoring log event after log4j was shut down.");
345 }
346 return true;
347 }
348
349
350
351
352
353
354
355
356 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
357 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
358 }
359
360 }