001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TransferQueue;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.logging.log4j.core.AbstractLogEvent;
028import org.apache.logging.log4j.core.Appender;
029import org.apache.logging.log4j.core.Core;
030import org.apache.logging.log4j.core.Filter;
031import org.apache.logging.log4j.core.LogEvent;
032import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
033import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
034import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
035import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
036import org.apache.logging.log4j.core.async.BlockingQueueFactory;
037import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
038import org.apache.logging.log4j.core.async.EventRoute;
039import org.apache.logging.log4j.core.async.InternalAsyncUtil;
040import org.apache.logging.log4j.core.config.AppenderControl;
041import org.apache.logging.log4j.core.config.AppenderRef;
042import org.apache.logging.log4j.core.config.Configuration;
043import org.apache.logging.log4j.core.config.ConfigurationException;
044import org.apache.logging.log4j.core.config.plugins.Plugin;
045import org.apache.logging.log4j.core.config.plugins.PluginAliases;
046import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
047import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
048import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
049import org.apache.logging.log4j.core.config.plugins.PluginElement;
050import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
051import org.apache.logging.log4j.core.impl.Log4jLogEvent;
052import org.apache.logging.log4j.core.util.Log4jThread;
053import org.apache.logging.log4j.spi.AbstractLogger;
054
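/**
 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and
 * an optional error Appender that receives an event when the queue is full (in non-blocking mode) or when none of the
 * primary Appenders could be called successfully. The level and filter of each Appender reference are handed to the
 * {@link AppenderControl} that wraps the referenced Appender.
 */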
060@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
061public final class AsyncAppender extends AbstractAppender {
062
063    private static final int DEFAULT_QUEUE_SIZE = 1024;
064    private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
065    };
066
067    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
068
069    private final BlockingQueue<LogEvent> queue;
070    private final int queueSize;
071    private final boolean blocking;
072    private final long shutdownTimeout;
073    private final Configuration config;
074    private final AppenderRef[] appenderRefs;
075    private final String errorRef;
076    private final boolean includeLocation;
077    private AppenderControl errorAppender;
078    private AsyncThread thread;
079    private AsyncQueueFullPolicy asyncQueueFullPolicy;
080
081    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
082                          final String errorRef, final int queueSize, final boolean blocking,
083                          final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
084                          final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
085        super(name, filter, null, ignoreExceptions);
086        this.queue = blockingQueueFactory.create(queueSize);
087        this.queueSize = queueSize;
088        this.blocking = blocking;
089        this.shutdownTimeout = shutdownTimeout;
090        this.config = config;
091        this.appenderRefs = appenderRefs;
092        this.errorRef = errorRef;
093        this.includeLocation = includeLocation;
094    }
095
096    @Override
097    public void start() {
098        final Map<String, Appender> map = config.getAppenders();
099        final List<AppenderControl> appenders = new ArrayList<>();
100        for (final AppenderRef appenderRef : appenderRefs) {
101            final Appender appender = map.get(appenderRef.getRef());
102            if (appender != null) {
103                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
104            } else {
105                LOGGER.error("No appender named {} was configured", appenderRef);
106            }
107        }
108        if (errorRef != null) {
109            final Appender appender = map.get(errorRef);
110            if (appender != null) {
111                errorAppender = new AppenderControl(appender, null, null);
112            } else {
113                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
114            }
115        }
116        if (appenders.size() > 0) {
117            thread = new AsyncThread(appenders, queue);
118            thread.setName("AsyncAppender-" + getName());
119        } else if (errorRef == null) {
120            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
121        }
122        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
123
124        thread.start();
125        super.start();
126    }
127
128    @Override
129    public boolean stop(final long timeout, final TimeUnit timeUnit) {
130        setStopping();
131        super.stop(timeout, timeUnit, false);
132        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
133        thread.shutdown();
134        try {
135            thread.join(shutdownTimeout);
136        } catch (final InterruptedException ex) {
137            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
138        }
139        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
140
141        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
142            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
143                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
144        }
145        setStopped();
146        return true;
147    }
148
149    /**
150     * Actual writing occurs here.
151     *
152     * @param logEvent The LogEvent.
153     */
154    @Override
155    public void append(final LogEvent logEvent) {
156        if (!isStarted()) {
157            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
158        }
159        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
160        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
161        if (!transfer(memento)) {
162            if (blocking) {
163                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
164                    // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
165                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
166                    logMessageInCurrentThread(logEvent);
167                } else {
168                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
169                    final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
170                    route.logMessage(this, memento);
171                }
172            } else {
173                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
174                logToErrorAppenderIfNecessary(false, memento);
175            }
176        }
177    }
178
179    private boolean transfer(final LogEvent memento) {
180        return queue instanceof TransferQueue
181            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
182            : queue.offer(memento);
183    }
184
185    /**
186     * FOR INTERNAL USE ONLY.
187     *
188     * @param logEvent the event to log
189     */
190    public void logMessageInCurrentThread(final LogEvent logEvent) {
191        logEvent.setEndOfBatch(queue.isEmpty());
192        final boolean appendSuccessful = thread.callAppenders(logEvent);
193        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
194    }
195
196    /**
197     * FOR INTERNAL USE ONLY.
198     *
199     * @param logEvent the event to log
200     */
201    public void logMessageInBackgroundThread(final LogEvent logEvent) {
202        try {
203            // wait for free slots in the queue
204            queue.put(logEvent);
205        } catch (final InterruptedException e) {
206            final boolean appendSuccessful = handleInterruptedException(logEvent);
207            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
208        }
209    }
210
211    // LOG4J2-1049: Some applications use Thread.interrupt() to send
212    // messages between application threads. This does not necessarily
213    // mean that the queue is full. To prevent dropping a log message,
214    // quickly try to offer the event to the queue again.
215    // (Yes, this means there is a possibility the same event is logged twice.)
216    //
217    // Finally, catching the InterruptedException means the
218    // interrupted flag has been cleared on the current thread.
219    // This may interfere with the application's expectation of
220    // being interrupted, so when we are done, we set the interrupted
221    // flag again.
222    private boolean handleInterruptedException(final LogEvent memento) {
223        final boolean appendSuccessful = queue.offer(memento);
224        if (!appendSuccessful) {
225            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
226                getName());
227        }
228        // set the interrupted flag again.
229        Thread.currentThread().interrupt();
230        return appendSuccessful;
231    }
232
233    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
234        if (!appendSuccessful && errorAppender != null) {
235            errorAppender.callAppender(logEvent);
236        }
237    }
238
239    /**
240     * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
241     * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
242     * pre-2.7.
243     *
244     * @param appenderRefs     The Appenders to reference.
245     * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
246     * @param blocking         True if the Appender should wait when the queue is full. The default is true.
247     * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
248     *                         in the queue on shutdown. The default is zero which means to wait forever.
249     * @param size             The size of the event queue. The default is 128.
250     * @param name             The name of the Appender.
251     * @param includeLocation  whether to include location information. The default is false.
252     * @param filter           The Filter or null.
253     * @param config           The Configuration.
254     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
255     *                         otherwise they are propagated to the caller.
256     * @return The AsyncAppender.
257     * @deprecated use {@link Builder} instead
258     */
259    @Deprecated
260    public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
261                                               final boolean blocking, final long shutdownTimeout, final int size,
262                                               final String name, final boolean includeLocation, final Filter filter,
263                                               final Configuration config, final boolean ignoreExceptions) {
264        if (name == null) {
265            LOGGER.error("No name provided for AsyncAppender");
266            return null;
267        }
268        if (appenderRefs == null) {
269            LOGGER.error("No appender references provided to AsyncAppender {}", name);
270        }
271
272        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
273            shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
274    }
275
276    @PluginBuilderFactory
277    public static Builder newBuilder() {
278        return new Builder();
279    }
280
281    public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
282
283        @PluginElement("AppenderRef")
284        @Required(message = "No appender references provided to AsyncAppender")
285        private AppenderRef[] appenderRefs;
286
287        @PluginBuilderAttribute
288        @PluginAliases("error-ref")
289        private String errorRef;
290
291        @PluginBuilderAttribute
292        private boolean blocking = true;
293
294        @PluginBuilderAttribute
295        private long shutdownTimeout = 0L;
296
297        @PluginBuilderAttribute
298        private int bufferSize = DEFAULT_QUEUE_SIZE;
299
300        @PluginBuilderAttribute
301        @Required(message = "No name provided for AsyncAppender")
302        private String name;
303
304        @PluginBuilderAttribute
305        private boolean includeLocation = false;
306
307        @PluginElement("Filter")
308        private Filter filter;
309
310        @PluginConfiguration
311        private Configuration configuration;
312
313        @PluginBuilderAttribute
314        private boolean ignoreExceptions = true;
315
316        @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
317        private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
318
319        public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
320            this.appenderRefs = appenderRefs;
321            return this;
322        }
323
324        public Builder setErrorRef(final String errorRef) {
325            this.errorRef = errorRef;
326            return this;
327        }
328
329        public Builder setBlocking(final boolean blocking) {
330            this.blocking = blocking;
331            return this;
332        }
333
334        public Builder setShutdownTimeout(final long shutdownTimeout) {
335            this.shutdownTimeout = shutdownTimeout;
336            return this;
337        }
338
339        public Builder setBufferSize(final int bufferSize) {
340            this.bufferSize = bufferSize;
341            return this;
342        }
343
344        public Builder setName(final String name) {
345            this.name = name;
346            return this;
347        }
348
349        public Builder setIncludeLocation(final boolean includeLocation) {
350            this.includeLocation = includeLocation;
351            return this;
352        }
353
354        public Builder setFilter(final Filter filter) {
355            this.filter = filter;
356            return this;
357        }
358
359        public Builder setConfiguration(final Configuration configuration) {
360            this.configuration = configuration;
361            return this;
362        }
363
364        public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
365            this.ignoreExceptions = ignoreExceptions;
366            return this;
367        }
368
369        public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
370            this.blockingQueueFactory = blockingQueueFactory;
371            return this;
372        }
373
374        @Override
375        public AsyncAppender build() {
376            return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
377                shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
378        }
379    }
380
381    /**
382     * Thread that calls the Appenders.
383     */
384    private class AsyncThread extends Log4jThread {
385
386        private volatile boolean shutdown = false;
387        private final List<AppenderControl> appenders;
388        private final BlockingQueue<LogEvent> queue;
389
390        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
391            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
392            this.appenders = appenders;
393            this.queue = queue;
394            setDaemon(true);
395        }
396
397        @Override
398        public void run() {
399            while (!shutdown) {
400                LogEvent event;
401                try {
402                    event = queue.take();
403                    if (event == SHUTDOWN_LOG_EVENT) {
404                        shutdown = true;
405                        continue;
406                    }
407                } catch (final InterruptedException ex) {
408                    break; // LOG4J2-830
409                }
410                event.setEndOfBatch(queue.isEmpty());
411                final boolean success = callAppenders(event);
412                if (!success && errorAppender != null) {
413                    try {
414                        errorAppender.callAppender(event);
415                    } catch (final Exception ex) {
416                        // Silently accept the error.
417                    }
418                }
419            }
420            // Process any remaining items in the queue.
421            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
422                queue.size());
423            int count = 0;
424            int ignored = 0;
425            while (!queue.isEmpty()) {
426                try {
427                    final LogEvent event = queue.take();
428                    if (event instanceof Log4jLogEvent) {
429                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
430                        logEvent.setEndOfBatch(queue.isEmpty());
431                        callAppenders(logEvent);
432                        count++;
433                    } else {
434                        ignored++;
435                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
436                    }
437                } catch (final InterruptedException ex) {
438                    // May have been interrupted to shut down.
439                    // Here we ignore interrupts and try to process all remaining events.
440                }
441            }
442            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
443                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
444        }
445
446        /**
447         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
448         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
449         * exceptions are silently ignored.
450         *
451         * @param event the event to forward to the registered appenders
452         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
453         */
454        boolean callAppenders(final LogEvent event) {
455            boolean success = false;
456            for (final AppenderControl control : appenders) {
457                try {
458                    control.callAppender(event);
459                    success = true;
460                } catch (final Exception ex) {
461                    // If no appender is successful the error appender will get it.
462                }
463            }
464            return success;
465        }
466
467        public void shutdown() {
468            shutdown = true;
469            if (queue.isEmpty()) {
470                queue.offer(SHUTDOWN_LOG_EVENT);
471            }
472            if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
473                this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
474            }
475        }
476    }
477
478    /**
479     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
480     *
481     * @return the names of the sink appenders
482     */
483    public String[] getAppenderRefStrings() {
484        final String[] result = new String[appenderRefs.length];
485        for (int i = 0; i < result.length; i++) {
486            result[i] = appenderRefs[i].getRef();
487        }
488        return result;
489    }
490
491    /**
492     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
493     * the class and method where the logging call was made.
494     *
495     * @return {@code true} if location is included with every event, {@code false} otherwise
496     */
497    public boolean isIncludeLocation() {
498        return includeLocation;
499    }
500
501    /**
502     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
503     * dropped when the queue is full.
504     *
505     * @return whether this AsyncAppender will block or drop events when the queue is full.
506     */
507    public boolean isBlocking() {
508        return blocking;
509    }
510
511    /**
512     * Returns the name of the appender that any errors are logged to or {@code null}.
513     *
514     * @return the name of the appender that any errors are logged to or {@code null}
515     */
516    public String getErrorRef() {
517        return errorRef;
518    }
519
520    public int getQueueCapacity() {
521        return queueSize;
522    }
523
524    public int getQueueRemainingCapacity() {
525        return queue.remainingCapacity();
526    }
527
528    /**
529     * Returns the number of elements in the queue.
530     * 
531     * @return the number of elements in the queue. 
532     * @since 2.11.1
533     */
534    public int getQueueSize() {
535        return queue.size();
536    }
537}