View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TransferQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.AbstractLogEvent;
28  import org.apache.logging.log4j.core.Appender;
29  import org.apache.logging.log4j.core.Core;
30  import org.apache.logging.log4j.core.Filter;
31  import org.apache.logging.log4j.core.LogEvent;
32  import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
33  import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
34  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
35  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
36  import org.apache.logging.log4j.core.async.BlockingQueueFactory;
37  import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
38  import org.apache.logging.log4j.core.async.EventRoute;
39  import org.apache.logging.log4j.core.async.InternalAsyncUtil;
40  import org.apache.logging.log4j.core.config.AppenderControl;
41  import org.apache.logging.log4j.core.config.AppenderRef;
42  import org.apache.logging.log4j.core.config.Configuration;
43  import org.apache.logging.log4j.core.config.ConfigurationException;
44  import org.apache.logging.log4j.core.config.Property;
45  import org.apache.logging.log4j.core.config.plugins.Plugin;
46  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
47  import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
48  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
49  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
50  import org.apache.logging.log4j.core.config.plugins.PluginElement;
51  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
52  import org.apache.logging.log4j.core.filter.AbstractFilterable;
53  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
54  import org.apache.logging.log4j.core.util.Log4jThread;
55  import org.apache.logging.log4j.spi.AbstractLogger;
56  
57  /**
58   * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
59   * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
60   * references.
61   */
62  @Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
63  public final class AsyncAppender extends AbstractAppender {
64  
65      private static final int DEFAULT_QUEUE_SIZE = 1024;
66      private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
67          private static final long serialVersionUID = -1761035149477086330L;
68      };
69  
70      private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
71  
72      private final BlockingQueue<LogEvent> queue;
73      private final int queueSize;
74      private final boolean blocking;
75      private final long shutdownTimeout;
76      private final Configuration config;
77      private final AppenderRef[] appenderRefs;
78      private final String errorRef;
79      private final boolean includeLocation;
80      private AppenderControl errorAppender;
81      private AsyncThread thread;
82      private AsyncQueueFullPolicy asyncQueueFullPolicy;
83  
84      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
85              final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
86              final long shutdownTimeout, final Configuration config, final boolean includeLocation,
87              final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) {
88          super(name, filter, null, ignoreExceptions, properties);
89          this.queue = blockingQueueFactory.create(queueSize);
90          this.queueSize = queueSize;
91          this.blocking = blocking;
92          this.shutdownTimeout = shutdownTimeout;
93          this.config = config;
94          this.appenderRefs = appenderRefs;
95          this.errorRef = errorRef;
96          this.includeLocation = includeLocation;
97      }
98  
99      @Override
100     public void start() {
101         final Map<String, Appender> map = config.getAppenders();
102         final List<AppenderControl> appenders = new ArrayList<>();
103         for (final AppenderRef appenderRef : appenderRefs) {
104             final Appender appender = map.get(appenderRef.getRef());
105             if (appender != null) {
106                 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
107             } else {
108                 LOGGER.error("No appender named {} was configured", appenderRef);
109             }
110         }
111         if (errorRef != null) {
112             final Appender appender = map.get(errorRef);
113             if (appender != null) {
114                 errorAppender = new AppenderControl(appender, null, null);
115             } else {
116                 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
117             }
118         }
119         if (appenders.size() > 0) {
120             thread = new AsyncThread(appenders, queue);
121             thread.setName("AsyncAppender-" + getName());
122         } else if (errorRef == null) {
123             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
124         }
125         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
126 
127         thread.start();
128         super.start();
129     }
130 
131     @Override
132     public boolean stop(final long timeout, final TimeUnit timeUnit) {
133         setStopping();
134         super.stop(timeout, timeUnit, false);
135         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
136         thread.shutdown();
137         try {
138             thread.join(shutdownTimeout);
139         } catch (final InterruptedException ex) {
140             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
141         }
142         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
143 
144         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
145             LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
146                 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
147         }
148         setStopped();
149         return true;
150     }
151 
152     /**
153      * Actual writing occurs here.
154      *
155      * @param logEvent The LogEvent.
156      */
157     @Override
158     public void append(final LogEvent logEvent) {
159         if (!isStarted()) {
160             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
161         }
162         final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
163         InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
164         if (!transfer(memento)) {
165             if (blocking) {
166                 if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
167                     // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
168                     AsyncQueueFullMessageUtil.logWarningToStatusLogger();
169                     logMessageInCurrentThread(logEvent);
170                 } else {
171                     // delegate to the event router (which may discard, enqueue and block, or log in current thread)
172                     final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
173                     route.logMessage(this, memento);
174                 }
175             } else {
176                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
177                 logToErrorAppenderIfNecessary(false, memento);
178             }
179         }
180     }
181 
182     private boolean transfer(final LogEvent memento) {
183         return queue instanceof TransferQueue
184             ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
185             : queue.offer(memento);
186     }
187 
188     /**
189      * FOR INTERNAL USE ONLY.
190      *
191      * @param logEvent the event to log
192      */
193     public void logMessageInCurrentThread(final LogEvent logEvent) {
194         logEvent.setEndOfBatch(queue.isEmpty());
195         final boolean appendSuccessful = thread.callAppenders(logEvent);
196         logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
197     }
198 
199     /**
200      * FOR INTERNAL USE ONLY.
201      *
202      * @param logEvent the event to log
203      */
204     public void logMessageInBackgroundThread(final LogEvent logEvent) {
205         try {
206             // wait for free slots in the queue
207             queue.put(logEvent);
208         } catch (final InterruptedException e) {
209             final boolean appendSuccessful = handleInterruptedException(logEvent);
210             logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
211         }
212     }
213 
214     // LOG4J2-1049: Some applications use Thread.interrupt() to send
215     // messages between application threads. This does not necessarily
216     // mean that the queue is full. To prevent dropping a log message,
217     // quickly try to offer the event to the queue again.
218     // (Yes, this means there is a possibility the same event is logged twice.)
219     //
220     // Finally, catching the InterruptedException means the
221     // interrupted flag has been cleared on the current thread.
222     // This may interfere with the application's expectation of
223     // being interrupted, so when we are done, we set the interrupted
224     // flag again.
225     private boolean handleInterruptedException(final LogEvent memento) {
226         final boolean appendSuccessful = queue.offer(memento);
227         if (!appendSuccessful) {
228             LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
229                 getName());
230         }
231         // set the interrupted flag again.
232         Thread.currentThread().interrupt();
233         return appendSuccessful;
234     }
235 
236     private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
237         if (!appendSuccessful && errorAppender != null) {
238             errorAppender.callAppender(logEvent);
239         }
240     }
241 
242     /**
243      * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
244      * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
245      * pre-2.7.
246      *
247      * @param appenderRefs     The Appenders to reference.
248      * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
249      * @param blocking         True if the Appender should wait when the queue is full. The default is true.
250      * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
251      *                         in the queue on shutdown. The default is zero which means to wait forever.
252      * @param size             The size of the event queue. The default is 128.
253      * @param name             The name of the Appender.
254      * @param includeLocation  whether to include location information. The default is false.
255      * @param filter           The Filter or null.
256      * @param config           The Configuration.
257      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
258      *                         otherwise they are propagated to the caller.
259      * @return The AsyncAppender.
260      * @deprecated use {@link Builder} instead
261      */
262     @Deprecated
263     public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
264                                                final boolean blocking, final long shutdownTimeout, final int size,
265                                                final String name, final boolean includeLocation, final Filter filter,
266                                                final Configuration config, final boolean ignoreExceptions) {
267         if (name == null) {
268             LOGGER.error("No name provided for AsyncAppender");
269             return null;
270         }
271         if (appenderRefs == null) {
272             LOGGER.error("No appender references provided to AsyncAppender {}", name);
273         }
274 
275         return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
276             shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null);
277     }
278 
279     @PluginBuilderFactory
280     public static Builder newBuilder() {
281         return new Builder();
282     }
283 
284     public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B>
285             implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
286 
287         @PluginElement("AppenderRef")
288         @Required(message = "No appender references provided to AsyncAppender")
289         private AppenderRef[] appenderRefs;
290 
291         @PluginBuilderAttribute
292         @PluginAliases("error-ref")
293         private String errorRef;
294 
295         @PluginBuilderAttribute
296         private boolean blocking = true;
297 
298         @PluginBuilderAttribute
299         private long shutdownTimeout = 0L;
300 
301         @PluginBuilderAttribute
302         private int bufferSize = DEFAULT_QUEUE_SIZE;
303 
304         @PluginBuilderAttribute
305         @Required(message = "No name provided for AsyncAppender")
306         private String name;
307 
308         @PluginBuilderAttribute
309         private boolean includeLocation = false;
310 
311         @PluginConfiguration
312         private Configuration configuration;
313 
314         @PluginBuilderAttribute
315         private boolean ignoreExceptions = true;
316 
317         @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
318         private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
319 
320         public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
321             this.appenderRefs = appenderRefs;
322             return this;
323         }
324 
325         public Builder setErrorRef(final String errorRef) {
326             this.errorRef = errorRef;
327             return this;
328         }
329 
330         public Builder setBlocking(final boolean blocking) {
331             this.blocking = blocking;
332             return this;
333         }
334 
335         public Builder setShutdownTimeout(final long shutdownTimeout) {
336             this.shutdownTimeout = shutdownTimeout;
337             return this;
338         }
339 
340         public Builder setBufferSize(final int bufferSize) {
341             this.bufferSize = bufferSize;
342             return this;
343         }
344 
345         public Builder setName(final String name) {
346             this.name = name;
347             return this;
348         }
349 
350         public Builder setIncludeLocation(final boolean includeLocation) {
351             this.includeLocation = includeLocation;
352             return this;
353         }
354 
355         public Builder setConfiguration(final Configuration configuration) {
356             this.configuration = configuration;
357             return this;
358         }
359 
360         public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
361             this.ignoreExceptions = ignoreExceptions;
362             return this;
363         }
364 
365         public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
366             this.blockingQueueFactory = blockingQueueFactory;
367             return this;
368         }
369 
370         @Override
371         public AsyncAppender build() {
372             return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
373                 shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray());
374         }
375     }
376 
377     /**
378      * Thread that calls the Appenders.
379      */
380     private class AsyncThread extends Log4jThread {
381 
382         private volatile boolean shutdown = false;
383         private final List<AppenderControl> appenders;
384         private final BlockingQueue<LogEvent> queue;
385 
386         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
387             super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
388             this.appenders = appenders;
389             this.queue = queue;
390             setDaemon(true);
391         }
392 
393         @Override
394         public void run() {
395             while (!shutdown) {
396                 LogEvent event;
397                 try {
398                     event = queue.take();
399                     if (event == SHUTDOWN_LOG_EVENT) {
400                         shutdown = true;
401                         continue;
402                     }
403                 } catch (final InterruptedException ex) {
404                     break; // LOG4J2-830
405                 }
406                 event.setEndOfBatch(queue.isEmpty());
407                 final boolean success = callAppenders(event);
408                 if (!success && errorAppender != null) {
409                     try {
410                         errorAppender.callAppender(event);
411                     } catch (final Exception ex) {
412                         // Silently accept the error.
413                     }
414                 }
415             }
416             // Process any remaining items in the queue.
417             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
418                 queue.size());
419             int count = 0;
420             int ignored = 0;
421             while (!queue.isEmpty()) {
422                 try {
423                     final LogEvent event = queue.take();
424                     if (event instanceof Log4jLogEvent) {
425                         final Log4jLogEvent logEvent = (Log4jLogEvent) event;
426                         logEvent.setEndOfBatch(queue.isEmpty());
427                         callAppenders(logEvent);
428                         count++;
429                     } else {
430                         ignored++;
431                         LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
432                     }
433                 } catch (final InterruptedException ex) {
434                     // May have been interrupted to shut down.
435                     // Here we ignore interrupts and try to process all remaining events.
436                 }
437             }
438             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
439                 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
440         }
441 
442         /**
443          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
444          * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
445          * exceptions are silently ignored.
446          *
447          * @param event the event to forward to the registered appenders
448          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
449          */
450         boolean callAppenders(final LogEvent event) {
451             boolean success = false;
452             for (final AppenderControl control : appenders) {
453                 try {
454                     control.callAppender(event);
455                     success = true;
456                 } catch (final Exception ex) {
457                     // If no appender is successful the error appender will get it.
458                 }
459             }
460             return success;
461         }
462 
463         public void shutdown() {
464             shutdown = true;
465             if (queue.isEmpty()) {
466                 queue.offer(SHUTDOWN_LOG_EVENT);
467             }
468             if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
469                 this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
470             }
471         }
472     }
473 
474     /**
475      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
476      *
477      * @return the names of the sink appenders
478      */
479     public String[] getAppenderRefStrings() {
480         final String[] result = new String[appenderRefs.length];
481         for (int i = 0; i < result.length; i++) {
482             result[i] = appenderRefs[i].getRef();
483         }
484         return result;
485     }
486 
487     /**
488      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
489      * the class and method where the logging call was made.
490      *
491      * @return {@code true} if location is included with every event, {@code false} otherwise
492      */
493     public boolean isIncludeLocation() {
494         return includeLocation;
495     }
496 
497     /**
498      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
499      * dropped when the queue is full.
500      *
501      * @return whether this AsyncAppender will block or drop events when the queue is full.
502      */
503     public boolean isBlocking() {
504         return blocking;
505     }
506 
507     /**
508      * Returns the name of the appender that any errors are logged to or {@code null}.
509      *
510      * @return the name of the appender that any errors are logged to or {@code null}
511      */
512     public String getErrorRef() {
513         return errorRef;
514     }
515 
516     public int getQueueCapacity() {
517         return queueSize;
518     }
519 
520     public int getQueueRemainingCapacity() {
521         return queue.remainingCapacity();
522     }
523 
524     /**
525      * Returns the number of elements in the queue.
526      *
527      * @return the number of elements in the queue.
528      * @since 2.11.1
529      */
530     public int getQueueSize() {
531         return queue.size();
532     }
533 }