001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.io.Serializable;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ArrayBlockingQueue;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.logging.log4j.core.Appender;
028import org.apache.logging.log4j.core.Filter;
029import org.apache.logging.log4j.core.LogEvent;
030import org.apache.logging.log4j.core.config.AppenderControl;
031import org.apache.logging.log4j.core.config.AppenderRef;
032import org.apache.logging.log4j.core.config.Configuration;
033import org.apache.logging.log4j.core.config.ConfigurationException;
034import org.apache.logging.log4j.core.config.plugins.Plugin;
035import org.apache.logging.log4j.core.config.plugins.PluginAliases;
036import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
037import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
038import org.apache.logging.log4j.core.config.plugins.PluginElement;
039import org.apache.logging.log4j.core.config.plugins.PluginFactory;
040import org.apache.logging.log4j.core.helpers.Booleans;
041import org.apache.logging.log4j.core.impl.Log4jLogEvent;
042
043/**
044 * Appends to one or more Appenders asynchronously.  You can configure an
045 * AsyncAppender with one or more Appenders and an Appender to append to if the
046 * queue is full. The AsyncAppender does not allow a filter to be specified on
047 * the Appender references.
048 */
049@Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
050public final class AsyncAppender extends AbstractAppender {
051
052    private static final int DEFAULT_QUEUE_SIZE = 128;
053    private static final String SHUTDOWN = "Shutdown";
054
055    private final BlockingQueue<Serializable> queue;
056    private final int queueSize;
057    private final boolean blocking;
058    private final Configuration config;
059    private final AppenderRef[] appenderRefs;
060    private final String errorRef;
061    private final boolean includeLocation;
062    private AppenderControl errorAppender;
063    private AsyncThread thread;
064    private static final AtomicLong threadSequence = new AtomicLong(1);
065    private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
066
067
068    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
069                           final String errorRef, final int queueSize, final boolean blocking,
070                           final boolean ignoreExceptions, final Configuration config,
071                           final boolean includeLocation) {
072        super(name, filter, null, ignoreExceptions);
073        this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
074        this.queueSize = queueSize;
075        this.blocking = blocking;
076        this.config = config;
077        this.appenderRefs = appenderRefs;
078        this.errorRef = errorRef;
079        this.includeLocation = includeLocation;
080    }
081
082    @Override
083    public void start() {
084        final Map<String, Appender> map = config.getAppenders();
085        final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
086        for (final AppenderRef appenderRef : appenderRefs) {
087            if (map.containsKey(appenderRef.getRef())) {
088                appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
089                    appenderRef.getFilter()));
090            } else {
091                LOGGER.error("No appender named {} was configured", appenderRef);
092            }
093        }
094        if (errorRef != null) {
095            if (map.containsKey(errorRef)) {
096                errorAppender = new AppenderControl(map.get(errorRef), null, null);
097            } else {
098                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
099            }
100        }
101        if (appenders.size() > 0) {
102            thread = new AsyncThread(appenders, queue);
103            thread.setName("AsyncAppender-" + getName());
104        } else if (errorRef == null) {
105            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
106        }
107
108        thread.start();
109        super.start();
110    }
111
112    @Override
113    public void stop() {
114        super.stop();
115        thread.shutdown();
116        try {
117            thread.join();
118        } catch (final InterruptedException ex) {
119            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
120        }
121    }
122
123    /**
124     * Actual writing occurs here.
125     * <p/>
126     * @param evt The LogEvent.
127     */
128    @Override
129    public void append(final LogEvent evt) {
130        if (!isStarted()) {
131            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
132        }
133        if (!(evt instanceof Log4jLogEvent)) {
134            return; // only know how to Serialize Log4jLogEvents
135        }
136        Log4jLogEvent event = (Log4jLogEvent) evt;
137        boolean appendSuccessful = false;
138        if (blocking) {
139            if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
140                // LOG4J2-485: avoid deadlock that would result from trying
141                // to add to a full queue from appender thread
142                event.setEndOfBatch(false); // queue is definitely not empty!
143                appendSuccessful = thread.callAppenders(event);
144            } else {
145                try {
146                    // wait for free slots in the queue
147                    queue.put(Log4jLogEvent.serialize(event, includeLocation));
148                    appendSuccessful = true;
149                } catch (final InterruptedException e) {
150                    LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
151                            getName());
152                }
153            }
154        } else {
155            appendSuccessful = queue.offer(Log4jLogEvent.serialize(event, includeLocation));
156            if (!appendSuccessful) {
157                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
158            }
159        }
160        if (!appendSuccessful && errorAppender != null) {
161            errorAppender.callAppender(event);
162        }
163    }
164
165    /**
166     * Create an AsyncAppender.
167     * @param appenderRefs The Appenders to reference.
168     * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
169     * @param blocking True if the Appender should wait when the queue is full. The default is true.
170     * @param size The size of the event queue. The default is 128.
171     * @param name The name of the Appender.
172     * @param includeLocation whether to include location information. The default is false.
173     * @param filter The Filter or null.
174     * @param config The Configuration.
175     * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
176     *               they are propagated to the caller.
177     * @return The AsyncAppender.
178     */
179    @PluginFactory
180    public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
181            @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
182            @PluginAttribute("blocking") final String blocking, 
183            @PluginAttribute("bufferSize") final String size,
184            @PluginAttribute("name") final String name,
185            @PluginAttribute("includeLocation") final String includeLocation,
186            @PluginElement("Filter") final Filter filter, 
187            @PluginConfiguration final Configuration config,
188            @PluginAttribute("ignoreExceptions") final String ignore) {
189        if (name == null) {
190            LOGGER.error("No name provided for AsyncAppender");
191            return null;
192        }
193        if (appenderRefs == null) {
194            LOGGER.error("No appender references provided to AsyncAppender {}", name);
195        }
196
197        final boolean isBlocking = Booleans.parseBoolean(blocking, true);
198        final int queueSize = AbstractAppender.parseInt(size, DEFAULT_QUEUE_SIZE);
199        final boolean isIncludeLocation = Boolean.parseBoolean(includeLocation);
200        final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
201
202        return new AsyncAppender(name, filter, appenderRefs, errorRef,
203                queueSize, isBlocking, ignoreExceptions, config, isIncludeLocation);
204    }
205
206    /**
207     * Thread that calls the Appenders.
208     */
209    private class AsyncThread extends Thread {
210
211        private volatile boolean shutdown = false;
212        private final List<AppenderControl> appenders;
213        private final BlockingQueue<Serializable> queue;
214
215        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
216            this.appenders = appenders;
217            this.queue = queue;
218            setDaemon(true);
219            setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
220        }
221
222        @Override
223        public void run() {
224            isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
225            while (!shutdown) {
226                Serializable s;
227                try {
228                    s = queue.take();
229                    if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
230                        shutdown = true;
231                        continue;
232                    }
233                } catch (final InterruptedException ex) {
234                    // No good reason for this.
235                    continue;
236                }
237                final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
238                event.setEndOfBatch(queue.isEmpty());
239                boolean success = callAppenders(event);
240                if (!success && errorAppender != null) {
241                    try {
242                        errorAppender.callAppender(event);
243                    } catch (final Exception ex) {
244                        // Silently accept the error.
245                    }
246                }
247            }
248            // Process any remaining items in the queue.
249            while (!queue.isEmpty()) {
250                try {
251                    final Serializable s = queue.take();
252                    if (s instanceof Log4jLogEvent) {
253                        final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
254                        event.setEndOfBatch(queue.isEmpty());
255                        callAppenders(event);
256                    }
257                } catch (final InterruptedException ex) {
258                    // May have been interrupted to shut down.
259                }
260            }
261        }
262
263        /**
264         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on
265         * all registered {@code AppenderControl} objects, and returns {@code true}
266         * if at least one appender call was successful, {@code false} otherwise.
267         * Any exceptions are silently ignored.
268         * 
269         * @param event the event to forward to the registered appenders
270         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
271         */
272        boolean callAppenders(final Log4jLogEvent event) {
273            boolean success = false;
274            for (final AppenderControl control : appenders) {
275                try {
276                    control.callAppender(event);
277                    success = true;
278                } catch (final Exception ex) {
279                    // If no appender is successful the error appender will get it.
280                }
281            }
282            return success;
283        }
284
285        public void shutdown() {
286            shutdown = true;
287            if (queue.isEmpty()) {
288                queue.offer(SHUTDOWN);
289            }
290        }
291    }
292
293    /**
294     * Returns the names of the appenders that this asyncAppender delegates to
295     * as an array of Strings.
296     * @return the names of the sink appenders
297     */
298    public String[] getAppenderRefStrings() {
299        final String[] result = new String[appenderRefs.length];
300        for (int i = 0; i < result.length; i++) {
301            result[i] = appenderRefs[i].getRef();
302        }
303        return result;
304    }
305    
306    /**
307     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with
308     * every log event to determine the class and method where the logging call
309     * was made.
310     * @return {@code true} if location is included with every event, {@code false} otherwise
311     */
312    public boolean isIncludeLocation() {
313        return includeLocation;
314    }
315    
316    /**
317     * Returns {@code true} if this AsyncAppender will block when the queue is full,
318     * or {@code false} if events are dropped when the queue is full.
319     * @return whether this AsyncAppender will block or drop events when the queue is full.
320     */
321    public boolean isBlocking() {
322        return blocking;
323    }
324    
325    /**
326     * Returns the name of the appender that any errors are logged to or {@code null}.
327     * @return the name of the appender that any errors are logged to or {@code null}
328     */
329    public String getErrorRef() {
330        return errorRef;
331    }
332    
333    public int getQueueCapacity() {
334        return queueSize;
335    }
336    
337    public int getQueueRemainingCapacity() {
338        return queue.remainingCapacity();
339    }
340}