View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.Serializable;
20  import java.util.ArrayList;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.concurrent.ArrayBlockingQueue;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.Appender;
28  import org.apache.logging.log4j.core.Filter;
29  import org.apache.logging.log4j.core.LogEvent;
30  import org.apache.logging.log4j.core.async.RingBufferLogEvent;
31  import org.apache.logging.log4j.core.config.AppenderControl;
32  import org.apache.logging.log4j.core.config.AppenderRef;
33  import org.apache.logging.log4j.core.config.Configuration;
34  import org.apache.logging.log4j.core.config.ConfigurationException;
35  import org.apache.logging.log4j.core.config.plugins.Plugin;
36  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
37  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
38  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
39  import org.apache.logging.log4j.core.config.plugins.PluginElement;
40  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
41  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
42  
43  /**
44   * Appends to one or more Appenders asynchronously.  You can configure an
45   * AsyncAppender with one or more Appenders and an Appender to append to if the
46   * queue is full. The AsyncAppender does not allow a filter to be specified on
47   * the Appender references.
48   */
49  @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50  public final class AsyncAppender extends AbstractAppender {
51  
52      private static final long serialVersionUID = 1L;
53      private static final int DEFAULT_QUEUE_SIZE = 128;
54      private static final String SHUTDOWN = "Shutdown";
55  
56      private final BlockingQueue<Serializable> queue;
57      private final int queueSize;
58      private final boolean blocking;
59      private final Configuration config;
60      private final AppenderRef[] appenderRefs;
61      private final String errorRef;
62      private final boolean includeLocation;
63      private AppenderControl errorAppender;
64      private AsyncThread thread;
65      private static final AtomicLong threadSequence = new AtomicLong(1);
66      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
67  
68  
69      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
70                             final String errorRef, final int queueSize, final boolean blocking,
71                             final boolean ignoreExceptions, final Configuration config,
72                             final boolean includeLocation) {
73          super(name, filter, null, ignoreExceptions);
74          this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
75          this.queueSize = queueSize;
76          this.blocking = blocking;
77          this.config = config;
78          this.appenderRefs = appenderRefs;
79          this.errorRef = errorRef;
80          this.includeLocation = includeLocation;
81      }
82  
83      @Override
84      public void start() {
85          final Map<String, Appender> map = config.getAppenders();
86          final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
87          for (final AppenderRef appenderRef : appenderRefs) {
88              if (map.containsKey(appenderRef.getRef())) {
89                  appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
90                      appenderRef.getFilter()));
91              } else {
92                  LOGGER.error("No appender named {} was configured", appenderRef);
93              }
94          }
95          if (errorRef != null) {
96              if (map.containsKey(errorRef)) {
97                  errorAppender = new AppenderControl(map.get(errorRef), null, null);
98              } else {
99                  LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
100             }
101         }
102         if (appenders.size() > 0) {
103             thread = new AsyncThread(appenders, queue);
104             thread.setName("AsyncAppender-" + getName());
105         } else if (errorRef == null) {
106             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
107         }
108 
109         thread.start();
110         super.start();
111     }
112 
113     @Override
114     public void stop() {
115         super.stop();
116         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
117         thread.shutdown();
118         try {
119             thread.join();
120         } catch (final InterruptedException ex) {
121             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
122         }
123         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
124     }
125 
126     /**
127      * Actual writing occurs here.
128      * 
129      * @param logEvent
130      *        The LogEvent.
131      */
132     @Override
133     public void append(LogEvent logEvent) {
134         if (!isStarted()) {
135             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
136         }
137         if (!(logEvent instanceof Log4jLogEvent)) {
138             if (!(logEvent instanceof RingBufferLogEvent)) {
139                 return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents
140             }
141             logEvent = ((RingBufferLogEvent) logEvent).createMemento();
142         }
143         logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
144         final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
145         boolean appendSuccessful = false;
146         if (blocking) {
147             if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
148                 // LOG4J2-485: avoid deadlock that would result from trying
149                 // to add to a full queue from appender thread
150                 coreEvent.setEndOfBatch(false); // queue is definitely not empty!
151                 appendSuccessful = thread.callAppenders(coreEvent);
152             } else {
153                 try {
154                     // wait for free slots in the queue
155                     queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation));
156                     appendSuccessful = true;
157                 } catch (final InterruptedException e) {
158                     LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
159                             getName());
160                 }
161             }
162         } else {
163             appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
164             if (!appendSuccessful) {
165                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
166             }
167         }
168         if (!appendSuccessful && errorAppender != null) {
169             errorAppender.callAppender(coreEvent);
170         }
171     }
172 
173     /**
174      * Create an AsyncAppender.
175      * @param appenderRefs The Appenders to reference.
176      * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
177      * @param blocking True if the Appender should wait when the queue is full. The default is true.
178      * @param size The size of the event queue. The default is 128.
179      * @param name The name of the Appender.
180      * @param includeLocation whether to include location information. The default is false.
181      * @param filter The Filter or null.
182      * @param config The Configuration.
183      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
184      *                         otherwise they are propagated to the caller.
185      * @return The AsyncAppender.
186      */
187     @PluginFactory
188     public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
189             @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
190             @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
191             @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
192             @PluginAttribute("name") final String name,
193             @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
194             @PluginElement("Filter") final Filter filter,
195             @PluginConfiguration final Configuration config,
196             @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
197         if (name == null) {
198             LOGGER.error("No name provided for AsyncAppender");
199             return null;
200         }
201         if (appenderRefs == null) {
202             LOGGER.error("No appender references provided to AsyncAppender {}", name);
203         }
204 
205         return new AsyncAppender(name, filter, appenderRefs, errorRef,
206                 size, blocking, ignoreExceptions, config, includeLocation);
207     }
208 
209     /**
210      * Thread that calls the Appenders.
211      */
212     private class AsyncThread extends Thread {
213 
214         private volatile boolean shutdown = false;
215         private final List<AppenderControl> appenders;
216         private final BlockingQueue<Serializable> queue;
217 
218         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
219             this.appenders = appenders;
220             this.queue = queue;
221             setDaemon(true);
222             setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
223         }
224 
225         @Override
226         public void run() {
227             isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
228             while (!shutdown) {
229                 Serializable s;
230                 try {
231                     s = queue.take();
232                     if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
233                         shutdown = true;
234                         continue;
235                     }
236                 } catch (final InterruptedException ex) {
237                     break; // LOG4J2-830
238                 }
239                 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
240                 event.setEndOfBatch(queue.isEmpty());
241                 final boolean success = callAppenders(event);
242                 if (!success && errorAppender != null) {
243                     try {
244                         errorAppender.callAppender(event);
245                     } catch (final Exception ex) {
246                         // Silently accept the error.
247                     }
248                 }
249             }
250             // Process any remaining items in the queue.
251             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
252                     queue.size());
253             int count= 0;
254             int ignored = 0;
255             while (!queue.isEmpty()) {
256                 try {
257                     final Serializable s = queue.take();
258                     if (Log4jLogEvent.canDeserialize(s)) {
259                         final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
260                         event.setEndOfBatch(queue.isEmpty());
261                         callAppenders(event);
262                         count++;
263                     } else {
264                         ignored++;
265                         LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
266                     }
267                 } catch (final InterruptedException ex) {
268                     // May have been interrupted to shut down.
269                     // Here we ignore interrupts and try to process all remaining events.
270                 }
271             }
272             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " +
273             		"Processed {} and ignored {} events since shutdown started.",
274             		queue.size(), count, ignored);
275         }
276 
277         /**
278          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on
279          * all registered {@code AppenderControl} objects, and returns {@code true}
280          * if at least one appender call was successful, {@code false} otherwise.
281          * Any exceptions are silently ignored.
282          *
283          * @param event the event to forward to the registered appenders
284          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
285          */
286         boolean callAppenders(final Log4jLogEvent event) {
287             boolean success = false;
288             for (final AppenderControl control : appenders) {
289                 try {
290                     control.callAppender(event);
291                     success = true;
292                 } catch (final Exception ex) {
293                     // If no appender is successful the error appender will get it.
294                 }
295             }
296             return success;
297         }
298 
299         public void shutdown() {
300             shutdown = true;
301             if (queue.isEmpty()) {
302                 queue.offer(SHUTDOWN);
303             }
304         }
305     }
306 
307     /**
308      * Returns the names of the appenders that this asyncAppender delegates to
309      * as an array of Strings.
310      * @return the names of the sink appenders
311      */
312     public String[] getAppenderRefStrings() {
313         final String[] result = new String[appenderRefs.length];
314         for (int i = 0; i < result.length; i++) {
315             result[i] = appenderRefs[i].getRef();
316         }
317         return result;
318     }
319 
320     /**
321      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with
322      * every log event to determine the class and method where the logging call
323      * was made.
324      * @return {@code true} if location is included with every event, {@code false} otherwise
325      */
326     public boolean isIncludeLocation() {
327         return includeLocation;
328     }
329 
330     /**
331      * Returns {@code true} if this AsyncAppender will block when the queue is full,
332      * or {@code false} if events are dropped when the queue is full.
333      * @return whether this AsyncAppender will block or drop events when the queue is full.
334      */
335     public boolean isBlocking() {
336         return blocking;
337     }
338 
339     /**
340      * Returns the name of the appender that any errors are logged to or {@code null}.
341      * @return the name of the appender that any errors are logged to or {@code null}
342      */
343     public String getErrorRef() {
344         return errorRef;
345     }
346 
347     public int getQueueCapacity() {
348         return queueSize;
349     }
350 
351     public int getQueueRemainingCapacity() {
352         return queue.remainingCapacity();
353     }
354 }