View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  
23  import org.apache.logging.log4j.Logger;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26  import org.apache.logging.log4j.status.StatusLogger;
27  
28  import com.lmax.disruptor.BlockingWaitStrategy;
29  import com.lmax.disruptor.EventFactory;
30  import com.lmax.disruptor.EventHandler;
31  import com.lmax.disruptor.EventTranslatorTwoArg;
32  import com.lmax.disruptor.ExceptionHandler;
33  import com.lmax.disruptor.RingBuffer;
34  import com.lmax.disruptor.Sequence;
35  import com.lmax.disruptor.SequenceReportingEventHandler;
36  import com.lmax.disruptor.SleepingWaitStrategy;
37  import com.lmax.disruptor.WaitStrategy;
38  import com.lmax.disruptor.YieldingWaitStrategy;
39  import com.lmax.disruptor.dsl.Disruptor;
40  import com.lmax.disruptor.dsl.ProducerType;
41  import com.lmax.disruptor.util.Util;
42  
43  /**
44   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX
45   * Disruptor library.
46   * <p>
47   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do
48   * not configure any {@code <asyncLogger>} or {@code <asyncRoot>} elements in
49   * the configuration. If {@code AsyncLoggerConfig} has inner classes that extend
50   * or implement classes from the Disruptor library, a
51   * {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in the
52   * classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin
53   * from the pre-defined plugins definition file.
54   * <p>
55   * This class serves to make the dependency on the Disruptor optional, so that
56   * these classes are only loaded when the {@code AsyncLoggerConfig} is actually
57   * used.
58   */
59  class AsyncLoggerConfigHelper {
60  
61      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
62      private static final int HALF_A_SECOND = 500;
63      private static final int RINGBUFFER_MIN_SIZE = 128;
64      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
65      private static final Logger LOGGER = StatusLogger.getLogger();
66  
67      private static ThreadFactory threadFactory = new DaemonThreadFactory(
68              "AsyncLoggerConfig-");
69      private static volatile Disruptor<Log4jEventWrapper> disruptor;
70      private static ExecutorService executor;
71  
72      private static volatile int count = 0;
73      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
74  
75      /**
76       * Factory used to populate the RingBuffer with events. These event objects
77       * are then re-used during the life of the RingBuffer.
78       */
79      private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
80          @Override
81          public Log4jEventWrapper newInstance() {
82              return new Log4jEventWrapper();
83          }
84      };
85  
86      /**
87       * Object responsible for passing on data to a specific RingBuffer event.
88       */
89      private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator 
90              = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
91  
92          @Override
93          public void translateTo(Log4jEventWrapper ringBufferElement, long sequence, 
94                  LogEvent logEvent, AsyncLoggerConfig loggerConfig) {
95              ringBufferElement.event = logEvent;
96              ringBufferElement.loggerConfig = loggerConfig;
97          }
98      };
99  
100     private final AsyncLoggerConfig asyncLoggerConfig;
101 
102     public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
103         this.asyncLoggerConfig = asyncLoggerConfig;
104         claim();
105     }
106 
107     private static synchronized void initDisruptor() {
108         if (disruptor != null) {
109             LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
110             return;
111         }
112         LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
113         final int ringBufferSize = calculateRingBufferSize();
114         final WaitStrategy waitStrategy = createWaitStrategy();
115         executor = Executors.newSingleThreadExecutor(threadFactory);
116         initThreadLocalForExecutorThread();
117         disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
118                 executor, ProducerType.MULTI, waitStrategy);
119         final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {//
120         new Log4jEventWrapperHandler() };
121         final ExceptionHandler errorHandler = getExceptionHandler();
122         disruptor.handleExceptionsWith(errorHandler);
123         disruptor.handleEventsWith(handlers);
124 
125         LOGGER.debug(
126                 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
127                 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
128         disruptor.start();
129     }
130 
131     private static WaitStrategy createWaitStrategy() {
132         final String strategy = System
133                 .getProperty("AsyncLoggerConfig.WaitStrategy");
134         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
135         if ("Sleep".equals(strategy)) {
136             return new SleepingWaitStrategy();
137         } else if ("Yield".equals(strategy)) {
138             return new YieldingWaitStrategy();
139         } else if ("Block".equals(strategy)) {
140             return new BlockingWaitStrategy();
141         }
142         return new SleepingWaitStrategy();
143     }
144 
145     private static int calculateRingBufferSize() {
146         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
147         final String userPreferredRBSize = System.getProperty(
148                 "AsyncLoggerConfig.RingBufferSize",
149                 String.valueOf(ringBufferSize));
150         try {
151             int size = Integer.parseInt(userPreferredRBSize);
152             if (size < RINGBUFFER_MIN_SIZE) {
153                 size = RINGBUFFER_MIN_SIZE;
154                 LOGGER.warn(
155                         "Invalid RingBufferSize {}, using minimum size {}.",
156                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
157             }
158             ringBufferSize = size;
159         } catch (final Exception ex) {
160             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
161                     userPreferredRBSize, ringBufferSize);
162         }
163         return Util.ceilingNextPowerOfTwo(ringBufferSize);
164     }
165 
166     private static ExceptionHandler getExceptionHandler() {
167         final String cls = System
168                 .getProperty("AsyncLoggerConfig.ExceptionHandler");
169         if (cls == null) {
170             return null;
171         }
172         try {
173             @SuppressWarnings("unchecked")
174             final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
175                     .forName(cls);
176             final ExceptionHandler result = klass.newInstance();
177             return result;
178         } catch (final Exception ignored) {
179             LOGGER.debug(
180                     "AsyncLoggerConfig.ExceptionHandler not set: error creating "
181                             + cls + ": ", ignored);
182             return null;
183         }
184     }
185 
186     /**
187      * RingBuffer events contain all information necessary to perform the work
188      * in a separate thread.
189      */
190     private static class Log4jEventWrapper {
191         private AsyncLoggerConfig loggerConfig;
192         private LogEvent event;
193 
194         /**
195          * Release references held by ring buffer to allow objects to be
196          * garbage-collected.
197          */
198         public void clear() {
199             loggerConfig = null;
200             event = null;
201         }
202     }
203 
204     /**
205      * EventHandler performs the work in a separate thread.
206      */
207     private static class Log4jEventWrapperHandler implements
208             SequenceReportingEventHandler<Log4jEventWrapper> {
209         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
210         private Sequence sequenceCallback;
211         private int counter;
212 
213         @Override
214         public void setSequenceCallback(final Sequence sequenceCallback) {
215             this.sequenceCallback = sequenceCallback;
216         }
217 
218         @Override
219         public void onEvent(final Log4jEventWrapper event, final long sequence,
220                 final boolean endOfBatch) throws Exception {
221             event.event.setEndOfBatch(endOfBatch);
222             event.loggerConfig.asyncCallAppenders(event.event);
223             event.clear();
224 
225             // notify the BatchEventProcessor that the sequence has progressed.
226             // Without this callback the sequence would not be progressed
227             // until the batch has completely finished.
228             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
229                 sequenceCallback.set(sequence);
230                 counter = 0;
231             }
232         }
233     }
234 
235     /**
236      * Increases the reference count and creates and starts a new Disruptor and
237      * associated thread if none currently exists.
238      * 
239      * @see #release()
240      */
241     synchronized static void claim() {
242         count++;
243         initDisruptor();
244     }
245 
246     /**
247      * Decreases the reference count. If the reference count reached zero, the
248      * Disruptor and its associated thread are shut down and their references
249      * set to {@code null}.
250      */
251     synchronized static void release() {
252         if (--count > 0) {
253             LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
254             return;
255         }
256         final Disruptor<Log4jEventWrapper> temp = disruptor;
257         if (temp == null) {
258             LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}.", count);
259             return; // disruptor was already shut down by another thread
260         }
261         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
262 
263         // Must guarantee that publishing to the RingBuffer has stopped
264         // before we call disruptor.shutdown()
265         disruptor = null; // client code fails with NPE if log after stop = OK
266         temp.shutdown();
267 
268         // wait up to 10 seconds for the ringbuffer to drain
269         final RingBuffer<Log4jEventWrapper> ringBuffer = temp.getRingBuffer();
270         for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
271             if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
272                 break;
273             }
274             try {
275                 // give ringbuffer some time to drain...
276                 Thread.sleep(HALF_A_SECOND);
277             } catch (final InterruptedException e) {
278                 // ignored
279             }
280         }
281         executor.shutdown(); // finally, kill the processor thread
282         executor = null; // release reference to allow GC
283     }
284 
285     /**
286      * Initialize the threadlocal that allows us to detect Logger.log() calls 
287      * initiated from the appender thread, which may cause deadlock when the 
288      * RingBuffer is full. (LOG4J2-471)
289      */
290     private static void initThreadLocalForExecutorThread() {
291         executor.submit(new Runnable(){
292             @Override
293             public void run() {
294                 isAppenderThread.set(Boolean.TRUE);
295             }
296         });
297     }
298 
299     /**
300      * If possible, delegates the invocation to {@code callAppenders} to another
301      * thread and returns {@code true}. If this is not possible (if it detects
302      * that delegating to another thread would cause deadlock because the
303      * current call to Logger.log() originated from the appender thread and the
304      * ringbuffer is full) then this method does nothing and returns {@code false}.
305      * It is the responsibility of the caller to process the event when this
306      * method returns {@code false}.
307      * 
308      * @param event the event to delegate to another thread
309      * @return {@code true} if delegation was successful, {@code false} if the
310      *          calling thread needs to process the event itself
311      */
312     public boolean callAppendersFromAnotherThread(final LogEvent event) {
313         
314         // LOG4J2-471: prevent deadlock when RingBuffer is full and object
315         // being logged calls Logger.log() from its toString() method
316         if (isAppenderThread.get() == Boolean.TRUE //
317                 && disruptor.getRingBuffer().remainingCapacity() == 0) {
318             
319             // bypass RingBuffer and invoke Appender directly
320             return false;
321         }
322         disruptor.getRingBuffer().publishEvent(translator, event, asyncLoggerConfig);
323         return true;
324     }
325 
326     /**
327      * Creates and returns a new {@code RingBufferAdmin} that instruments the
328      * ringbuffer of this {@code AsyncLoggerConfig}.
329      * 
330      * @param contextName name of the {@code LoggerContext}
331      * @param loggerConfigName name of the logger config
332      */
333     public RingBufferAdmin createRingBufferAdmin(String contextName, String loggerConfigName) {
334         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
335     }
336 
337 }