View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TransferQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.AbstractLogEvent;
28  import org.apache.logging.log4j.core.Appender;
29  import org.apache.logging.log4j.core.Core;
30  import org.apache.logging.log4j.core.Filter;
31  import org.apache.logging.log4j.core.LogEvent;
32  import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
33  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
34  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
35  import org.apache.logging.log4j.core.async.BlockingQueueFactory;
36  import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
37  import org.apache.logging.log4j.core.async.EventRoute;
38  import org.apache.logging.log4j.core.async.InternalAsyncUtil;
39  import org.apache.logging.log4j.core.config.AppenderControl;
40  import org.apache.logging.log4j.core.config.AppenderRef;
41  import org.apache.logging.log4j.core.config.Configuration;
42  import org.apache.logging.log4j.core.config.ConfigurationException;
43  import org.apache.logging.log4j.core.config.plugins.Plugin;
44  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
45  import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
46  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
47  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
48  import org.apache.logging.log4j.core.config.plugins.PluginElement;
49  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
50  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
51  import org.apache.logging.log4j.core.util.Log4jThread;
52  
53  /**
54   * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
55   * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
56   * references.
57   */
58  @Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
59  public final class AsyncAppender extends AbstractAppender {
60  
61      private static final int DEFAULT_QUEUE_SIZE = 1024;
62      private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
63      };
64  
65      private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
66  
67      private final BlockingQueue<LogEvent> queue;
68      private final int queueSize;
69      private final boolean blocking;
70      private final long shutdownTimeout;
71      private final Configuration config;
72      private final AppenderRef[] appenderRefs;
73      private final String errorRef;
74      private final boolean includeLocation;
75      private AppenderControl errorAppender;
76      private AsyncThread thread;
77      private AsyncQueueFullPolicy asyncQueueFullPolicy;
78  
79      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
80                            final String errorRef, final int queueSize, final boolean blocking,
81                            final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
82                            final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
83          super(name, filter, null, ignoreExceptions);
84          this.queue = blockingQueueFactory.create(queueSize);
85          this.queueSize = queueSize;
86          this.blocking = blocking;
87          this.shutdownTimeout = shutdownTimeout;
88          this.config = config;
89          this.appenderRefs = appenderRefs;
90          this.errorRef = errorRef;
91          this.includeLocation = includeLocation;
92      }
93  
94      @Override
95      public void start() {
96          final Map<String, Appender> map = config.getAppenders();
97          final List<AppenderControl> appenders = new ArrayList<>();
98          for (final AppenderRef appenderRef : appenderRefs) {
99              final Appender appender = map.get(appenderRef.getRef());
100             if (appender != null) {
101                 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
102             } else {
103                 LOGGER.error("No appender named {} was configured", appenderRef);
104             }
105         }
106         if (errorRef != null) {
107             final Appender appender = map.get(errorRef);
108             if (appender != null) {
109                 errorAppender = new AppenderControl(appender, null, null);
110             } else {
111                 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
112             }
113         }
114         if (appenders.size() > 0) {
115             thread = new AsyncThread(appenders, queue);
116             thread.setName("AsyncAppender-" + getName());
117         } else if (errorRef == null) {
118             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
119         }
120         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
121 
122         thread.start();
123         super.start();
124     }
125 
126     @Override
127     public boolean stop(final long timeout, final TimeUnit timeUnit) {
128         setStopping();
129         super.stop(timeout, timeUnit, false);
130         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
131         thread.shutdown();
132         try {
133             thread.join(shutdownTimeout);
134         } catch (final InterruptedException ex) {
135             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
136         }
137         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
138 
139         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
140             LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
141                 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
142         }
143         setStopped();
144         return true;
145     }
146 
147     /**
148      * Actual writing occurs here.
149      *
150      * @param logEvent The LogEvent.
151      */
152     @Override
153     public void append(final LogEvent logEvent) {
154         if (!isStarted()) {
155             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
156         }
157         final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
158         InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
159         if (!transfer(memento)) {
160             if (blocking) {
161                 // delegate to the event router (which may discard, enqueue and block, or log in current thread)
162                 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
163                 route.logMessage(this, memento);
164             } else {
165                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
166                 logToErrorAppenderIfNecessary(false, memento);
167             }
168         }
169     }
170 
171     private boolean transfer(final LogEvent memento) {
172         return queue instanceof TransferQueue
173             ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
174             : queue.offer(memento);
175     }
176 
177     /**
178      * FOR INTERNAL USE ONLY.
179      *
180      * @param logEvent the event to log
181      */
182     public void logMessageInCurrentThread(final LogEvent logEvent) {
183         logEvent.setEndOfBatch(queue.isEmpty());
184         final boolean appendSuccessful = thread.callAppenders(logEvent);
185         logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
186     }
187 
188     /**
189      * FOR INTERNAL USE ONLY.
190      *
191      * @param logEvent the event to log
192      */
193     public void logMessageInBackgroundThread(final LogEvent logEvent) {
194         try {
195             // wait for free slots in the queue
196             queue.put(logEvent);
197         } catch (final InterruptedException e) {
198             final boolean appendSuccessful = handleInterruptedException(logEvent);
199             logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
200         }
201     }
202 
203     // LOG4J2-1049: Some applications use Thread.interrupt() to send
204     // messages between application threads. This does not necessarily
205     // mean that the queue is full. To prevent dropping a log message,
206     // quickly try to offer the event to the queue again.
207     // (Yes, this means there is a possibility the same event is logged twice.)
208     //
209     // Finally, catching the InterruptedException means the
210     // interrupted flag has been cleared on the current thread.
211     // This may interfere with the application's expectation of
212     // being interrupted, so when we are done, we set the interrupted
213     // flag again.
214     private boolean handleInterruptedException(final LogEvent memento) {
215         final boolean appendSuccessful = queue.offer(memento);
216         if (!appendSuccessful) {
217             LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
218                 getName());
219         }
220         // set the interrupted flag again.
221         Thread.currentThread().interrupt();
222         return appendSuccessful;
223     }
224 
225     private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
226         if (!appendSuccessful && errorAppender != null) {
227             errorAppender.callAppender(logEvent);
228         }
229     }
230 
231     /**
232      * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
233      * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
234      * pre-2.7.
235      *
236      * @param appenderRefs     The Appenders to reference.
237      * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
238      * @param blocking         True if the Appender should wait when the queue is full. The default is true.
239      * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
240      *                         in the queue on shutdown. The default is zero which means to wait forever.
241      * @param size             The size of the event queue. The default is 128.
242      * @param name             The name of the Appender.
243      * @param includeLocation  whether to include location information. The default is false.
244      * @param filter           The Filter or null.
245      * @param config           The Configuration.
246      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
247      *                         otherwise they are propagated to the caller.
248      * @return The AsyncAppender.
249      * @deprecated use {@link Builder} instead
250      */
251     @Deprecated
252     public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
253                                                final boolean blocking, final long shutdownTimeout, final int size,
254                                                final String name, final boolean includeLocation, final Filter filter,
255                                                final Configuration config, final boolean ignoreExceptions) {
256         if (name == null) {
257             LOGGER.error("No name provided for AsyncAppender");
258             return null;
259         }
260         if (appenderRefs == null) {
261             LOGGER.error("No appender references provided to AsyncAppender {}", name);
262         }
263 
264         return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
265             shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
266     }
267 
268     @PluginBuilderFactory
269     public static Builder newBuilder() {
270         return new Builder();
271     }
272 
273     public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
274 
275         @PluginElement("AppenderRef")
276         @Required(message = "No appender references provided to AsyncAppender")
277         private AppenderRef[] appenderRefs;
278 
279         @PluginBuilderAttribute
280         @PluginAliases("error-ref")
281         private String errorRef;
282 
283         @PluginBuilderAttribute
284         private boolean blocking = true;
285 
286         @PluginBuilderAttribute
287         private long shutdownTimeout = 0L;
288 
289         @PluginBuilderAttribute
290         private int bufferSize = DEFAULT_QUEUE_SIZE;
291 
292         @PluginBuilderAttribute
293         @Required(message = "No name provided for AsyncAppender")
294         private String name;
295 
296         @PluginBuilderAttribute
297         private boolean includeLocation = false;
298 
299         @PluginElement("Filter")
300         private Filter filter;
301 
302         @PluginConfiguration
303         private Configuration configuration;
304 
305         @PluginBuilderAttribute
306         private boolean ignoreExceptions = true;
307 
308         @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
309         private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
310 
311         public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
312             this.appenderRefs = appenderRefs;
313             return this;
314         }
315 
316         public Builder setErrorRef(final String errorRef) {
317             this.errorRef = errorRef;
318             return this;
319         }
320 
321         public Builder setBlocking(final boolean blocking) {
322             this.blocking = blocking;
323             return this;
324         }
325 
326         public Builder setShutdownTimeout(final long shutdownTimeout) {
327             this.shutdownTimeout = shutdownTimeout;
328             return this;
329         }
330 
331         public Builder setBufferSize(final int bufferSize) {
332             this.bufferSize = bufferSize;
333             return this;
334         }
335 
336         public Builder setName(final String name) {
337             this.name = name;
338             return this;
339         }
340 
341         public Builder setIncludeLocation(final boolean includeLocation) {
342             this.includeLocation = includeLocation;
343             return this;
344         }
345 
346         public Builder setFilter(final Filter filter) {
347             this.filter = filter;
348             return this;
349         }
350 
351         public Builder setConfiguration(final Configuration configuration) {
352             this.configuration = configuration;
353             return this;
354         }
355 
356         public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
357             this.ignoreExceptions = ignoreExceptions;
358             return this;
359         }
360 
361         public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
362             this.blockingQueueFactory = blockingQueueFactory;
363             return this;
364         }
365 
366         @Override
367         public AsyncAppender build() {
368             return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
369                 shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
370         }
371     }
372 
373     /**
374      * Thread that calls the Appenders.
375      */
376     private class AsyncThread extends Log4jThread {
377 
378         private volatile boolean shutdown = false;
379         private final List<AppenderControl> appenders;
380         private final BlockingQueue<LogEvent> queue;
381 
382         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
383             super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
384             this.appenders = appenders;
385             this.queue = queue;
386             setDaemon(true);
387         }
388 
389         @Override
390         public void run() {
391             while (!shutdown) {
392                 LogEvent event;
393                 try {
394                     event = queue.take();
395                     if (event == SHUTDOWN_LOG_EVENT) {
396                         shutdown = true;
397                         continue;
398                     }
399                 } catch (final InterruptedException ex) {
400                     break; // LOG4J2-830
401                 }
402                 event.setEndOfBatch(queue.isEmpty());
403                 final boolean success = callAppenders(event);
404                 if (!success && errorAppender != null) {
405                     try {
406                         errorAppender.callAppender(event);
407                     } catch (final Exception ex) {
408                         // Silently accept the error.
409                     }
410                 }
411             }
412             // Process any remaining items in the queue.
413             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
414                 queue.size());
415             int count = 0;
416             int ignored = 0;
417             while (!queue.isEmpty()) {
418                 try {
419                     final LogEvent event = queue.take();
420                     if (event instanceof Log4jLogEvent) {
421                         final Log4jLogEvent logEvent = (Log4jLogEvent) event;
422                         logEvent.setEndOfBatch(queue.isEmpty());
423                         callAppenders(logEvent);
424                         count++;
425                     } else {
426                         ignored++;
427                         LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
428                     }
429                 } catch (final InterruptedException ex) {
430                     // May have been interrupted to shut down.
431                     // Here we ignore interrupts and try to process all remaining events.
432                 }
433             }
434             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
435                 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
436         }
437 
438         /**
439          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
440          * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
441          * exceptions are silently ignored.
442          *
443          * @param event the event to forward to the registered appenders
444          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
445          */
446         boolean callAppenders(final LogEvent event) {
447             boolean success = false;
448             for (final AppenderControl control : appenders) {
449                 try {
450                     control.callAppender(event);
451                     success = true;
452                 } catch (final Exception ex) {
453                     // If no appender is successful the error appender will get it.
454                 }
455             }
456             return success;
457         }
458 
459         public void shutdown() {
460             shutdown = true;
461             if (queue.isEmpty()) {
462                 queue.offer(SHUTDOWN_LOG_EVENT);
463             }
464             if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
465                 this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
466             }
467         }
468     }
469 
470     /**
471      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
472      *
473      * @return the names of the sink appenders
474      */
475     public String[] getAppenderRefStrings() {
476         final String[] result = new String[appenderRefs.length];
477         for (int i = 0; i < result.length; i++) {
478             result[i] = appenderRefs[i].getRef();
479         }
480         return result;
481     }
482 
483     /**
484      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
485      * the class and method where the logging call was made.
486      *
487      * @return {@code true} if location is included with every event, {@code false} otherwise
488      */
489     public boolean isIncludeLocation() {
490         return includeLocation;
491     }
492 
493     /**
494      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
495      * dropped when the queue is full.
496      *
497      * @return whether this AsyncAppender will block or drop events when the queue is full.
498      */
499     public boolean isBlocking() {
500         return blocking;
501     }
502 
503     /**
504      * Returns the name of the appender that any errors are logged to or {@code null}.
505      *
506      * @return the name of the appender that any errors are logged to or {@code null}
507      */
508     public String getErrorRef() {
509         return errorRef;
510     }
511 
512     public int getQueueCapacity() {
513         return queueSize;
514     }
515 
516     public int getQueueRemainingCapacity() {
517         return queue.remainingCapacity();
518     }
519 }