View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.Map;
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.Executors;
22  
23  import org.apache.logging.log4j.Level;
24  import org.apache.logging.log4j.Marker;
25  import org.apache.logging.log4j.ThreadContext;
26  import org.apache.logging.log4j.core.Logger;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.config.Property;
29  import org.apache.logging.log4j.core.helpers.Clock;
30  import org.apache.logging.log4j.core.helpers.ClockFactory;
31  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
33  import org.apache.logging.log4j.message.Message;
34  import org.apache.logging.log4j.message.MessageFactory;
35  import org.apache.logging.log4j.status.StatusLogger;
36  
37  import com.lmax.disruptor.BlockingWaitStrategy;
38  import com.lmax.disruptor.EventHandler;
39  import com.lmax.disruptor.ExceptionHandler;
40  import com.lmax.disruptor.RingBuffer;
41  import com.lmax.disruptor.SleepingWaitStrategy;
42  import com.lmax.disruptor.WaitStrategy;
43  import com.lmax.disruptor.YieldingWaitStrategy;
44  import com.lmax.disruptor.dsl.Disruptor;
45  import com.lmax.disruptor.dsl.ProducerType;
46  import com.lmax.disruptor.util.Util;
47  
48  /**
49   * AsyncLogger is a logger designed for high throughput and low latency logging.
50   * It does not perform any I/O in the calling (application) thread, but instead
51   * hands off the work to another thread as soon as possible. The actual logging
52   * is performed in the background thread. It uses the LMAX Disruptor library for
53   * inter-thread communication. (<a
54   * href="http://lmax-exchange.github.com/disruptor/"
55   * >http://lmax-exchange.github.com/disruptor/</a>)
56   * <p>
57   * To use AsyncLogger, specify the System property
58   * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
59   * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
60   * will be AsyncLoggers.
61   * <p>
62   * Note that for performance reasons, this logger does not include source
63   * location by default. You need to specify {@code includeLocation="true"} in
64   * the configuration or any %class, %location or %line conversion patterns in
65   * your log4j.xml configuration will produce either a "?" character or no output
66   * at all.
67   * <p>
68   * For best performance, use AsyncLogger with the RandomAccessFileAppender or
69   * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
70   * have built-in support for the batching mechanism used by the Disruptor
71   * library, and they will flush to disk at the end of each batch. This means
72   * that even with immediateFlush=false, there will never be any items left in
73   * the buffer; all log events will all be written to disk in a very efficient
74   * manner.
75   */
76  public class AsyncLogger extends Logger {
77      private static final long serialVersionUID = 1L;
78      private static final int HALF_A_SECOND = 500;
79      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
80      private static final int RINGBUFFER_MIN_SIZE = 128;
81      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
82      private static final StatusLogger LOGGER = StatusLogger.getLogger();
83      private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
84      private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
85  
86      static enum ThreadNameStrategy { // LOG4J2-467
87          CACHED {
88              @Override
89              public String getThreadName(Info info) {
90                  return info.cachedThreadName;
91              }
92          },
93          UNCACHED {
94              @Override
95              public String getThreadName(Info info) {
96                  return Thread.currentThread().getName();
97              }
98          };
99          abstract String getThreadName(Info info);
100 
101         static ThreadNameStrategy create() {
102             String name = System.getProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
103             try {
104                 return ThreadNameStrategy.valueOf(name);
105             } catch (Exception ex) {
106                 return CACHED;
107             }
108         }
109     }
110     private static volatile Disruptor<RingBufferLogEvent> disruptor;
111     private static Clock clock = ClockFactory.getClock();
112 
113     private static ExecutorService executor = Executors
114             .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
115 
116     static {
117         initInfoForExecutorThread();
118         LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
119         final int ringBufferSize = calculateRingBufferSize();
120 
121         final WaitStrategy waitStrategy = createWaitStrategy();
122         disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor,
123                 ProducerType.MULTI, waitStrategy);
124         final EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {//
125         new RingBufferLogEventHandler() };
126         disruptor.handleExceptionsWith(getExceptionHandler());
127         disruptor.handleEventsWith(handlers);
128 
129         LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
130                 .getBufferSize());
131         disruptor.start();
132     }
133 
134     private static int calculateRingBufferSize() {
135         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
136         final String userPreferredRBSize = System.getProperty("AsyncLogger.RingBufferSize",
137                 String.valueOf(ringBufferSize));
138         try {
139             int size = Integer.parseInt(userPreferredRBSize);
140             if (size < RINGBUFFER_MIN_SIZE) {
141                 size = RINGBUFFER_MIN_SIZE;
142                 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
143                         RINGBUFFER_MIN_SIZE);
144             }
145             ringBufferSize = size;
146         } catch (final Exception ex) {
147             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
148         }
149         return Util.ceilingNextPowerOfTwo(ringBufferSize);
150     }
151 
152     /**
153      * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
154      * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
155      * All other Info objects will have this attribute set to {@code false}.
156      * This allows us to detect Logger.log() calls initiated from the appender thread,
157      * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
158      */
159     private static void initInfoForExecutorThread() {
160         executor.submit(new Runnable(){
161             @Override
162             public void run() {
163                 final boolean isAppenderThread = true;
164                 final Info info = new Info(new RingBufferLogEventTranslator(), //
165                         Thread.currentThread().getName(), isAppenderThread);
166                 threadlocalInfo.set(info);
167             }
168         });
169     }
170 
171     private static WaitStrategy createWaitStrategy() {
172         final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
173         LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
174         if ("Sleep".equals(strategy)) {
175             LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
176             return new SleepingWaitStrategy();
177         } else if ("Yield".equals(strategy)) {
178             LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
179             return new YieldingWaitStrategy();
180         } else if ("Block".equals(strategy)) {
181             LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
182             return new BlockingWaitStrategy();
183         }
184         LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
185         return new SleepingWaitStrategy();
186     }
187 
188     private static ExceptionHandler getExceptionHandler() {
189         final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
190         if (cls == null) {
191             LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
192             return null;
193         }
194         try {
195             @SuppressWarnings("unchecked")
196             final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class.forName(cls);
197             final ExceptionHandler result = klass.newInstance();
198             LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
199             return result;
200         } catch (final Exception ignored) {
201             LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
202             return null;
203         }
204     }
205 
206     /**
207      * Constructs an {@code AsyncLogger} with the specified context, name and
208      * message factory.
209      * 
210      * @param context context of this logger
211      * @param name name of this logger
212      * @param messageFactory message factory of this logger
213      */
214     public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
215         super(context, name, messageFactory);
216     }
217 
218     /**
219      * Tuple with the event translator and thread name for a thread.
220      */
221     static class Info {
222         private final RingBufferLogEventTranslator translator;
223         private final String cachedThreadName;
224         private final boolean isAppenderThread;
225         public Info(RingBufferLogEventTranslator translator, String threadName, boolean appenderThread) {
226             this.translator = translator;
227             this.cachedThreadName = threadName;
228             this.isAppenderThread = appenderThread;
229         }
230     }
231 
232     @Override
233     public void log(final Marker marker, final String fqcn, final Level level, final Message data, final Throwable t) {
234         Info info = threadlocalInfo.get();
235         if (info == null) {
236             info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
237             threadlocalInfo.set(info);
238         }
239         
240         // LOG4J2-471: prevent deadlock when RingBuffer is full and object
241         // being logged calls Logger.log() from its toString() method
242         if (info.isAppenderThread && disruptor.getRingBuffer().remainingCapacity() == 0) {
243             // bypass RingBuffer and invoke Appender directly
244             config.loggerConfig.log(getName(), marker, fqcn, level, data, t);
245             return;
246         }
247         final boolean includeLocation = config.loggerConfig.isIncludeLocation();
248         info.translator.setValues(this, getName(), marker, fqcn, level, data, t, //
249 
250                 // config properties are taken care of in the EventHandler
251                 // thread in the #actualAsyncLog method
252 
253                 // needs shallow copy to be fast (LOG4J2-154)
254                 ThreadContext.getImmutableContext(), //
255 
256                 // needs shallow copy to be fast (LOG4J2-154)
257                 ThreadContext.getImmutableStack(), //
258 
259                 // Thread.currentThread().getName(), //
260                 // info.cachedThreadName, //
261                 THREAD_NAME_STRATEGY.getThreadName(info), // LOG4J2-467
262 
263                 // location: very expensive operation. LOG4J2-153:
264                 // Only include if "includeLocation=true" is specified,
265                 // exclude if not specified or if "false" was specified.
266                 includeLocation ? location(fqcn) : null,
267 
268                 // System.currentTimeMillis());
269                 // CoarseCachedClock: 20% faster than system clock, 16ms gaps
270                 // CachedClock: 10% faster than system clock, smaller gaps
271                 clock.currentTimeMillis());
272 
273         disruptor.publishEvent(info.translator);
274     }
275 
276     private StackTraceElement location(final String fqcnOfLogger) {
277         return Log4jLogEvent.calcLocation(fqcnOfLogger);
278     }
279 
280     /**
281      * This method is called by the EventHandler that processes the
282      * RingBufferLogEvent in a separate thread.
283      * 
284      * @param event the event to log
285      */
286     public void actualAsyncLog(final RingBufferLogEvent event) {
287         final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
288         event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor());
289         config.logEvent(event);
290     }
291 
292     public static void stop() {
293         final Disruptor<RingBufferLogEvent> temp = disruptor;
294 
295         // Must guarantee that publishing to the RingBuffer has stopped
296         // before we call disruptor.shutdown()
297         disruptor = null; // client code fails with NPE if log after stop = OK
298         if (temp == null) {
299             return; // stop() has already been called
300         }
301         temp.shutdown();
302 
303         // wait up to 10 seconds for the ringbuffer to drain
304         final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
305         for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
306             if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
307                 break;
308             }
309             try {
310                 // give ringbuffer some time to drain...
311                 Thread.sleep(HALF_A_SECOND);
312             } catch (final InterruptedException e) {
313                 // ignored
314             }
315         }
316         executor.shutdown(); // finally, kill the processor thread
317         threadlocalInfo.remove(); // LOG4J2-323
318     }
319 
320     /**
321      * Creates and returns a new {@code RingBufferAdmin} that instruments the
322      * ringbuffer of the {@code AsyncLogger}.
323      * 
324      * @param contextName name of the global {@code AsyncLoggerContext}
325      */
326     public static RingBufferAdmin createRingBufferAdmin(String contextName) {
327         return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
328     }
329 }