View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.Map;
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.Executors;
22  
23  import org.apache.logging.log4j.Level;
24  import org.apache.logging.log4j.Marker;
25  import org.apache.logging.log4j.ThreadContext;
26  import org.apache.logging.log4j.core.Logger;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.config.Property;
29  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
30  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
31  import org.apache.logging.log4j.core.util.Clock;
32  import org.apache.logging.log4j.core.util.ClockFactory;
33  import org.apache.logging.log4j.core.util.Integers;
34  import org.apache.logging.log4j.core.util.Loader;
35  import org.apache.logging.log4j.message.Message;
36  import org.apache.logging.log4j.message.MessageFactory;
37  import org.apache.logging.log4j.message.TimestampMessage;
38  import org.apache.logging.log4j.status.StatusLogger;
39  import org.apache.logging.log4j.util.PropertiesUtil;
40  
41  import com.lmax.disruptor.BlockingWaitStrategy;
42  import com.lmax.disruptor.ExceptionHandler;
43  import com.lmax.disruptor.RingBuffer;
44  import com.lmax.disruptor.SleepingWaitStrategy;
45  import com.lmax.disruptor.WaitStrategy;
46  import com.lmax.disruptor.YieldingWaitStrategy;
47  import com.lmax.disruptor.dsl.Disruptor;
48  import com.lmax.disruptor.dsl.ProducerType;
49  
50  /**
51   * AsyncLogger is a logger designed for high throughput and low latency logging.
52   * It does not perform any I/O in the calling (application) thread, but instead
53   * hands off the work to another thread as soon as possible. The actual logging
54   * is performed in the background thread. It uses the LMAX Disruptor library for
55   * inter-thread communication. (<a
56   * href="http://lmax-exchange.github.com/disruptor/"
57   * >http://lmax-exchange.github.com/disruptor/</a>)
58   * <p>
59   * To use AsyncLogger, specify the System property
60   * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
61   * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
62   * will be AsyncLoggers.
63   * <p>
64   * Note that for performance reasons, this logger does not include source
65   * location by default. You need to specify {@code includeLocation="true"} in
66   * the configuration or any %class, %location or %line conversion patterns in
67   * your log4j.xml configuration will produce either a "?" character or no output
68   * at all.
69   * <p>
70   * For best performance, use AsyncLogger with the RandomAccessFileAppender or
71   * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
72   * have built-in support for the batching mechanism used by the Disruptor
73   * library, and they will flush to disk at the end of each batch. This means
74   * that even with immediateFlush=false, there will never be any items left in
75   * the buffer; all log events will all be written to disk in a very efficient
76   * manner.
77   */
78  public class AsyncLogger extends Logger {
79      private static final long serialVersionUID = 1L;
80      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
81      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
82      private static final int RINGBUFFER_MIN_SIZE = 128;
83      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
84      private static final StatusLogger LOGGER = StatusLogger.getLogger();
85      private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
86      private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
87  
88      static enum ThreadNameStrategy { // LOG4J2-467
89          CACHED {
90              @Override
91              public String getThreadName(final Info info) {
92                  return info.cachedThreadName;
93              }
94          },
95          UNCACHED {
96              @Override
97              public String getThreadName(final Info info) {
98                  return Thread.currentThread().getName();
99              }
100         };
101         abstract String getThreadName(Info info);
102 
103         static ThreadNameStrategy create() {
104             final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
105             try {
106                 return ThreadNameStrategy.valueOf(name);
107             } catch (final Exception ex) {
108                 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
109                 return CACHED;
110             }
111         }
112     }
113     private static volatile Disruptor<RingBufferLogEvent> disruptor;
114     private static final Clock clock = ClockFactory.getClock();
115 
116     private static final ExecutorService executor = Executors
117             .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
118 
119     static {
120         initInfoForExecutorThread();
121         LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
122         final int ringBufferSize = calculateRingBufferSize();
123 
124         final WaitStrategy waitStrategy = createWaitStrategy();
125         disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
126                 ProducerType.MULTI, waitStrategy);
127         disruptor.handleExceptionsWith(getExceptionHandler());
128         disruptor.handleEventsWith(new RingBufferLogEventHandler());
129 
130         LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
131                 .getBufferSize());
132         disruptor.start();
133     }
134 
135     private static int calculateRingBufferSize() {
136         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
137         final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize",
138                 String.valueOf(ringBufferSize));
139         try {
140             int size = Integer.parseInt(userPreferredRBSize);
141             if (size < RINGBUFFER_MIN_SIZE) {
142                 size = RINGBUFFER_MIN_SIZE;
143                 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
144                         RINGBUFFER_MIN_SIZE);
145             }
146             ringBufferSize = size;
147         } catch (final Exception ex) {
148             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
149         }
150         return Integers.ceilingNextPowerOfTwo(ringBufferSize);
151     }
152 
153     /**
154      * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
155      * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
156      * All other Info objects will have this attribute set to {@code false}.
157      * This allows us to detect Logger.log() calls initiated from the appender thread,
158      * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
159      */
160     private static void initInfoForExecutorThread() {
161         executor.submit(new Runnable(){
162             @Override
163             public void run() {
164                 final boolean isAppenderThread = true;
165                 final Info info = new Info(new RingBufferLogEventTranslator(), //
166                         Thread.currentThread().getName(), isAppenderThread);
167                 threadlocalInfo.set(info);
168             }
169         });
170     }
171 
172     private static WaitStrategy createWaitStrategy() {
173         final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
174         LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
175         if ("Sleep".equals(strategy)) {
176             return new SleepingWaitStrategy();
177         } else if ("Yield".equals(strategy)) {
178             return new YieldingWaitStrategy();
179         } else if ("Block".equals(strategy)) {
180             return new BlockingWaitStrategy();
181         }
182         LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
183         return new BlockingWaitStrategy();
184     }
185 
186     private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
187         final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
188         if (cls == null) {
189             LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
190             return null;
191         }
192         try {
193             final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
194             LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
195             return result;
196         } catch (final Exception ignored) {
197             LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
198             return null;
199         }
200     }
201 
202     /**
203      * Constructs an {@code AsyncLogger} with the specified context, name and
204      * message factory.
205      *
206      * @param context context of this logger
207      * @param name name of this logger
208      * @param messageFactory message factory of this logger
209      */
210     public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
211         super(context, name, messageFactory);
212     }
213 
214     /**
215      * Tuple with the event translator and thread name for a thread.
216      */
217     static class Info {
218         private final RingBufferLogEventTranslator translator;
219         private final String cachedThreadName;
220         private final boolean isAppenderThread;
221         public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
222             this.translator = translator;
223             this.cachedThreadName = threadName;
224             this.isAppenderThread = appenderThread;
225         }
226     }
227 
228     @Override
229     public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
230         // TODO refactor to reduce size to <= 35 bytecodes to allow JVM to inline it
231         Info info = threadlocalInfo.get();
232         if (info == null) {
233             info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
234             threadlocalInfo.set(info);
235         }
236         
237         final Disruptor<RingBufferLogEvent> temp = disruptor;
238         if (temp == null) { // LOG4J2-639
239             LOGGER.fatal("Ignoring log event after log4j was shut down");
240             return;
241         }
242 
243         // LOG4J2-471: prevent deadlock when RingBuffer is full and object
244         // being logged calls Logger.log() from its toString() method
245         if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) {
246             // bypass RingBuffer and invoke Appender directly
247             config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown);
248             return;
249         }
250         message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
251         final boolean includeLocation = config.loggerConfig.isIncludeLocation();
252         info.translator.setValues(this, getName(), marker, fqcn, level, message, //
253                 // don't construct ThrowableProxy until required
254                 thrown, //
255 
256                 // config properties are taken care of in the EventHandler
257                 // thread in the #actualAsyncLog method
258 
259                 // needs shallow copy to be fast (LOG4J2-154)
260                 ThreadContext.getImmutableContext(), //
261 
262                 // needs shallow copy to be fast (LOG4J2-154)
263                 ThreadContext.getImmutableStack(), //
264 
265                 // Thread.currentThread().getName(), //
266                 // info.cachedThreadName, //
267                 THREAD_NAME_STRATEGY.getThreadName(info), // LOG4J2-467
268 
269                 // location: very expensive operation. LOG4J2-153:
270                 // Only include if "includeLocation=true" is specified,
271                 // exclude if not specified or if "false" was specified.
272                 includeLocation ? location(fqcn) : null,
273 
274                 // System.currentTimeMillis());
275                 // CoarseCachedClock: 20% faster than system clock, 16ms gaps
276                 // CachedClock: 10% faster than system clock, smaller gaps
277                 // LOG4J2-744 avoid calling clock altogether if message has the timestamp
278                 message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
279                         clock.currentTimeMillis());
280 
281         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
282         try {
283             // Note: do NOT use the temp variable above!
284             // That could result in adding a log event to the disruptor after it was shut down,
285             // which could cause the publishEvent method to hang and never return.
286             disruptor.publishEvent(info.translator);
287         } catch (final NullPointerException npe) {
288             LOGGER.fatal("Ignoring log event after log4j was shut down.");
289         }
290     }
291 
292     private static StackTraceElement location(final String fqcnOfLogger) {
293         return Log4jLogEvent.calcLocation(fqcnOfLogger);
294     }
295 
296     /**
297      * This method is called by the EventHandler that processes the
298      * RingBufferLogEvent in a separate thread.
299      *
300      * @param event the event to log
301      */
302     public void actualAsyncLog(final RingBufferLogEvent event) {
303         final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
304         event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
305         config.logEvent(event);
306     }
307 
308     public static void stop() {
309         final Disruptor<RingBufferLogEvent> temp = disruptor;
310 
311         // Must guarantee that publishing to the RingBuffer has stopped
312         // before we call disruptor.shutdown()
313         disruptor = null; // client code fails with NPE if log after stop = OK
314         if (temp == null) {
315             return; // stop() has already been called
316         }
317 
318         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
319         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
320         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
321         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
322             try {
323                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
324             } catch (final InterruptedException e) { // ignored
325             }
326         }
327         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
328         executor.shutdown(); // finally, kill the processor thread
329         threadlocalInfo.remove(); // LOG4J2-323
330     }
331 
332     /**
333      * Returns {@code true} if the specified disruptor still has unprocessed events.
334      */
335     private static boolean hasBacklog(final Disruptor<?> disruptor) {
336         final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
337         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
338     }
339 
340     /**
341      * Creates and returns a new {@code RingBufferAdmin} that instruments the
342      * ringbuffer of the {@code AsyncLogger}.
343      *
344      * @param contextName name of the global {@code AsyncLoggerContext}
345      */
346     public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
347         return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
348     }
349 }