View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.async;
19  
20  import java.util.concurrent.ThreadFactory;
21  import java.util.concurrent.TimeUnit;
22  
23  import com.lmax.disruptor.EventTranslatorVararg;
24  import org.apache.logging.log4j.Level;
25  import org.apache.logging.log4j.Marker;
26  import org.apache.logging.log4j.core.AbstractLifeCycle;
27  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
28  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
29  import org.apache.logging.log4j.core.util.Throwables;
30  
31  import com.lmax.disruptor.ExceptionHandler;
32  import com.lmax.disruptor.RingBuffer;
33  import com.lmax.disruptor.TimeoutException;
34  import com.lmax.disruptor.WaitStrategy;
35  import com.lmax.disruptor.dsl.Disruptor;
36  import com.lmax.disruptor.dsl.ProducerType;
37  import org.apache.logging.log4j.message.Message;
38  
39  /**
40   * Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and
41   * works with its associated AsyncLoggerContext to synchronize the life cycle of the Disruptor and its thread with the
42   * life cycle of the context. The AsyncLoggerDisruptor of the context is shared by all AsyncLogger objects created by
43   * that AsyncLoggerContext.
44   */
45  class AsyncLoggerDisruptor extends AbstractLifeCycle {
46      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
47      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
48  
49      private final Object queueFullEnqueueLock = new Object();
50  
51      private volatile Disruptor<RingBufferLogEvent> disruptor;
52      private String contextName;
53  
54      private boolean useThreadLocalTranslator = true;
55      private long backgroundThreadId;
56      private AsyncQueueFullPolicy asyncQueueFullPolicy;
57      private int ringBufferSize;
58  
59      AsyncLoggerDisruptor(final String contextName) {
60          this.contextName = contextName;
61      }
62  
63      public String getContextName() {
64          return contextName;
65      }
66  
67      public void setContextName(final String name) {
68          contextName = name;
69      }
70  
71      Disruptor<RingBufferLogEvent> getDisruptor() {
72          return disruptor;
73      }
74  
75      /**
76       * Creates and starts a new Disruptor and associated thread if none currently exists.
77       *
78       * @see #stop()
79       */
80      @Override
81      public synchronized void start() {
82          if (disruptor != null) {
83              LOGGER.trace(
84                      "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
85                      contextName);
86              return;
87          }
88          LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
89          ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
90          final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
91  
92          final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLogger[" + contextName + "]", true, Thread.NORM_PRIORITY) {
93              @Override
94              public Thread newThread(final Runnable r) {
95                  final Thread result = super.newThread(r);
96                  backgroundThreadId = result.getId();
97                  return result;
98              }
99          };
100         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
101 
102         disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, threadFactory, ProducerType.MULTI,
103                 waitStrategy);
104 
105         final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
106         disruptor.setDefaultExceptionHandler(errorHandler);
107 
108         final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
109         disruptor.handleEventsWith(handlers);
110 
111         LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
112                 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
113                 .getClass().getSimpleName(), errorHandler);
114         disruptor.start();
115 
116         LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
117                 : "vararg");
118         super.start();
119     }
120 
121     /**
122      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
123      * shut down and their references set to {@code null}.
124      */
125     @Override
126     public boolean stop(final long timeout, final TimeUnit timeUnit) {
127         final Disruptor<RingBufferLogEvent> temp = getDisruptor();
128         if (temp == null) {
129             LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
130             return true; // disruptor was already shut down by another thread
131         }
132         setStopping();
133         LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
134 
135         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
136         disruptor = null; // client code fails with NPE if log after stop. This is by design.
137 
138         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
139         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
140         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
141         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
142             try {
143                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
144             } catch (final InterruptedException e) { // ignored
145             }
146         }
147         try {
148             // busy-spins until all events currently in the disruptor have been processed, or timeout
149             temp.shutdown(timeout, timeUnit);
150         } catch (final TimeoutException e) {
151             LOGGER.warn("[{}] AsyncLoggerDisruptor: shutdown timed out after {} {}", contextName, timeout, timeUnit);
152             temp.halt(); // give up on remaining log events, if any
153         }
154 
155         LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor has been shut down.", contextName);
156 
157         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
158             LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
159                     DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
160         }
161         setStopped();
162         return true;
163     }
164 
165     /**
166      * Returns {@code true} if the specified disruptor still has unprocessed events.
167      */
168     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
169         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
170         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
171     }
172 
173     /**
174      * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
175      *
176      * @param jmxContextName name of the {@code AsyncLoggerContext}
177      * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
178      */
179     public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
180         final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
181         return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
182     }
183 
184     EventRoute getEventRoute(final Level logLevel) {
185         final int remainingCapacity = remainingDisruptorCapacity();
186         if (remainingCapacity < 0) {
187             return EventRoute.DISCARD;
188         }
189         return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
190     }
191 
192     private int remainingDisruptorCapacity() {
193         final Disruptor<RingBufferLogEvent> temp = disruptor;
194         if (hasLog4jBeenShutDown(temp)) {
195             return -1;
196         }
197         return (int) temp.getRingBuffer().remainingCapacity();
198     }
199         /**
200          * Returns {@code true} if the specified disruptor is null.
201          */
202     private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
203         if (aDisruptor == null) { // LOG4J2-639
204             LOGGER.warn("Ignoring log event after log4j was shut down");
205             return true;
206         }
207         return false;
208     }
209 
210     boolean tryPublish(final RingBufferLogEventTranslator translator) {
211         try {
212             // Note: we deliberately access the volatile disruptor field afresh here.
213             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
214             // was shut down, which could cause the publishEvent method to hang and never return.
215             return disruptor.getRingBuffer().tryPublishEvent(translator);
216         } catch (final NullPointerException npe) {
217             // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
218             logWarningOnNpeFromDisruptorPublish(translator);
219             return false;
220         }
221     }
222 
223     void enqueueLogMessageWhenQueueFull(final RingBufferLogEventTranslator translator) {
224         try {
225             // Note: we deliberately access the volatile disruptor field afresh here.
226             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
227             // was shut down, which could cause the publishEvent method to hang and never return.
228             if (synchronizeEnqueueWhenQueueFull()) {
229                 synchronized (queueFullEnqueueLock) {
230                     disruptor.publishEvent(translator);
231                 }
232             } else {
233                 disruptor.publishEvent(translator);
234             }
235         } catch (final NullPointerException npe) {
236             // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
237             logWarningOnNpeFromDisruptorPublish(translator);
238         }
239     }
240 
241     void enqueueLogMessageWhenQueueFull(
242             final EventTranslatorVararg<RingBufferLogEvent> translator,
243             final AsyncLogger asyncLogger,
244             final StackTraceElement location,
245             final String fqcn,
246             final Level level,
247             final Marker marker,
248             final Message msg,
249             final Throwable thrown) {
250         try {
251             // Note: we deliberately access the volatile disruptor field afresh here.
252             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
253             // was shut down, which could cause the publishEvent method to hang and never return.
254             if (synchronizeEnqueueWhenQueueFull()) {
255                 synchronized (queueFullEnqueueLock) {
256                     disruptor.getRingBuffer().publishEvent(translator,
257                             asyncLogger, // asyncLogger: 0
258                             location, // location: 1
259                             fqcn, // 2
260                             level, // 3
261                             marker, // 4
262                             msg, // 5
263                             thrown); // 6
264                 }
265             } else {
266                 disruptor.getRingBuffer().publishEvent(translator,
267                         asyncLogger, // asyncLogger: 0
268                         location, // location: 1
269                         fqcn, // 2
270                         level, // 3
271                         marker, // 4
272                         msg, // 5
273                         thrown); // 6
274             }
275         } catch (final NullPointerException npe) {
276             // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
277             logWarningOnNpeFromDisruptorPublish(level, fqcn, msg, thrown);
278         }
279     }
280 
281     private boolean synchronizeEnqueueWhenQueueFull() {
282         return DisruptorUtil.ASYNC_LOGGER_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
283                 // Background thread must never block
284                 && backgroundThreadId != Thread.currentThread().getId();
285     }
286 
287     private void logWarningOnNpeFromDisruptorPublish(final RingBufferLogEventTranslator translator) {
288         logWarningOnNpeFromDisruptorPublish(
289                 translator.level, translator.loggerName, translator.message, translator.thrown);
290     }
291 
292     private void logWarningOnNpeFromDisruptorPublish(
293             final Level level, final String fqcn, final Message msg, final Throwable thrown) {
294         LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}{}", contextName,
295                 level, fqcn, msg.getFormattedMessage(), thrown == null ? "" : Throwables.toStringList(thrown));
296     }
297 
298     /**
299      * Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
300      *
301      * @return whether AsyncLoggers are allowed to use ThreadLocal objects
302      * @since 2.5
303      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
304      */
305     public boolean isUseThreadLocals() {
306         return useThreadLocalTranslator;
307     }
308 
309     /**
310      * Signals this AsyncLoggerDisruptor whether it is allowed to store non-JDK classes in ThreadLocal objects for
311      * efficiency.
312      * <p>
313      * This property may be modified after the {@link #start()} method has been called.
314      * </p>
315      *
316      * @param allow whether AsyncLoggers are allowed to use ThreadLocal objects
317      * @since 2.5
318      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
319      */
320     public void setUseThreadLocals(final boolean allow) {
321         useThreadLocalTranslator = allow;
322         LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
323                 useThreadLocalTranslator ? "threadlocal" : "vararg");
324     }
325 }