View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ThreadFactory;
20  import java.util.concurrent.TimeUnit;
21  
22  import org.apache.logging.log4j.Level;
23  import org.apache.logging.log4j.core.AbstractLifeCycle;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
26  import org.apache.logging.log4j.core.impl.LogEventFactory;
27  import org.apache.logging.log4j.core.impl.MutableLogEvent;
28  import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
29  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
30  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
31  import org.apache.logging.log4j.core.util.Throwables;
32  import org.apache.logging.log4j.message.ReusableMessage;
33  
34  import com.lmax.disruptor.EventFactory;
35  import com.lmax.disruptor.EventTranslatorTwoArg;
36  import com.lmax.disruptor.ExceptionHandler;
37  import com.lmax.disruptor.RingBuffer;
38  import com.lmax.disruptor.Sequence;
39  import com.lmax.disruptor.SequenceReportingEventHandler;
40  import com.lmax.disruptor.TimeoutException;
41  import com.lmax.disruptor.WaitStrategy;
42  import com.lmax.disruptor.dsl.Disruptor;
43  import com.lmax.disruptor.dsl.ProducerType;
44  
45  /**
46   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
47   * <p>
48   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
49   * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
50   * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
51   * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
52   * definition file.
53   * <p>
54   * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
55   * {@code AsyncLoggerConfig} is actually used.
56   */
57  public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate {
58  
59      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
60      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
61  
62      /**
63       * RingBuffer events contain all information necessary to perform the work in a separate thread.
64       */
65      public static class Log4jEventWrapper {
66          public Log4jEventWrapper() {
67          }
68  
69          public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) {
70              event = mutableLogEvent;
71          }
72  
73          private AsyncLoggerConfig loggerConfig;
74          private LogEvent event;
75  
76          /**
77           * Release references held by ring buffer to allow objects to be garbage-collected.
78           */
79          public void clear() {
80              loggerConfig = null;
81              if (event instanceof MutableLogEvent) {
82                  ((MutableLogEvent) event).clear();
83              } else {
84                  event = null;
85              }
86          }
87  
88          @Override
89          public String toString() {
90              return String.valueOf(event);
91          }
92      }
93  
94      /**
95       * EventHandler performs the work in a separate thread.
96       */
97      private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
98          private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
99          private Sequence sequenceCallback;
100         private int counter;
101 
102         @Override
103         public void setSequenceCallback(final Sequence sequenceCallback) {
104             this.sequenceCallback = sequenceCallback;
105         }
106 
107         @Override
108         public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
109                 throws Exception {
110             event.event.setEndOfBatch(endOfBatch);
111             event.loggerConfig.logToAsyncLoggerConfigsOnCurrentThread(event.event);
112             event.clear();
113 
114             notifyIntermediateProgress(sequence);
115         }
116 
117         /**
118          * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
119          * be progressed until the batch has completely finished.
120          */
121         private void notifyIntermediateProgress(final long sequence) {
122             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
123                 sequenceCallback.set(sequence);
124                 counter = 0;
125             }
126         }
127     }
128 
129     /**
130      * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
131      * RingBuffer.
132      */
133     private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
134         @Override
135         public Log4jEventWrapper newInstance() {
136             return new Log4jEventWrapper();
137         }
138     };
139 
140     /**
141      * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
142      * RingBuffer.
143      */
144     private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() {
145         @Override
146         public Log4jEventWrapper newInstance() {
147             return new Log4jEventWrapper(new MutableLogEvent());
148         }
149     };
150 
151     /**
152      * Object responsible for passing on data to a specific RingBuffer event.
153      */
154     private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
155             new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
156 
157         @Override
158         public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
159                 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
160             ringBufferElement.event = logEvent;
161             ringBufferElement.loggerConfig = loggerConfig;
162         }
163     };
164 
165     /**
166      * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent.
167      */
168     private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
169             new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
170 
171         @Override
172         public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
173                 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
174             ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
175             ringBufferElement.loggerConfig = loggerConfig;
176         }
177     };
178 
179     private int ringBufferSize;
180     private AsyncQueueFullPolicy asyncQueueFullPolicy;
181     private Boolean mutable = Boolean.FALSE;
182 
183     private volatile Disruptor<Log4jEventWrapper> disruptor;
184     private long backgroundThreadId; // LOG4J2-471
185     private EventFactory<Log4jEventWrapper> factory;
186     private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;
187     private volatile boolean alreadyLoggedWarning = false;
188 
189     private final Object queueFullEnqueueLock = new Object();
190 
191     public AsyncLoggerConfigDisruptor() {
192     }
193 
194     // called from AsyncLoggerConfig constructor
195     @Override
196     public void setLogEventFactory(final LogEventFactory logEventFactory) {
197         // if any AsyncLoggerConfig uses a ReusableLogEventFactory
198         // then we need to populate our ringbuffer with MutableLogEvents
199         this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
200     }
201 
202     /**
203      * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
204      * exists.
205      *
206      * @see #stop()
207      */
208     @Override
209     public synchronized void start() {
210         if (disruptor != null) {
211             LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
212                     + "using existing object.");
213             return;
214         }
215         LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
216         ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
217         final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
218 
219         final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLoggerConfig", true, Thread.NORM_PRIORITY) {
220             @Override
221             public Thread newThread(final Runnable r) {
222                 final Thread result = super.newThread(r);
223                 backgroundThreadId = result.getId();
224                 return result;
225             }
226         };
227         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
228 
229         translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
230         factory = mutable ? MUTABLE_FACTORY : FACTORY;
231         disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
232 
233         final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
234         disruptor.setDefaultExceptionHandler(errorHandler);
235 
236         final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
237         disruptor.handleEventsWith(handlers);
238 
239         LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
240                 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
241                 .getClass().getSimpleName(), errorHandler);
242         disruptor.start();
243         super.start();
244     }
245 
246     /**
247      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
248      * shut down and their references set to {@code null}.
249      */
250     @Override
251     public boolean stop(final long timeout, final TimeUnit timeUnit) {
252         final Disruptor<Log4jEventWrapper> temp = disruptor;
253         if (temp == null) {
254             LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
255             return true; // disruptor was already shut down by another thread
256         }
257         setStopping();
258         LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");
259 
260         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
261         disruptor = null; // client code fails with NPE if log after stop = OK
262 
263         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
264         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
265         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
266         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
267             try {
268                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
269             } catch (final InterruptedException e) { // ignored
270             }
271         }
272         try {
273             // busy-spins until all events currently in the disruptor have been processed, or timeout
274             temp.shutdown(timeout, timeUnit);
275         } catch (final TimeoutException e) {
276             LOGGER.warn("AsyncLoggerConfigDisruptor: shutdown timed out after {} {}", timeout, timeUnit);
277             temp.halt(); // give up on remaining log events, if any
278         }
279         LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor has been shut down.");
280 
281         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
282             LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
283                     DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
284         }
285         setStopped();
286         return true;
287     }
288 
289     /**
290      * Returns {@code true} if the specified disruptor still has unprocessed events.
291      */
292     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
293         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
294         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
295     }
296 
297     @Override
298     public EventRoute getEventRoute(final Level logLevel) {
299         final int remainingCapacity = remainingDisruptorCapacity();
300         if (remainingCapacity < 0) {
301             return EventRoute.DISCARD;
302         }
303         return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
304     }
305 
306     private int remainingDisruptorCapacity() {
307         final Disruptor<Log4jEventWrapper> temp = disruptor;
308         if (hasLog4jBeenShutDown(temp)) {
309             return -1;
310         }
311         return (int) temp.getRingBuffer().remainingCapacity();
312     }
313 
314     /**
315      * Returns {@code true} if the specified disruptor is null.
316      */
317     private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
318         if (aDisruptor == null) { // LOG4J2-639
319             LOGGER.warn("Ignoring log event after log4j was shut down");
320             return true;
321         }
322         return false;
323     }
324 
325     @Override
326     public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
327         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
328         try {
329             final LogEvent logEvent = prepareEvent(event);
330             enqueue(logEvent, asyncLoggerConfig);
331         } catch (final NullPointerException npe) {
332             // Note: NPE prevents us from adding a log event to the disruptor after it was shut down,
333             // which could cause the publishEvent method to hang and never return.
334             LOGGER.warn("Ignoring log event after log4j was shut down: {} [{}] {}", event.getLevel(),
335                     event.getLoggerName(), event.getMessage().getFormattedMessage()
336                             + (event.getThrown() == null ? "" : Throwables.toStringList(event.getThrown())));
337         }
338     }
339 
340     private LogEvent prepareEvent(final LogEvent event) {
341         LogEvent logEvent = ensureImmutable(event);
342         if (logEvent.getMessage() instanceof ReusableMessage) {
343             if (logEvent instanceof Log4jLogEvent) {
344                 ((Log4jLogEvent) logEvent).makeMessageImmutable();
345             } else if (logEvent instanceof MutableLogEvent) {
346                 // MutableLogEvents need to be translated into the RingBuffer by the MUTABLE_TRANSLATOR.
347                 // That translator calls MutableLogEvent.initFrom to copy the event, which will makeMessageImmutable the message.
348                 if (translator != MUTABLE_TRANSLATOR) { // should not happen...
349                     // TRANSLATOR expects an immutable LogEvent
350                     logEvent = ((MutableLogEvent) logEvent).createMemento();
351                 }
352             } else { // custom log event, with a ReusableMessage
353                 showWarningAboutCustomLogEventWithReusableMessage(logEvent);
354             }
355         } else { // message is not a ReusableMessage; makeMessageImmutable it to prevent ConcurrentModificationExceptions
356             InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); // LOG4J2-1988, LOG4J2-1914
357         }
358         return logEvent;
359     }
360 
361     private void showWarningAboutCustomLogEventWithReusableMessage(final LogEvent logEvent) {
362         if (!alreadyLoggedWarning) {
363             LOGGER.warn("Custom log event of type {} contains a mutable message of type {}." +
364                             " AsyncLoggerConfig does not know how to make an immutable copy of this message." +
365                             " This may result in ConcurrentModificationExceptions or incorrect log messages" +
366                             " if the application modifies objects in the message while" +
367                             " the background thread is writing it to the appenders.",
368                     logEvent.getClass().getName(), logEvent.getMessage().getClass().getName());
369             alreadyLoggedWarning = true;
370         }
371     }
372 
373     private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
374         if (synchronizeEnqueueWhenQueueFull()) {
375             synchronized (queueFullEnqueueLock) {
376                 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
377             }
378         } else {
379             disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
380         }
381     }
382 
383     private boolean synchronizeEnqueueWhenQueueFull() {
384         return DisruptorUtil.ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
385                 // Background thread must never block
386                 && backgroundThreadId != Thread.currentThread().getId();
387     }
388 
389     @Override
390     public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
391         final LogEvent logEvent = prepareEvent(event);
392         return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
393     }
394 
395     private LogEvent ensureImmutable(final LogEvent event) {
396         LogEvent result = event;
397         if (event instanceof RingBufferLogEvent) {
398             // Deal with special case where both types of Async Loggers are used together:
399             // RingBufferLogEvents are created by the all-loggers-async type, but
400             // this event is also consumed by the some-loggers-async type (this class).
401             // The original event will be re-used and modified in an application thread later,
402             // so take a snapshot of it, which can be safely processed in the
403             // some-loggers-async background thread.
404             result = ((RingBufferLogEvent) event).createMemento();
405         }
406         return result;
407     }
408 
409     /*
410      * (non-Javadoc)
411      *
412      * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
413      * java.lang.String)
414      */
415     @Override
416     public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
417         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
418     }
419 }