001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.io.Serializable;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ArrayBlockingQueue;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.logging.log4j.core.Appender;
028import org.apache.logging.log4j.core.Filter;
029import org.apache.logging.log4j.core.LogEvent;
030import org.apache.logging.log4j.core.async.RingBufferLogEvent;
031import org.apache.logging.log4j.core.config.AppenderControl;
032import org.apache.logging.log4j.core.config.AppenderRef;
033import org.apache.logging.log4j.core.config.Configuration;
034import org.apache.logging.log4j.core.config.ConfigurationException;
035import org.apache.logging.log4j.core.config.plugins.Plugin;
036import org.apache.logging.log4j.core.config.plugins.PluginAliases;
037import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
038import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
039import org.apache.logging.log4j.core.config.plugins.PluginElement;
040import org.apache.logging.log4j.core.config.plugins.PluginFactory;
041import org.apache.logging.log4j.core.impl.Log4jLogEvent;
042
043/**
044 * Appends to one or more Appenders asynchronously.  You can configure an
045 * AsyncAppender with one or more Appenders and an Appender to append to if the
046 * queue is full. The AsyncAppender does not allow a filter to be specified on
047 * the Appender references.
048 */
049@Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
050public final class AsyncAppender extends AbstractAppender {
051
052    private static final long serialVersionUID = 1L;
053    private static final int DEFAULT_QUEUE_SIZE = 128;
054    private static final String SHUTDOWN = "Shutdown";
055
056    private final BlockingQueue<Serializable> queue;
057    private final int queueSize;
058    private final boolean blocking;
059    private final Configuration config;
060    private final AppenderRef[] appenderRefs;
061    private final String errorRef;
062    private final boolean includeLocation;
063    private AppenderControl errorAppender;
064    private AsyncThread thread;
065    private static final AtomicLong threadSequence = new AtomicLong(1);
066    private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
067
068
069    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
070                           final String errorRef, final int queueSize, final boolean blocking,
071                           final boolean ignoreExceptions, final Configuration config,
072                           final boolean includeLocation) {
073        super(name, filter, null, ignoreExceptions);
074        this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
075        this.queueSize = queueSize;
076        this.blocking = blocking;
077        this.config = config;
078        this.appenderRefs = appenderRefs;
079        this.errorRef = errorRef;
080        this.includeLocation = includeLocation;
081    }
082
083    @Override
084    public void start() {
085        final Map<String, Appender> map = config.getAppenders();
086        final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
087        for (final AppenderRef appenderRef : appenderRefs) {
088            if (map.containsKey(appenderRef.getRef())) {
089                appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
090                    appenderRef.getFilter()));
091            } else {
092                LOGGER.error("No appender named {} was configured", appenderRef);
093            }
094        }
095        if (errorRef != null) {
096            if (map.containsKey(errorRef)) {
097                errorAppender = new AppenderControl(map.get(errorRef), null, null);
098            } else {
099                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
100            }
101        }
102        if (appenders.size() > 0) {
103            thread = new AsyncThread(appenders, queue);
104            thread.setName("AsyncAppender-" + getName());
105        } else if (errorRef == null) {
106            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
107        }
108
109        thread.start();
110        super.start();
111    }
112
113    @Override
114    public void stop() {
115        super.stop();
116        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
117        thread.shutdown();
118        try {
119            thread.join();
120        } catch (final InterruptedException ex) {
121            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
122        }
123        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
124    }
125
126    /**
127     * Actual writing occurs here.
128     * 
129     * @param logEvent
130     *        The LogEvent.
131     */
132    @Override
133    public void append(LogEvent logEvent) {
134        if (!isStarted()) {
135            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
136        }
137        if (!(logEvent instanceof Log4jLogEvent)) {
138            if (!(logEvent instanceof RingBufferLogEvent)) {
139                return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents
140            }
141            logEvent = ((RingBufferLogEvent) logEvent).createMemento();
142        }
143        logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
144        final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
145        boolean appendSuccessful = false;
146        if (blocking) {
147            if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
148                // LOG4J2-485: avoid deadlock that would result from trying
149                // to add to a full queue from appender thread
150                coreEvent.setEndOfBatch(false); // queue is definitely not empty!
151                appendSuccessful = thread.callAppenders(coreEvent);
152            } else {
153                try {
154                    // wait for free slots in the queue
155                    queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation));
156                    appendSuccessful = true;
157                } catch (final InterruptedException e) {
158                    LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
159                            getName());
160                }
161            }
162        } else {
163            appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
164            if (!appendSuccessful) {
165                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
166            }
167        }
168        if (!appendSuccessful && errorAppender != null) {
169            errorAppender.callAppender(coreEvent);
170        }
171    }
172
173    /**
174     * Create an AsyncAppender.
175     * @param appenderRefs The Appenders to reference.
176     * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
177     * @param blocking True if the Appender should wait when the queue is full. The default is true.
178     * @param size The size of the event queue. The default is 128.
179     * @param name The name of the Appender.
180     * @param includeLocation whether to include location information. The default is false.
181     * @param filter The Filter or null.
182     * @param config The Configuration.
183     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
184     *                         otherwise they are propagated to the caller.
185     * @return The AsyncAppender.
186     */
187    @PluginFactory
188    public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
189            @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
190            @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
191            @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
192            @PluginAttribute("name") final String name,
193            @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
194            @PluginElement("Filter") final Filter filter,
195            @PluginConfiguration final Configuration config,
196            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
197        if (name == null) {
198            LOGGER.error("No name provided for AsyncAppender");
199            return null;
200        }
201        if (appenderRefs == null) {
202            LOGGER.error("No appender references provided to AsyncAppender {}", name);
203        }
204
205        return new AsyncAppender(name, filter, appenderRefs, errorRef,
206                size, blocking, ignoreExceptions, config, includeLocation);
207    }
208
209    /**
210     * Thread that calls the Appenders.
211     */
212    private class AsyncThread extends Thread {
213
214        private volatile boolean shutdown = false;
215        private final List<AppenderControl> appenders;
216        private final BlockingQueue<Serializable> queue;
217
218        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
219            this.appenders = appenders;
220            this.queue = queue;
221            setDaemon(true);
222            setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
223        }
224
225        @Override
226        public void run() {
227            isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
228            while (!shutdown) {
229                Serializable s;
230                try {
231                    s = queue.take();
232                    if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
233                        shutdown = true;
234                        continue;
235                    }
236                } catch (final InterruptedException ex) {
237                    break; // LOG4J2-830
238                }
239                final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
240                event.setEndOfBatch(queue.isEmpty());
241                final boolean success = callAppenders(event);
242                if (!success && errorAppender != null) {
243                    try {
244                        errorAppender.callAppender(event);
245                    } catch (final Exception ex) {
246                        // Silently accept the error.
247                    }
248                }
249            }
250            // Process any remaining items in the queue.
251            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
252                    queue.size());
253            int count= 0;
254            int ignored = 0;
255            while (!queue.isEmpty()) {
256                try {
257                    final Serializable s = queue.take();
258                    if (Log4jLogEvent.canDeserialize(s)) {
259                        final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
260                        event.setEndOfBatch(queue.isEmpty());
261                        callAppenders(event);
262                        count++;
263                    } else {
264                        ignored++;
265                        LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
266                    }
267                } catch (final InterruptedException ex) {
268                    // May have been interrupted to shut down.
269                    // Here we ignore interrupts and try to process all remaining events.
270                }
271            }
272            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " +
273                        "Processed {} and ignored {} events since shutdown started.",
274                        queue.size(), count, ignored);
275        }
276
277        /**
278         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on
279         * all registered {@code AppenderControl} objects, and returns {@code true}
280         * if at least one appender call was successful, {@code false} otherwise.
281         * Any exceptions are silently ignored.
282         *
283         * @param event the event to forward to the registered appenders
284         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
285         */
286        boolean callAppenders(final Log4jLogEvent event) {
287            boolean success = false;
288            for (final AppenderControl control : appenders) {
289                try {
290                    control.callAppender(event);
291                    success = true;
292                } catch (final Exception ex) {
293                    // If no appender is successful the error appender will get it.
294                }
295            }
296            return success;
297        }
298
299        public void shutdown() {
300            shutdown = true;
301            if (queue.isEmpty()) {
302                queue.offer(SHUTDOWN);
303            }
304        }
305    }
306
307    /**
308     * Returns the names of the appenders that this asyncAppender delegates to
309     * as an array of Strings.
310     * @return the names of the sink appenders
311     */
312    public String[] getAppenderRefStrings() {
313        final String[] result = new String[appenderRefs.length];
314        for (int i = 0; i < result.length; i++) {
315            result[i] = appenderRefs[i].getRef();
316        }
317        return result;
318    }
319
320    /**
321     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with
322     * every log event to determine the class and method where the logging call
323     * was made.
324     * @return {@code true} if location is included with every event, {@code false} otherwise
325     */
326    public boolean isIncludeLocation() {
327        return includeLocation;
328    }
329
330    /**
331     * Returns {@code true} if this AsyncAppender will block when the queue is full,
332     * or {@code false} if events are dropped when the queue is full.
333     * @return whether this AsyncAppender will block or drop events when the queue is full.
334     */
335    public boolean isBlocking() {
336        return blocking;
337    }
338
339    /**
340     * Returns the name of the appender that any errors are logged to or {@code null}.
341     * @return the name of the appender that any errors are logged to or {@code null}
342     */
343    public String getErrorRef() {
344        return errorRef;
345    }
346
347    public int getQueueCapacity() {
348        return queueSize;
349    }
350
351    public int getQueueRemainingCapacity() {
352        return queue.remainingCapacity();
353    }
354}