View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.Serializable;
20  import java.util.ArrayList;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.concurrent.ArrayBlockingQueue;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.Appender;
28  import org.apache.logging.log4j.core.Filter;
29  import org.apache.logging.log4j.core.LogEvent;
30  import org.apache.logging.log4j.core.config.AppenderControl;
31  import org.apache.logging.log4j.core.config.AppenderRef;
32  import org.apache.logging.log4j.core.config.Configuration;
33  import org.apache.logging.log4j.core.config.ConfigurationException;
34  import org.apache.logging.log4j.core.config.plugins.Plugin;
35  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
36  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
37  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
38  import org.apache.logging.log4j.core.config.plugins.PluginElement;
39  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
40  import org.apache.logging.log4j.core.helpers.Booleans;
41  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
42  
43  /**
44   * Appends to one or more Appenders asynchronously.  You can configure an
45   * AsyncAppender with one or more Appenders and an Appender to append to if the
46   * queue is full. The AsyncAppender does not allow a filter to be specified on
47   * the Appender references.
48   */
49  @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50  public final class AsyncAppender extends AbstractAppender {
51  
52      private static final int DEFAULT_QUEUE_SIZE = 128;
53      private static final String SHUTDOWN = "Shutdown";
54  
55      private final BlockingQueue<Serializable> queue;
56      private final int queueSize;
57      private final boolean blocking;
58      private final Configuration config;
59      private final AppenderRef[] appenderRefs;
60      private final String errorRef;
61      private final boolean includeLocation;
62      private AppenderControl errorAppender;
63      private AsyncThread thread;
64      private static final AtomicLong threadSequence = new AtomicLong(1);
65      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
66  
67  
68      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
69                             final String errorRef, final int queueSize, final boolean blocking,
70                             final boolean ignoreExceptions, final Configuration config,
71                             final boolean includeLocation) {
72          super(name, filter, null, ignoreExceptions);
73          this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
74          this.queueSize = queueSize;
75          this.blocking = blocking;
76          this.config = config;
77          this.appenderRefs = appenderRefs;
78          this.errorRef = errorRef;
79          this.includeLocation = includeLocation;
80      }
81  
82      @Override
83      public void start() {
84          final Map<String, Appender> map = config.getAppenders();
85          final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
86          for (final AppenderRef appenderRef : appenderRefs) {
87              if (map.containsKey(appenderRef.getRef())) {
88                  appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
89                      appenderRef.getFilter()));
90              } else {
91                  LOGGER.error("No appender named {} was configured", appenderRef);
92              }
93          }
94          if (errorRef != null) {
95              if (map.containsKey(errorRef)) {
96                  errorAppender = new AppenderControl(map.get(errorRef), null, null);
97              } else {
98                  LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
99              }
100         }
101         if (appenders.size() > 0) {
102             thread = new AsyncThread(appenders, queue);
103             thread.setName("AsyncAppender-" + getName());
104         } else if (errorRef == null) {
105             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
106         }
107 
108         thread.start();
109         super.start();
110     }
111 
112     @Override
113     public void stop() {
114         super.stop();
115         thread.shutdown();
116         try {
117             thread.join();
118         } catch (final InterruptedException ex) {
119             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
120         }
121     }
122 
123     /**
124      * Actual writing occurs here.
125      * <p/>
126      * @param evt The LogEvent.
127      */
128     @Override
129     public void append(final LogEvent evt) {
130         if (!isStarted()) {
131             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
132         }
133         if (!(evt instanceof Log4jLogEvent)) {
134             return; // only know how to Serialize Log4jLogEvents
135         }
136         Log4jLogEvent event = (Log4jLogEvent) evt;
137         boolean appendSuccessful = false;
138         if (blocking) {
139             if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
140                 // LOG4J2-485: avoid deadlock that would result from trying
141                 // to add to a full queue from appender thread
142                 event.setEndOfBatch(false); // queue is definitely not empty!
143                 appendSuccessful = thread.callAppenders(event);
144             } else {
145                 try {
146                     // wait for free slots in the queue
147                     queue.put(Log4jLogEvent.serialize(event, includeLocation));
148                     appendSuccessful = true;
149                 } catch (final InterruptedException e) {
150                     LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
151                             getName());
152                 }
153             }
154         } else {
155             appendSuccessful = queue.offer(Log4jLogEvent.serialize(event, includeLocation));
156             if (!appendSuccessful) {
157                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
158             }
159         }
160         if (!appendSuccessful && errorAppender != null) {
161             errorAppender.callAppender(event);
162         }
163     }
164 
165     /**
166      * Create an AsyncAppender.
167      * @param appenderRefs The Appenders to reference.
168      * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
169      * @param blocking True if the Appender should wait when the queue is full. The default is true.
170      * @param size The size of the event queue. The default is 128.
171      * @param name The name of the Appender.
172      * @param includeLocation whether to include location information. The default is false.
173      * @param filter The Filter or null.
174      * @param config The Configuration.
175      * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
176      *               they are propagated to the caller.
177      * @return The AsyncAppender.
178      */
179     @PluginFactory
180     public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
181             @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
182             @PluginAttribute("blocking") final String blocking, 
183             @PluginAttribute("bufferSize") final String size,
184             @PluginAttribute("name") final String name,
185             @PluginAttribute("includeLocation") final String includeLocation,
186             @PluginElement("Filter") final Filter filter, 
187             @PluginConfiguration final Configuration config,
188             @PluginAttribute("ignoreExceptions") final String ignore) {
189         if (name == null) {
190             LOGGER.error("No name provided for AsyncAppender");
191             return null;
192         }
193         if (appenderRefs == null) {
194             LOGGER.error("No appender references provided to AsyncAppender {}", name);
195         }
196 
197         final boolean isBlocking = Booleans.parseBoolean(blocking, true);
198         final int queueSize = AbstractAppender.parseInt(size, DEFAULT_QUEUE_SIZE);
199         final boolean isIncludeLocation = Boolean.parseBoolean(includeLocation);
200         final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
201 
202         return new AsyncAppender(name, filter, appenderRefs, errorRef,
203                 queueSize, isBlocking, ignoreExceptions, config, isIncludeLocation);
204     }
205 
206     /**
207      * Thread that calls the Appenders.
208      */
209     private class AsyncThread extends Thread {
210 
211         private volatile boolean shutdown = false;
212         private final List<AppenderControl> appenders;
213         private final BlockingQueue<Serializable> queue;
214 
215         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
216             this.appenders = appenders;
217             this.queue = queue;
218             setDaemon(true);
219             setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
220         }
221 
222         @Override
223         public void run() {
224             isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
225             while (!shutdown) {
226                 Serializable s;
227                 try {
228                     s = queue.take();
229                     if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
230                         shutdown = true;
231                         continue;
232                     }
233                 } catch (final InterruptedException ex) {
234                     // No good reason for this.
235                     continue;
236                 }
237                 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
238                 event.setEndOfBatch(queue.isEmpty());
239                 boolean success = callAppenders(event);
240                 if (!success && errorAppender != null) {
241                     try {
242                         errorAppender.callAppender(event);
243                     } catch (final Exception ex) {
244                         // Silently accept the error.
245                     }
246                 }
247             }
248             // Process any remaining items in the queue.
249             while (!queue.isEmpty()) {
250                 try {
251                     final Serializable s = queue.take();
252                     if (s instanceof Log4jLogEvent) {
253                         final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
254                         event.setEndOfBatch(queue.isEmpty());
255                         callAppenders(event);
256                     }
257                 } catch (final InterruptedException ex) {
258                     // May have been interrupted to shut down.
259                 }
260             }
261         }
262 
263         /**
264          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on
265          * all registered {@code AppenderControl} objects, and returns {@code true}
266          * if at least one appender call was successful, {@code false} otherwise.
267          * Any exceptions are silently ignored.
268          * 
269          * @param event the event to forward to the registered appenders
270          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
271          */
272         boolean callAppenders(final Log4jLogEvent event) {
273             boolean success = false;
274             for (final AppenderControl control : appenders) {
275                 try {
276                     control.callAppender(event);
277                     success = true;
278                 } catch (final Exception ex) {
279                     // If no appender is successful the error appender will get it.
280                 }
281             }
282             return success;
283         }
284 
285         public void shutdown() {
286             shutdown = true;
287             if (queue.isEmpty()) {
288                 queue.offer(SHUTDOWN);
289             }
290         }
291     }
292 
293     /**
294      * Returns the names of the appenders that this asyncAppender delegates to
295      * as an array of Strings.
296      * @return the names of the sink appenders
297      */
298     public String[] getAppenderRefStrings() {
299         final String[] result = new String[appenderRefs.length];
300         for (int i = 0; i < result.length; i++) {
301             result[i] = appenderRefs[i].getRef();
302         }
303         return result;
304     }
305     
306     /**
307      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with
308      * every log event to determine the class and method where the logging call
309      * was made.
310      * @return {@code true} if location is included with every event, {@code false} otherwise
311      */
312     public boolean isIncludeLocation() {
313         return includeLocation;
314     }
315     
316     /**
317      * Returns {@code true} if this AsyncAppender will block when the queue is full,
318      * or {@code false} if events are dropped when the queue is full.
319      * @return whether this AsyncAppender will block or drop events when the queue is full.
320      */
321     public boolean isBlocking() {
322         return blocking;
323     }
324     
325     /**
326      * Returns the name of the appender that any errors are logged to or {@code null}.
327      * @return the name of the appender that any errors are logged to or {@code null}
328      */
329     public String getErrorRef() {
330         return errorRef;
331     }
332     
333     public int getQueueCapacity() {
334         return queueSize;
335     }
336     
337     public int getQueueRemainingCapacity() {
338         return queue.remainingCapacity();
339     }
340 }