View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  
23  import org.apache.logging.log4j.Logger;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26  import org.apache.logging.log4j.status.StatusLogger;
27  
28  import com.lmax.disruptor.BlockingWaitStrategy;
29  import com.lmax.disruptor.EventFactory;
30  import com.lmax.disruptor.EventHandler;
31  import com.lmax.disruptor.EventTranslatorTwoArg;
32  import com.lmax.disruptor.ExceptionHandler;
33  import com.lmax.disruptor.RingBuffer;
34  import com.lmax.disruptor.Sequence;
35  import com.lmax.disruptor.SequenceReportingEventHandler;
36  import com.lmax.disruptor.SleepingWaitStrategy;
37  import com.lmax.disruptor.WaitStrategy;
38  import com.lmax.disruptor.YieldingWaitStrategy;
39  import com.lmax.disruptor.dsl.Disruptor;
40  import com.lmax.disruptor.dsl.ProducerType;
41  import com.lmax.disruptor.util.Util;
42  
43  /**
44   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX
45   * Disruptor library.
46   * <p>
47   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do
48   * not configure any {@code <asyncLogger>} or {@code <asyncRoot>} elements in
49   * the configuration. If {@code AsyncLoggerConfig} has inner classes that extend
50   * or implement classes from the Disruptor library, a
51   * {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in the
52   * classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin
53   * from the pre-defined plugins definition file.
54   * <p>
55   * This class serves to make the dependency on the Disruptor optional, so that
56   * these classes are only loaded when the {@code AsyncLoggerConfig} is actually
57   * used.
58   */
59  class AsyncLoggerConfigHelper {
60  
61      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
62      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
63      private static final int RINGBUFFER_MIN_SIZE = 128;
64      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
65      private static final Logger LOGGER = StatusLogger.getLogger();
66  
67      private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
68      private static volatile Disruptor<Log4jEventWrapper> disruptor;
69      private static ExecutorService executor;
70  
71      private static volatile int count = 0;
72      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
73  
74      /**
75       * Factory used to populate the RingBuffer with events. These event objects
76       * are then re-used during the life of the RingBuffer.
77       */
78      private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
79          @Override
80          public Log4jEventWrapper newInstance() {
81              return new Log4jEventWrapper();
82          }
83      };
84  
85      /**
86       * Object responsible for passing on data to a specific RingBuffer event.
87       */
88      private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator 
89              = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
90  
91          @Override
92          public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence, 
93                  final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
94              ringBufferElement.event = logEvent;
95              ringBufferElement.loggerConfig = loggerConfig;
96          }
97      };
98  
99      private final AsyncLoggerConfig asyncLoggerConfig;
100 
101     public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
102         this.asyncLoggerConfig = asyncLoggerConfig;
103         claim();
104     }
105 
106     private static synchronized void initDisruptor() {
107         if (disruptor != null) {
108             LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
109             return;
110         }
111         LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
112         final int ringBufferSize = calculateRingBufferSize();
113         final WaitStrategy waitStrategy = createWaitStrategy();
114         executor = Executors.newSingleThreadExecutor(threadFactory);
115         initThreadLocalForExecutorThread();
116         disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
117                 executor, ProducerType.MULTI, waitStrategy);
118         final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {//
119         new Log4jEventWrapperHandler() };
120         final ExceptionHandler errorHandler = getExceptionHandler();
121         disruptor.handleExceptionsWith(errorHandler);
122         disruptor.handleEventsWith(handlers);
123 
124         LOGGER.debug(
125                 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
126                 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
127         disruptor.start();
128     }
129 
130     private static WaitStrategy createWaitStrategy() {
131         final String strategy = System
132                 .getProperty("AsyncLoggerConfig.WaitStrategy");
133         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
134         if ("Sleep".equals(strategy)) {
135             return new SleepingWaitStrategy();
136         } else if ("Yield".equals(strategy)) {
137             return new YieldingWaitStrategy();
138         } else if ("Block".equals(strategy)) {
139             return new BlockingWaitStrategy();
140         }
141         LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
142         return new BlockingWaitStrategy();
143     }
144 
145     private static int calculateRingBufferSize() {
146         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
147         final String userPreferredRBSize = System.getProperty(
148                 "AsyncLoggerConfig.RingBufferSize",
149                 String.valueOf(ringBufferSize));
150         try {
151             int size = Integer.parseInt(userPreferredRBSize);
152             if (size < RINGBUFFER_MIN_SIZE) {
153                 size = RINGBUFFER_MIN_SIZE;
154                 LOGGER.warn(
155                         "Invalid RingBufferSize {}, using minimum size {}.",
156                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
157             }
158             ringBufferSize = size;
159         } catch (final Exception ex) {
160             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
161                     userPreferredRBSize, ringBufferSize);
162         }
163         return Util.ceilingNextPowerOfTwo(ringBufferSize);
164     }
165 
166     private static ExceptionHandler getExceptionHandler() {
167         final String cls = System
168                 .getProperty("AsyncLoggerConfig.ExceptionHandler");
169         if (cls == null) {
170             return null;
171         }
172         try {
173             @SuppressWarnings("unchecked")
174             final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
175                     .forName(cls);
176             final ExceptionHandler result = klass.newInstance();
177             return result;
178         } catch (final Exception ignored) {
179             LOGGER.debug(
180                     "AsyncLoggerConfig.ExceptionHandler not set: error creating "
181                             + cls + ": ", ignored);
182             return null;
183         }
184     }
185 
186     /**
187      * RingBuffer events contain all information necessary to perform the work
188      * in a separate thread.
189      */
190     private static class Log4jEventWrapper {
191         private AsyncLoggerConfig loggerConfig;
192         private LogEvent event;
193 
194         /**
195          * Release references held by ring buffer to allow objects to be
196          * garbage-collected.
197          */
198         public void clear() {
199             loggerConfig = null;
200             event = null;
201         }
202     }
203 
204     /**
205      * EventHandler performs the work in a separate thread.
206      */
207     private static class Log4jEventWrapperHandler implements
208             SequenceReportingEventHandler<Log4jEventWrapper> {
209         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
210         private Sequence sequenceCallback;
211         private int counter;
212 
213         @Override
214         public void setSequenceCallback(final Sequence sequenceCallback) {
215             this.sequenceCallback = sequenceCallback;
216         }
217 
218         @Override
219         public void onEvent(final Log4jEventWrapper event, final long sequence,
220                 final boolean endOfBatch) throws Exception {
221             event.event.setEndOfBatch(endOfBatch);
222             event.loggerConfig.asyncCallAppenders(event.event);
223             event.clear();
224 
225             // notify the BatchEventProcessor that the sequence has progressed.
226             // Without this callback the sequence would not be progressed
227             // until the batch has completely finished.
228             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
229                 sequenceCallback.set(sequence);
230                 counter = 0;
231             }
232         }
233     }
234 
235     /**
236      * Increases the reference count and creates and starts a new Disruptor and
237      * associated thread if none currently exists.
238      * 
239      * @see #release()
240      */
241     synchronized static void claim() {
242         count++;
243         initDisruptor();
244     }
245 
246     /**
247      * Decreases the reference count. If the reference count reached zero, the
248      * Disruptor and its associated thread are shut down and their references
249      * set to {@code null}.
250      */
251     synchronized static void release() {
252         if (--count > 0) {
253             LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
254             return;
255         }
256         final Disruptor<Log4jEventWrapper> temp = disruptor;
257         if (temp == null) {
258             LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
259                     count);
260             count = 0; // ref count must not be negative or #claim() will not work correctly
261             return; // disruptor was already shut down by another thread
262         }
263         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
264 
265         // Must guarantee that publishing to the RingBuffer has stopped
266         // before we call disruptor.shutdown()
267         disruptor = null; // client code fails with NPE if log after stop = OK
268 
269         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
270         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
271         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
272         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
273             try {
274                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
275             } catch (final InterruptedException e) { // ignored
276             }
277         }
278         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
279         executor.shutdown(); // finally, kill the processor thread
280         executor = null; // release reference to allow GC
281     }
282 
283     /**
284      * Returns {@code true} if the specified disruptor still has unprocessed events.
285      */
286     private static boolean hasBacklog(final Disruptor<?> disruptor) {
287         final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
288         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
289     }
290 
291     /**
292      * Initialize the threadlocal that allows us to detect Logger.log() calls 
293      * initiated from the appender thread, which may cause deadlock when the 
294      * RingBuffer is full. (LOG4J2-471)
295      */
296     private static void initThreadLocalForExecutorThread() {
297         executor.submit(new Runnable() {
298             @Override
299             public void run() {
300                 isAppenderThread.set(Boolean.TRUE);
301             }
302         });
303     }
304 
305     /**
306      * If possible, delegates the invocation to {@code callAppenders} to another
307      * thread and returns {@code true}. If this is not possible (if it detects
308      * that delegating to another thread would cause deadlock because the
309      * current call to Logger.log() originated from the appender thread and the
310      * ringbuffer is full) then this method does nothing and returns {@code false}.
311      * It is the responsibility of the caller to process the event when this
312      * method returns {@code false}.
313      * 
314      * @param event the event to delegate to another thread
315      * @return {@code true} if delegation was successful, {@code false} if the
316      *          calling thread needs to process the event itself
317      */
318     public boolean callAppendersFromAnotherThread(final LogEvent event) {
319         // TODO refactor to reduce size to <= 35 bytecodes to allow JVM to inline it
320         final Disruptor<Log4jEventWrapper> temp = disruptor;
321         if (temp == null) { // LOG4J2-639
322             LOGGER.fatal("Ignoring log event after log4j was shut down");
323             return true;
324         }
325 
326         // LOG4J2-471: prevent deadlock when RingBuffer is full and object
327         // being logged calls Logger.log() from its toString() method
328         if (isAppenderThread.get() == Boolean.TRUE //
329                 && temp.getRingBuffer().remainingCapacity() == 0) {
330 
331             // bypass RingBuffer and invoke Appender directly
332             return false;
333         }
334         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
335         try {
336             LogEvent logEvent = event;
337             if (event instanceof RingBufferLogEvent) {
338                 logEvent = ((RingBufferLogEvent) event).createMemento();
339             }
340             logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
341 
342             // Note: do NOT use the temp variable above!
343             // That could result in adding a log event to the disruptor after it was shut down,
344             // which could cause the publishEvent method to hang and never return.
345             disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
346         } catch (final NullPointerException npe) {
347             LOGGER.fatal("Ignoring log event after log4j was shut down.");
348         }
349         return true;
350     }
351 
352     /**
353      * Creates and returns a new {@code RingBufferAdmin} that instruments the
354      * ringbuffer of this {@code AsyncLoggerConfig}.
355      * 
356      * @param contextName name of the {@code LoggerContext}
357      * @param loggerConfigName name of the logger config
358      */
359     public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
360         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
361     }
362 
363 }