View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  
23  import org.apache.logging.log4j.Logger;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26  import org.apache.logging.log4j.core.util.Integers;
27  import org.apache.logging.log4j.status.StatusLogger;
28  import org.apache.logging.log4j.util.PropertiesUtil;
29  
30  import com.lmax.disruptor.BlockingWaitStrategy;
31  import com.lmax.disruptor.EventFactory;
32  import com.lmax.disruptor.EventHandler;
33  import com.lmax.disruptor.EventTranslatorTwoArg;
34  import com.lmax.disruptor.ExceptionHandler;
35  import com.lmax.disruptor.RingBuffer;
36  import com.lmax.disruptor.Sequence;
37  import com.lmax.disruptor.SequenceReportingEventHandler;
38  import com.lmax.disruptor.SleepingWaitStrategy;
39  import com.lmax.disruptor.WaitStrategy;
40  import com.lmax.disruptor.YieldingWaitStrategy;
41  import com.lmax.disruptor.dsl.Disruptor;
42  import com.lmax.disruptor.dsl.ProducerType;
43  
44  /**
45   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX
46   * Disruptor library.
47   * <p>
48   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do
49   * not configure any {@code <asyncLogger>} or {@code <asyncRoot>} elements in
50   * the configuration. If {@code AsyncLoggerConfig} has inner classes that extend
51   * or implement classes from the Disruptor library, a
52   * {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in the
53   * classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin
54   * from the pre-defined plugins definition file.
55   * <p>
56   * This class serves to make the dependency on the Disruptor optional, so that
57   * these classes are only loaded when the {@code AsyncLoggerConfig} is actually
58   * used.
59   */
60  class AsyncLoggerConfigHelper {
61  
62      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
63      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
64      private static final int RINGBUFFER_MIN_SIZE = 128;
65      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
66      private static final Logger LOGGER = StatusLogger.getLogger();
67  
68      private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
69      private static volatile Disruptor<Log4jEventWrapper> disruptor;
70      private static ExecutorService executor;
71  
72      private static volatile int count = 0;
73      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
74  
75      /**
76       * Factory used to populate the RingBuffer with events. These event objects
77       * are then re-used during the life of the RingBuffer.
78       */
79      private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
80          @Override
81          public Log4jEventWrapper newInstance() {
82              return new Log4jEventWrapper();
83          }
84      };
85  
86      /**
87       * Object responsible for passing on data to a specific RingBuffer event.
88       */
89      private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator 
90              = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
91  
92          @Override
93          public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence, 
94                  final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
95              ringBufferElement.event = logEvent;
96              ringBufferElement.loggerConfig = loggerConfig;
97          }
98      };
99  
100     private final AsyncLoggerConfig asyncLoggerConfig;
101 
102     public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
103         this.asyncLoggerConfig = asyncLoggerConfig;
104         claim();
105     }
106 
107     private static synchronized void initDisruptor() {
108         if (disruptor != null) {
109             LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
110             return;
111         }
112         LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
113         final int ringBufferSize = calculateRingBufferSize();
114         final WaitStrategy waitStrategy = createWaitStrategy();
115         executor = Executors.newSingleThreadExecutor(threadFactory);
116         initThreadLocalForExecutorThread();
117         disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
118                 executor, ProducerType.MULTI, waitStrategy);
119         final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {//
120         new Log4jEventWrapperHandler() };
121         final ExceptionHandler<Log4jEventWrapper> errorHandler = getExceptionHandler();
122         disruptor.handleExceptionsWith(errorHandler);
123         disruptor.handleEventsWith(handlers);
124 
125         LOGGER.debug(
126                 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
127                 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
128         disruptor.start();
129     }
130 
131     private static WaitStrategy createWaitStrategy() {
132         final String strategy = System
133                 .getProperty("AsyncLoggerConfig.WaitStrategy");
134         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
135         if ("Sleep".equals(strategy)) {
136             return new SleepingWaitStrategy();
137         } else if ("Yield".equals(strategy)) {
138             return new YieldingWaitStrategy();
139         } else if ("Block".equals(strategy)) {
140             return new BlockingWaitStrategy();
141         }
142         LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
143         return new BlockingWaitStrategy();
144     }
145 
146     private static int calculateRingBufferSize() {
147         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
148         final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
149                 "AsyncLoggerConfig.RingBufferSize",
150                 String.valueOf(ringBufferSize));
151         try {
152             int size = Integer.parseInt(userPreferredRBSize);
153             if (size < RINGBUFFER_MIN_SIZE) {
154                 size = RINGBUFFER_MIN_SIZE;
155                 LOGGER.warn(
156                         "Invalid RingBufferSize {}, using minimum size {}.",
157                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
158             }
159             ringBufferSize = size;
160         } catch (final Exception ex) {
161             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
162                     userPreferredRBSize, ringBufferSize);
163         }
164         return Integers.ceilingNextPowerOfTwo(ringBufferSize);
165     }
166 
167     private static ExceptionHandler<Log4jEventWrapper> getExceptionHandler() {
168         final String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
169         if (cls == null) {
170             return null;
171         }
172         try {
173             @SuppressWarnings("unchecked")
174             final Class<? extends ExceptionHandler<Log4jEventWrapper>> klass = (Class<? extends ExceptionHandler<Log4jEventWrapper>>) Class
175                     .forName(cls);
176             return klass.newInstance();
177         } catch (final Exception ignored) {
178             LOGGER.debug("AsyncLoggerConfig.ExceptionHandler not set: error creating " + cls + ": ", ignored);
179             return null;
180         }
181     }
182 
183     /**
184      * RingBuffer events contain all information necessary to perform the work
185      * in a separate thread.
186      */
187     private static class Log4jEventWrapper {
188         private AsyncLoggerConfig loggerConfig;
189         private LogEvent event;
190 
191         /**
192          * Release references held by ring buffer to allow objects to be
193          * garbage-collected.
194          */
195         public void clear() {
196             loggerConfig = null;
197             event = null;
198         }
199     }
200 
201     /**
202      * EventHandler performs the work in a separate thread.
203      */
204     private static class Log4jEventWrapperHandler implements
205             SequenceReportingEventHandler<Log4jEventWrapper> {
206         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
207         private Sequence sequenceCallback;
208         private int counter;
209 
210         @Override
211         public void setSequenceCallback(final Sequence sequenceCallback) {
212             this.sequenceCallback = sequenceCallback;
213         }
214 
215         @Override
216         public void onEvent(final Log4jEventWrapper event, final long sequence,
217                 final boolean endOfBatch) throws Exception {
218             event.event.setEndOfBatch(endOfBatch);
219             event.loggerConfig.asyncCallAppenders(event.event);
220             event.clear();
221 
222             // notify the BatchEventProcessor that the sequence has progressed.
223             // Without this callback the sequence would not be progressed
224             // until the batch has completely finished.
225             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
226                 sequenceCallback.set(sequence);
227                 counter = 0;
228             }
229         }
230     }
231 
232     /**
233      * Increases the reference count and creates and starts a new Disruptor and
234      * associated thread if none currently exists.
235      * 
236      * @see #release()
237      */
238     synchronized static void claim() {
239         count++;
240         initDisruptor();
241     }
242 
243     /**
244      * Decreases the reference count. If the reference count reached zero, the
245      * Disruptor and its associated thread are shut down and their references
246      * set to {@code null}.
247      */
248     synchronized static void release() {
249         if (--count > 0) {
250             LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
251             return;
252         }
253         final Disruptor<Log4jEventWrapper> temp = disruptor;
254         if (temp == null) {
255             LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
256                     count);
257             count = 0; // ref count must not be negative or #claim() will not work correctly
258             return; // disruptor was already shut down by another thread
259         }
260         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
261 
262         // Must guarantee that publishing to the RingBuffer has stopped
263         // before we call disruptor.shutdown()
264         disruptor = null; // client code fails with NPE if log after stop = OK
265 
266         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
267         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
268         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
269         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
270             try {
271                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
272             } catch (final InterruptedException e) { // ignored
273             }
274         }
275         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
276         executor.shutdown(); // finally, kill the processor thread
277         executor = null; // release reference to allow GC
278     }
279 
280     /**
281      * Returns {@code true} if the specified disruptor still has unprocessed events.
282      */
283     private static boolean hasBacklog(final Disruptor<?> disruptor) {
284         final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
285         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
286     }
287 
288     /**
289      * Initialize the threadlocal that allows us to detect Logger.log() calls 
290      * initiated from the appender thread, which may cause deadlock when the 
291      * RingBuffer is full. (LOG4J2-471)
292      */
293     private static void initThreadLocalForExecutorThread() {
294         executor.submit(new Runnable() {
295             @Override
296             public void run() {
297                 isAppenderThread.set(Boolean.TRUE);
298             }
299         });
300     }
301 
302     /**
303      * If possible, delegates the invocation to {@code callAppenders} to another
304      * thread and returns {@code true}. If this is not possible (if it detects
305      * that delegating to another thread would cause deadlock because the
306      * current call to Logger.log() originated from the appender thread and the
307      * ringbuffer is full) then this method does nothing and returns {@code false}.
308      * It is the responsibility of the caller to process the event when this
309      * method returns {@code false}.
310      * 
311      * @param event the event to delegate to another thread
312      * @return {@code true} if delegation was successful, {@code false} if the
313      *          calling thread needs to process the event itself
314      */
315     public boolean callAppendersFromAnotherThread(final LogEvent event) {
316         // TODO refactor to reduce size to <= 35 bytecodes to allow JVM to inline it
317         final Disruptor<Log4jEventWrapper> temp = disruptor;
318         if (temp == null) { // LOG4J2-639
319             LOGGER.fatal("Ignoring log event after log4j was shut down");
320             return true;
321         }
322 
323         // LOG4J2-471: prevent deadlock when RingBuffer is full and object
324         // being logged calls Logger.log() from its toString() method
325         if (isAppenderThread.get() == Boolean.TRUE //
326                 && temp.getRingBuffer().remainingCapacity() == 0) {
327 
328             // bypass RingBuffer and invoke Appender directly
329             return false;
330         }
331         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
332         try {
333             LogEvent logEvent = event;
334             if (event instanceof RingBufferLogEvent) {
335                 logEvent = ((RingBufferLogEvent) event).createMemento();
336             }
337             logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
338 
339             // Note: do NOT use the temp variable above!
340             // That could result in adding a log event to the disruptor after it was shut down,
341             // which could cause the publishEvent method to hang and never return.
342             disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
343         } catch (final NullPointerException npe) {
344             LOGGER.fatal("Ignoring log event after log4j was shut down.");
345         }
346         return true;
347     }
348 
349     /**
350      * Creates and returns a new {@code RingBufferAdmin} that instruments the
351      * ringbuffer of this {@code AsyncLoggerConfig}.
352      * 
353      * @param contextName name of the {@code LoggerContext}
354      * @param loggerConfigName name of the logger config
355      */
356     public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
357         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
358     }
359 
360 }