/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.logging.log4j.core.appender;
018
019import java.io.Serializable;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ArrayBlockingQueue;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.logging.log4j.core.Appender;
028import org.apache.logging.log4j.core.Filter;
029import org.apache.logging.log4j.core.LogEvent;
030import org.apache.logging.log4j.core.async.RingBufferLogEvent;
031import org.apache.logging.log4j.core.config.AppenderControl;
032import org.apache.logging.log4j.core.config.AppenderRef;
033import org.apache.logging.log4j.core.config.Configuration;
034import org.apache.logging.log4j.core.config.ConfigurationException;
035import org.apache.logging.log4j.core.config.plugins.Plugin;
036import org.apache.logging.log4j.core.config.plugins.PluginAliases;
037import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
038import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
039import org.apache.logging.log4j.core.config.plugins.PluginElement;
040import org.apache.logging.log4j.core.config.plugins.PluginFactory;
041import org.apache.logging.log4j.core.impl.Log4jLogEvent;
042import org.apache.logging.log4j.core.util.Constants;
043
044/**
045 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
046 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
047 * references.
048 */
049@Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
050public final class AsyncAppender extends AbstractAppender {
051
052    private static final long serialVersionUID = 1L;
053    private static final int DEFAULT_QUEUE_SIZE = 128;
054    private static final String SHUTDOWN = "Shutdown";
055
056    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
057    private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
058
059    private final BlockingQueue<Serializable> queue;
060    private final int queueSize;
061    private final boolean blocking;
062    private final long shutdownTimeout;
063    private final Configuration config;
064    private final AppenderRef[] appenderRefs;
065    private final String errorRef;
066    private final boolean includeLocation;
067    private AppenderControl errorAppender;
068    private AsyncThread thread;
069
070    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
071            final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
072            final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
073        super(name, filter, null, ignoreExceptions);
074        this.queue = new ArrayBlockingQueue<>(queueSize);
075        this.queueSize = queueSize;
076        this.blocking = blocking;
077        this.shutdownTimeout = shutdownTimeout;
078        this.config = config;
079        this.appenderRefs = appenderRefs;
080        this.errorRef = errorRef;
081        this.includeLocation = includeLocation;
082    }
083
084    @Override
085    public void start() {
086        final Map<String, Appender> map = config.getAppenders();
087        final List<AppenderControl> appenders = new ArrayList<>();
088        for (final AppenderRef appenderRef : appenderRefs) {
089            final Appender appender = map.get(appenderRef.getRef());
090            if (appender != null) {
091                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
092            } else {
093                LOGGER.error("No appender named {} was configured", appenderRef);
094            }
095        }
096        if (errorRef != null) {
097            final Appender appender = map.get(errorRef);
098            if (appender != null) {
099                errorAppender = new AppenderControl(appender, null, null);
100            } else {
101                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
102            }
103        }
104        if (appenders.size() > 0) {
105            thread = new AsyncThread(appenders, queue);
106            thread.setName("AsyncAppender-" + getName());
107        } else if (errorRef == null) {
108            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
109        }
110
111        thread.start();
112        super.start();
113    }
114
115    @Override
116    public void stop() {
117        super.stop();
118        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
119        thread.shutdown();
120        try {
121            thread.join(shutdownTimeout);
122        } catch (final InterruptedException ex) {
123            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
124        }
125        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
126    }
127
128    /**
129     * Actual writing occurs here.
130     * 
131     * @param logEvent The LogEvent.
132     */
133    @Override
134    public void append(LogEvent logEvent) {
135        if (!isStarted()) {
136            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
137        }
138        if (!(logEvent instanceof Log4jLogEvent)) {
139            if (!(logEvent instanceof RingBufferLogEvent)) {
140                return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents
141            }
142            logEvent = ((RingBufferLogEvent) logEvent).createMemento();
143        }
144        if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
145            logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
146        }
147        final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
148        boolean appendSuccessful = false;
149        if (blocking) {
150            if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
151                // LOG4J2-485: avoid deadlock that would result from trying
152                // to add to a full queue from appender thread
153                coreEvent.setEndOfBatch(false); // queue is definitely not empty!
154                appendSuccessful = thread.callAppenders(coreEvent);
155            } else {
156                final Serializable serialized = Log4jLogEvent.serialize(coreEvent, includeLocation);
157                try {
158                    // wait for free slots in the queue
159                    queue.put(serialized);
160                    appendSuccessful = true;
161                } catch (final InterruptedException e) {
162                    // LOG4J2-1049: Some applications use Thread.interrupt() to send
163                    // messages between application threads. This does not necessarily
164                    // mean that the queue is full. To prevent dropping a log message,
165                    // quickly try to offer the event to the queue again.
166                    // (Yes, this means there is a possibility the same event is logged twice.)
167                    //
168                    // Finally, catching the InterruptedException means the
169                    // interrupted flag has been cleared on the current thread.
170                    // This may interfere with the application's expectation of
171                    // being interrupted, so when we are done, we set the interrupted
172                    // flag again.
173                    appendSuccessful = queue.offer(serialized);
174                    if (!appendSuccessful) {
175                        LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
176                                getName());
177                    }
178                    // set the interrupted flag again.
179                    Thread.currentThread().interrupt();
180                }
181            }
182        } else {
183            appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
184            if (!appendSuccessful) {
185                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
186            }
187        }
188        if (!appendSuccessful && errorAppender != null) {
189            errorAppender.callAppender(coreEvent);
190        }
191    }
192
193    /**
194     * Create an AsyncAppender.
195     * 
196     * @param appenderRefs The Appenders to reference.
197     * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
198     * @param blocking True if the Appender should wait when the queue is full. The default is true.
199     * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events
200     *                        in the queue on shutdown. The default is zero which means to wait forever.
201     * @param size The size of the event queue. The default is 128.
202     * @param name The name of the Appender.
203     * @param includeLocation whether to include location information. The default is false.
204     * @param filter The Filter or null.
205     * @param config The Configuration.
206     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
207     *            otherwise they are propagated to the caller.
208     * @return The AsyncAppender.
209     */
210    @PluginFactory
211    public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
212            @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
213            @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
214            @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
215            @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
216            @PluginAttribute("name") final String name,
217            @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
218            @PluginElement("Filter") final Filter filter, @PluginConfiguration final Configuration config,
219            @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
220        if (name == null) {
221            LOGGER.error("No name provided for AsyncAppender");
222            return null;
223        }
224        if (appenderRefs == null) {
225            LOGGER.error("No appender references provided to AsyncAppender {}", name);
226        }
227
228        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
229                shutdownTimeout, config, includeLocation);
230    }
231
232    /**
233     * Thread that calls the Appenders.
234     */
235    private class AsyncThread extends Thread {
236
237        private volatile boolean shutdown = false;
238        private final List<AppenderControl> appenders;
239        private final BlockingQueue<Serializable> queue;
240
241        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
242            this.appenders = appenders;
243            this.queue = queue;
244            setDaemon(true);
245            setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
246        }
247
248        @Override
249        public void run() {
250            isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
251            while (!shutdown) {
252                Serializable s;
253                try {
254                    s = queue.take();
255                    if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
256                        shutdown = true;
257                        continue;
258                    }
259                } catch (final InterruptedException ex) {
260                    break; // LOG4J2-830
261                }
262                final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
263                event.setEndOfBatch(queue.isEmpty());
264                final boolean success = callAppenders(event);
265                if (!success && errorAppender != null) {
266                    try {
267                        errorAppender.callAppender(event);
268                    } catch (final Exception ex) {
269                        // Silently accept the error.
270                    }
271                }
272            }
273            // Process any remaining items in the queue.
274            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
275                    queue.size());
276            int count = 0;
277            int ignored = 0;
278            while (!queue.isEmpty()) {
279                try {
280                    final Serializable s = queue.take();
281                    if (Log4jLogEvent.canDeserialize(s)) {
282                        final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
283                        event.setEndOfBatch(queue.isEmpty());
284                        callAppenders(event);
285                        count++;
286                    } else {
287                        ignored++;
288                        LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
289                    }
290                } catch (final InterruptedException ex) {
291                    // May have been interrupted to shut down.
292                    // Here we ignore interrupts and try to process all remaining events.
293                }
294            }
295            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
296                    + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
297        }
298
299        /**
300         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
301         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
302         * exceptions are silently ignored.
303         *
304         * @param event the event to forward to the registered appenders
305         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
306         */
307        boolean callAppenders(final Log4jLogEvent event) {
308            boolean success = false;
309            for (final AppenderControl control : appenders) {
310                try {
311                    control.callAppender(event);
312                    success = true;
313                } catch (final Exception ex) {
314                    // If no appender is successful the error appender will get it.
315                }
316            }
317            return success;
318        }
319
320        public void shutdown() {
321            shutdown = true;
322            if (queue.isEmpty()) {
323                queue.offer(SHUTDOWN);
324            }
325        }
326    }
327
328    /**
329     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
330     * 
331     * @return the names of the sink appenders
332     */
333    public String[] getAppenderRefStrings() {
334        final String[] result = new String[appenderRefs.length];
335        for (int i = 0; i < result.length; i++) {
336            result[i] = appenderRefs[i].getRef();
337        }
338        return result;
339    }
340
341    /**
342     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
343     * the class and method where the logging call was made.
344     * 
345     * @return {@code true} if location is included with every event, {@code false} otherwise
346     */
347    public boolean isIncludeLocation() {
348        return includeLocation;
349    }
350
351    /**
352     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
353     * dropped when the queue is full.
354     * 
355     * @return whether this AsyncAppender will block or drop events when the queue is full.
356     */
357    public boolean isBlocking() {
358        return blocking;
359    }
360
361    /**
362     * Returns the name of the appender that any errors are logged to or {@code null}.
363     * 
364     * @return the name of the appender that any errors are logged to or {@code null}
365     */
366    public String getErrorRef() {
367        return errorRef;
368    }
369
370    public int getQueueCapacity() {
371        return queueSize;
372    }
373
374    public int getQueueRemainingCapacity() {
375        return queue.remainingCapacity();
376    }
377}