1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.Marker;
25 import org.apache.logging.log4j.ThreadContext;
26 import org.apache.logging.log4j.core.Logger;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
30 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
31 import org.apache.logging.log4j.core.util.Clock;
32 import org.apache.logging.log4j.core.util.ClockFactory;
33 import org.apache.logging.log4j.core.util.Integers;
34 import org.apache.logging.log4j.core.util.Loader;
35 import org.apache.logging.log4j.message.Message;
36 import org.apache.logging.log4j.message.MessageFactory;
37 import org.apache.logging.log4j.message.TimestampMessage;
38 import org.apache.logging.log4j.status.StatusLogger;
39 import org.apache.logging.log4j.util.PropertiesUtil;
40
41 import com.lmax.disruptor.BlockingWaitStrategy;
42 import com.lmax.disruptor.ExceptionHandler;
43 import com.lmax.disruptor.RingBuffer;
44 import com.lmax.disruptor.SleepingWaitStrategy;
45 import com.lmax.disruptor.WaitStrategy;
46 import com.lmax.disruptor.YieldingWaitStrategy;
47 import com.lmax.disruptor.dsl.Disruptor;
48 import com.lmax.disruptor.dsl.ProducerType;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public class AsyncLogger extends Logger {
79 private static final long serialVersionUID = 1L;
80 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
81 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
82 private static final int RINGBUFFER_MIN_SIZE = 128;
83 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
84 private static final StatusLogger LOGGER = StatusLogger.getLogger();
85 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
86 private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
87
88 static enum ThreadNameStrategy {
89 CACHED {
90 @Override
91 public String getThreadName(final Info info) {
92 return info.cachedThreadName;
93 }
94 },
95 UNCACHED {
96 @Override
97 public String getThreadName(final Info info) {
98 return Thread.currentThread().getName();
99 }
100 };
101 abstract String getThreadName(Info info);
102
103 static ThreadNameStrategy create() {
104 final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
105 try {
106 return ThreadNameStrategy.valueOf(name);
107 } catch (final Exception ex) {
108 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
109 return CACHED;
110 }
111 }
112 }
113 private static volatile Disruptor<RingBufferLogEvent> disruptor;
114 private static final Clock clock = ClockFactory.getClock();
115
116 private static final ExecutorService executor = Executors
117 .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
118
119 static {
120 initInfoForExecutorThread();
121 LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
122 final int ringBufferSize = calculateRingBufferSize();
123
124 final WaitStrategy waitStrategy = createWaitStrategy();
125 disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
126 ProducerType.MULTI, waitStrategy);
127 disruptor.handleExceptionsWith(getExceptionHandler());
128 disruptor.handleEventsWith(new RingBufferLogEventHandler());
129
130 LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
131 .getBufferSize());
132 disruptor.start();
133 }
134
135 private static int calculateRingBufferSize() {
136 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
137 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize",
138 String.valueOf(ringBufferSize));
139 try {
140 int size = Integer.parseInt(userPreferredRBSize);
141 if (size < RINGBUFFER_MIN_SIZE) {
142 size = RINGBUFFER_MIN_SIZE;
143 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
144 RINGBUFFER_MIN_SIZE);
145 }
146 ringBufferSize = size;
147 } catch (final Exception ex) {
148 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
149 }
150 return Integers.ceilingNextPowerOfTwo(ringBufferSize);
151 }
152
153
154
155
156
157
158
159
160 private static void initInfoForExecutorThread() {
161 executor.submit(new Runnable(){
162 @Override
163 public void run() {
164 final boolean isAppenderThread = true;
165 final Info info = new Info(new RingBufferLogEventTranslator(),
166 Thread.currentThread().getName(), isAppenderThread);
167 threadlocalInfo.set(info);
168 }
169 });
170 }
171
172 private static WaitStrategy createWaitStrategy() {
173 final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
174 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
175 if ("Sleep".equals(strategy)) {
176 return new SleepingWaitStrategy();
177 } else if ("Yield".equals(strategy)) {
178 return new YieldingWaitStrategy();
179 } else if ("Block".equals(strategy)) {
180 return new BlockingWaitStrategy();
181 }
182 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
183 return new BlockingWaitStrategy();
184 }
185
186 private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
187 final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
188 if (cls == null) {
189 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
190 return null;
191 }
192 try {
193 final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
194 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
195 return result;
196 } catch (final Exception ignored) {
197 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
198 return null;
199 }
200 }
201
202
203
204
205
206
207
208
209
210 public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
211 super(context, name, messageFactory);
212 }
213
214
215
216
217 static class Info {
218 private final RingBufferLogEventTranslator translator;
219 private final String cachedThreadName;
220 private final boolean isAppenderThread;
221 public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
222 this.translator = translator;
223 this.cachedThreadName = threadName;
224 this.isAppenderThread = appenderThread;
225 }
226 }
227
228 @Override
229 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
230
231 Info info = threadlocalInfo.get();
232 if (info == null) {
233 info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
234 threadlocalInfo.set(info);
235 }
236
237 final Disruptor<RingBufferLogEvent> temp = disruptor;
238 if (temp == null) {
239 LOGGER.fatal("Ignoring log event after log4j was shut down");
240 return;
241 }
242
243
244
245 if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) {
246
247 config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown);
248 return;
249 }
250 message.getFormattedMessage();
251 final boolean includeLocation = config.loggerConfig.isIncludeLocation();
252 info.translator.setValues(this, getName(), marker, fqcn, level, message,
253
254 thrown,
255
256
257
258
259
260 ThreadContext.getImmutableContext(),
261
262
263 ThreadContext.getImmutableStack(),
264
265
266
267 THREAD_NAME_STRATEGY.getThreadName(info),
268
269
270
271
272 includeLocation ? location(fqcn) : null,
273
274
275
276
277
278 message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
279 clock.currentTimeMillis());
280
281
282 try {
283
284
285
286 disruptor.publishEvent(info.translator);
287 } catch (final NullPointerException npe) {
288 LOGGER.fatal("Ignoring log event after log4j was shut down.");
289 }
290 }
291
292 private static StackTraceElement location(final String fqcnOfLogger) {
293 return Log4jLogEvent.calcLocation(fqcnOfLogger);
294 }
295
296
297
298
299
300
301
302 public void actualAsyncLog(final RingBufferLogEvent event) {
303 final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
304 event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
305 config.logEvent(event);
306 }
307
308 public static void stop() {
309 final Disruptor<RingBufferLogEvent> temp = disruptor;
310
311
312
313 disruptor = null;
314 if (temp == null) {
315 return;
316 }
317
318
319
320
321 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
322 try {
323 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
324 } catch (final InterruptedException e) {
325 }
326 }
327 temp.shutdown();
328 executor.shutdown();
329 threadlocalInfo.remove();
330 }
331
332
333
334
335 private static boolean hasBacklog(final Disruptor<?> disruptor) {
336 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
337 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
338 }
339
340
341
342
343
344
345
346 public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
347 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
348 }
349 }