001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.ArrayBlockingQueue;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.atomic.AtomicLong;
025
026import org.apache.logging.log4j.core.AbstractLogEvent;
027import org.apache.logging.log4j.core.Appender;
028import org.apache.logging.log4j.core.Filter;
029import org.apache.logging.log4j.core.LogEvent;
030import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
031import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
032import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
033import org.apache.logging.log4j.core.async.EventRoute;
034import org.apache.logging.log4j.core.config.AppenderControl;
035import org.apache.logging.log4j.core.config.AppenderRef;
036import org.apache.logging.log4j.core.config.Configuration;
037import org.apache.logging.log4j.core.config.ConfigurationException;
038import org.apache.logging.log4j.core.config.plugins.Plugin;
039import org.apache.logging.log4j.core.config.plugins.PluginAliases;
040import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
041import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
042import org.apache.logging.log4j.core.config.plugins.PluginElement;
043import org.apache.logging.log4j.core.config.plugins.PluginFactory;
044import org.apache.logging.log4j.core.impl.Log4jLogEvent;
045import org.apache.logging.log4j.core.util.Constants;
046
047/**
048 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
049 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
050 * references.
051 */
052@Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
053public final class AsyncAppender extends AbstractAppender {
054
055    private static final int DEFAULT_QUEUE_SIZE = 128;
056    private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
057    };
058
059    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
060
061    private final BlockingQueue<LogEvent> queue;
062    private final int queueSize;
063    private final boolean blocking;
064    private final long shutdownTimeout;
065    private final Configuration config;
066    private final AppenderRef[] appenderRefs;
067    private final String errorRef;
068    private final boolean includeLocation;
069    private AppenderControl errorAppender;
070    private AsyncThread thread;
071    private AsyncQueueFullPolicy asyncQueueFullPolicy;
072
073    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
074                          final String errorRef, final int queueSize, final boolean blocking,
075                          final boolean ignoreExceptions,
076                          final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
077        super(name, filter, null, ignoreExceptions);
078        this.queue = new ArrayBlockingQueue<>(queueSize);
079        this.queueSize = queueSize;
080        this.blocking = blocking;
081        this.shutdownTimeout = shutdownTimeout;
082        this.config = config;
083        this.appenderRefs = appenderRefs;
084        this.errorRef = errorRef;
085        this.includeLocation = includeLocation;
086    }
087
088    @Override
089    public void start() {
090        final Map<String, Appender> map = config.getAppenders();
091        final List<AppenderControl> appenders = new ArrayList<>();
092        for (final AppenderRef appenderRef : appenderRefs) {
093            final Appender appender = map.get(appenderRef.getRef());
094            if (appender != null) {
095                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
096            } else {
097                LOGGER.error("No appender named {} was configured", appenderRef);
098            }
099        }
100        if (errorRef != null) {
101            final Appender appender = map.get(errorRef);
102            if (appender != null) {
103                errorAppender = new AppenderControl(appender, null, null);
104            } else {
105                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
106            }
107        }
108        if (appenders.size() > 0) {
109            thread = new AsyncThread(appenders, queue);
110            thread.setName("AsyncAppender-" + getName());
111        } else if (errorRef == null) {
112            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
113        }
114        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
115
116        thread.start();
117        super.start();
118    }
119
120    @Override
121    public void stop() {
122        super.stop();
123        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
124        thread.shutdown();
125        try {
126            thread.join(shutdownTimeout);
127        } catch (final InterruptedException ex) {
128            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
129        }
130        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
131
132        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
133            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
134                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
135        }
136    }
137
138    /**
139     * Actual writing occurs here.
140     *
141     * @param logEvent The LogEvent.
142     */
143    @Override
144    public void append(final LogEvent logEvent) {
145        if (!isStarted()) {
146            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
147        }
148        if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
149            logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
150        }
151        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
152        if (!queue.offer(memento)) {
153            if (blocking) {
154                // delegate to the event router (which may discard, enqueue and block, or log in current thread)
155                final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
156                route.logMessage(this, memento);
157            } else {
158                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
159                logToErrorAppenderIfNecessary(false, memento);
160            }
161        }
162    }
163
164    /**
165     * FOR INTERNAL USE ONLY.
166     *
167     * @param logEvent the event to log
168     */
169    public void logMessageInCurrentThread(final LogEvent logEvent) {
170        logEvent.setEndOfBatch(queue.isEmpty());
171        final boolean appendSuccessful = thread.callAppenders(logEvent);
172        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
173    }
174
175    /**
176     * FOR INTERNAL USE ONLY.
177     *
178     * @param logEvent the event to log
179     */
180    public void logMessageInBackgroundThread(final LogEvent logEvent) {
181        try {
182            // wait for free slots in the queue
183            queue.put(logEvent);
184        } catch (final InterruptedException e) {
185            final boolean appendSuccessful = handleInterruptedException(logEvent);
186            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
187        }
188    }
189
190    // LOG4J2-1049: Some applications use Thread.interrupt() to send
191    // messages between application threads. This does not necessarily
192    // mean that the queue is full. To prevent dropping a log message,
193    // quickly try to offer the event to the queue again.
194    // (Yes, this means there is a possibility the same event is logged twice.)
195    //
196    // Finally, catching the InterruptedException means the
197    // interrupted flag has been cleared on the current thread.
198    // This may interfere with the application's expectation of
199    // being interrupted, so when we are done, we set the interrupted
200    // flag again.
201    private boolean handleInterruptedException(final LogEvent memento) {
202        final boolean appendSuccessful = queue.offer(memento);
203        if (!appendSuccessful) {
204            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
205                getName());
206        }
207        // set the interrupted flag again.
208        Thread.currentThread().interrupt();
209        return appendSuccessful;
210    }
211
212    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
213        if (!appendSuccessful && errorAppender != null) {
214            errorAppender.callAppender(logEvent);
215        }
216    }
217
218    /**
219     * Create an AsyncAppender.
220     *
221     * @param appenderRefs     The Appenders to reference.
222     * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
223     * @param blocking         True if the Appender should wait when the queue is full. The default is true.
224     * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
225     *                         in the queue on shutdown. The default is zero which means to wait forever.
226     * @param size             The size of the event queue. The default is 128.
227     * @param name             The name of the Appender.
228     * @param includeLocation  whether to include location information. The default is false.
229     * @param filter           The Filter or null.
230     * @param config           The Configuration.
231     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
232     *                         otherwise they are propagated to the caller.
233     * @return The AsyncAppender.
234     */
235    @PluginFactory
236    public static AsyncAppender createAppender(
237        // @formatter:off
238        @PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
239        @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
240        @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
241        @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
242        @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
243        @PluginAttribute("name") final String name,
244        @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
245        @PluginElement("Filter") final Filter filter,
246        @PluginConfiguration final Configuration config,
247        @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
248        // @formatter:on
249        if (name == null) {
250            LOGGER.error("No name provided for AsyncAppender");
251            return null;
252        }
253        if (appenderRefs == null) {
254            LOGGER.error("No appender references provided to AsyncAppender {}", name);
255        }
256
257        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
258            shutdownTimeout, config, includeLocation);
259    }
260
261    /**
262     * Thread that calls the Appenders.
263     */
264    private class AsyncThread extends Thread {
265
266        private volatile boolean shutdown = false;
267        private final List<AppenderControl> appenders;
268        private final BlockingQueue<LogEvent> queue;
269
270        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
271            this.appenders = appenders;
272            this.queue = queue;
273            setDaemon(true);
274            setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
275        }
276
277        @Override
278        public void run() {
279            while (!shutdown) {
280                LogEvent event;
281                try {
282                    event = queue.take();
283                    if (event == SHUTDOWN) {
284                        shutdown = true;
285                        continue;
286                    }
287                } catch (final InterruptedException ex) {
288                    break; // LOG4J2-830
289                }
290                event.setEndOfBatch(queue.isEmpty());
291                final boolean success = callAppenders(event);
292                if (!success && errorAppender != null) {
293                    try {
294                        errorAppender.callAppender(event);
295                    } catch (final Exception ex) {
296                        // Silently accept the error.
297                    }
298                }
299            }
300            // Process any remaining items in the queue.
301            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
302                queue.size());
303            int count = 0;
304            int ignored = 0;
305            while (!queue.isEmpty()) {
306                try {
307                    final LogEvent event = queue.take();
308                    if (event instanceof Log4jLogEvent) {
309                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
310                        logEvent.setEndOfBatch(queue.isEmpty());
311                        callAppenders(logEvent);
312                        count++;
313                    } else {
314                        ignored++;
315                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
316                    }
317                } catch (final InterruptedException ex) {
318                    // May have been interrupted to shut down.
319                    // Here we ignore interrupts and try to process all remaining events.
320                }
321            }
322            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
323                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
324        }
325
326        /**
327         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
328         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
329         * exceptions are silently ignored.
330         *
331         * @param event the event to forward to the registered appenders
332         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
333         */
334        boolean callAppenders(final LogEvent event) {
335            boolean success = false;
336            for (final AppenderControl control : appenders) {
337                try {
338                    control.callAppender(event);
339                    success = true;
340                } catch (final Exception ex) {
341                    // If no appender is successful the error appender will get it.
342                }
343            }
344            return success;
345        }
346
347        public void shutdown() {
348            shutdown = true;
349            if (queue.isEmpty()) {
350                queue.offer(SHUTDOWN);
351            }
352            if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
353                this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
354            }
355        }
356    }
357
358    /**
359     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
360     *
361     * @return the names of the sink appenders
362     */
363    public String[] getAppenderRefStrings() {
364        final String[] result = new String[appenderRefs.length];
365        for (int i = 0; i < result.length; i++) {
366            result[i] = appenderRefs[i].getRef();
367        }
368        return result;
369    }
370
371    /**
372     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
373     * the class and method where the logging call was made.
374     *
375     * @return {@code true} if location is included with every event, {@code false} otherwise
376     */
377    public boolean isIncludeLocation() {
378        return includeLocation;
379    }
380
381    /**
382     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
383     * dropped when the queue is full.
384     *
385     * @return whether this AsyncAppender will block or drop events when the queue is full.
386     */
387    public boolean isBlocking() {
388        return blocking;
389    }
390
391    /**
392     * Returns the name of the appender that any errors are logged to or {@code null}.
393     *
394     * @return the name of the appender that any errors are logged to or {@code null}
395     */
396    public String getErrorRef() {
397        return errorRef;
398    }
399
400    public int getQueueCapacity() {
401        return queueSize;
402    }
403
404    public int getQueueRemainingCapacity() {
405        return queue.remainingCapacity();
406    }
407}