001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TransferQueue;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.logging.log4j.core.AbstractLogEvent;
028import org.apache.logging.log4j.core.Appender;
029import org.apache.logging.log4j.core.Core;
030import org.apache.logging.log4j.core.Filter;
031import org.apache.logging.log4j.core.LogEvent;
032import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
033import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
034import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
035import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
036import org.apache.logging.log4j.core.async.BlockingQueueFactory;
037import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
038import org.apache.logging.log4j.core.async.EventRoute;
039import org.apache.logging.log4j.core.async.InternalAsyncUtil;
040import org.apache.logging.log4j.core.config.AppenderControl;
041import org.apache.logging.log4j.core.config.AppenderRef;
042import org.apache.logging.log4j.core.config.Configuration;
043import org.apache.logging.log4j.core.config.ConfigurationException;
044import org.apache.logging.log4j.core.config.Property;
045import org.apache.logging.log4j.core.config.plugins.Plugin;
046import org.apache.logging.log4j.core.config.plugins.PluginAliases;
047import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
048import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
049import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
050import org.apache.logging.log4j.core.config.plugins.PluginElement;
051import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
052import org.apache.logging.log4j.core.filter.AbstractFilterable;
053import org.apache.logging.log4j.core.impl.Log4jLogEvent;
054import org.apache.logging.log4j.core.util.Log4jThread;
055import org.apache.logging.log4j.spi.AbstractLogger;
056
057/**
058 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
059 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
060 * references.
061 */
062@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
063public final class AsyncAppender extends AbstractAppender {
064
065    private static final int DEFAULT_QUEUE_SIZE = 1024;
066    private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
067        private static final long serialVersionUID = -1761035149477086330L;
068    };
069
070    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
071
072    private final BlockingQueue<LogEvent> queue;
073    private final int queueSize;
074    private final boolean blocking;
075    private final long shutdownTimeout;
076    private final Configuration config;
077    private final AppenderRef[] appenderRefs;
078    private final String errorRef;
079    private final boolean includeLocation;
080    private AppenderControl errorAppender;
081    private AsyncThread thread;
082    private AsyncQueueFullPolicy asyncQueueFullPolicy;
083
084    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
085            final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
086            final long shutdownTimeout, final Configuration config, final boolean includeLocation,
087            final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) {
088        super(name, filter, null, ignoreExceptions, properties);
089        this.queue = blockingQueueFactory.create(queueSize);
090        this.queueSize = queueSize;
091        this.blocking = blocking;
092        this.shutdownTimeout = shutdownTimeout;
093        this.config = config;
094        this.appenderRefs = appenderRefs;
095        this.errorRef = errorRef;
096        this.includeLocation = includeLocation;
097    }
098
099    @Override
100    public void start() {
101        final Map<String, Appender> map = config.getAppenders();
102        final List<AppenderControl> appenders = new ArrayList<>();
103        for (final AppenderRef appenderRef : appenderRefs) {
104            final Appender appender = map.get(appenderRef.getRef());
105            if (appender != null) {
106                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
107            } else {
108                LOGGER.error("No appender named {} was configured", appenderRef);
109            }
110        }
111        if (errorRef != null) {
112            final Appender appender = map.get(errorRef);
113            if (appender != null) {
114                errorAppender = new AppenderControl(appender, null, null);
115            } else {
116                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
117            }
118        }
119        if (appenders.size() > 0) {
120            thread = new AsyncThread(appenders, queue);
121            thread.setName("AsyncAppender-" + getName());
122        } else if (errorRef == null) {
123            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
124        }
125        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
126
127        thread.start();
128        super.start();
129    }
130
131    @Override
132    public boolean stop(final long timeout, final TimeUnit timeUnit) {
133        setStopping();
134        super.stop(timeout, timeUnit, false);
135        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
136        thread.shutdown();
137        try {
138            thread.join(shutdownTimeout);
139        } catch (final InterruptedException ex) {
140            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
141        }
142        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
143
144        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
145            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
146                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
147        }
148        setStopped();
149        return true;
150    }
151
152    /**
153     * Actual writing occurs here.
154     *
155     * @param logEvent The LogEvent.
156     */
157    @Override
158    public void append(final LogEvent logEvent) {
159        if (!isStarted()) {
160            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
161        }
162        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
163        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
164        if (!transfer(memento)) {
165            if (blocking) {
166                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
167                    // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
168                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
169                    logMessageInCurrentThread(logEvent);
170                } else {
171                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
172                    final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
173                    route.logMessage(this, memento);
174                }
175            } else {
176                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
177                logToErrorAppenderIfNecessary(false, memento);
178            }
179        }
180    }
181
182    private boolean transfer(final LogEvent memento) {
183        return queue instanceof TransferQueue
184            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
185            : queue.offer(memento);
186    }
187
188    /**
189     * FOR INTERNAL USE ONLY.
190     *
191     * @param logEvent the event to log
192     */
193    public void logMessageInCurrentThread(final LogEvent logEvent) {
194        logEvent.setEndOfBatch(queue.isEmpty());
195        final boolean appendSuccessful = thread.callAppenders(logEvent);
196        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
197    }
198
199    /**
200     * FOR INTERNAL USE ONLY.
201     *
202     * @param logEvent the event to log
203     */
204    public void logMessageInBackgroundThread(final LogEvent logEvent) {
205        try {
206            // wait for free slots in the queue
207            queue.put(logEvent);
208        } catch (final InterruptedException e) {
209            final boolean appendSuccessful = handleInterruptedException(logEvent);
210            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
211        }
212    }
213
214    // LOG4J2-1049: Some applications use Thread.interrupt() to send
215    // messages between application threads. This does not necessarily
216    // mean that the queue is full. To prevent dropping a log message,
217    // quickly try to offer the event to the queue again.
218    // (Yes, this means there is a possibility the same event is logged twice.)
219    //
220    // Finally, catching the InterruptedException means the
221    // interrupted flag has been cleared on the current thread.
222    // This may interfere with the application's expectation of
223    // being interrupted, so when we are done, we set the interrupted
224    // flag again.
225    private boolean handleInterruptedException(final LogEvent memento) {
226        final boolean appendSuccessful = queue.offer(memento);
227        if (!appendSuccessful) {
228            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
229                getName());
230        }
231        // set the interrupted flag again.
232        Thread.currentThread().interrupt();
233        return appendSuccessful;
234    }
235
236    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
237        if (!appendSuccessful && errorAppender != null) {
238            errorAppender.callAppender(logEvent);
239        }
240    }
241
242    /**
243     * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
244     * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
245     * pre-2.7.
246     *
247     * @param appenderRefs     The Appenders to reference.
248     * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
249     * @param blocking         True if the Appender should wait when the queue is full. The default is true.
250     * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
251     *                         in the queue on shutdown. The default is zero which means to wait forever.
252     * @param size             The size of the event queue. The default is 128.
253     * @param name             The name of the Appender.
254     * @param includeLocation  whether to include location information. The default is false.
255     * @param filter           The Filter or null.
256     * @param config           The Configuration.
257     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
258     *                         otherwise they are propagated to the caller.
259     * @return The AsyncAppender.
260     * @deprecated use {@link Builder} instead
261     */
262    @Deprecated
263    public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
264                                               final boolean blocking, final long shutdownTimeout, final int size,
265                                               final String name, final boolean includeLocation, final Filter filter,
266                                               final Configuration config, final boolean ignoreExceptions) {
267        if (name == null) {
268            LOGGER.error("No name provided for AsyncAppender");
269            return null;
270        }
271        if (appenderRefs == null) {
272            LOGGER.error("No appender references provided to AsyncAppender {}", name);
273        }
274
275        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
276            shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null);
277    }
278
279    @PluginBuilderFactory
280    public static Builder newBuilder() {
281        return new Builder();
282    }
283
284    public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B>
285            implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
286
287        @PluginElement("AppenderRef")
288        @Required(message = "No appender references provided to AsyncAppender")
289        private AppenderRef[] appenderRefs;
290
291        @PluginBuilderAttribute
292        @PluginAliases("error-ref")
293        private String errorRef;
294
295        @PluginBuilderAttribute
296        private boolean blocking = true;
297
298        @PluginBuilderAttribute
299        private long shutdownTimeout = 0L;
300
301        @PluginBuilderAttribute
302        private int bufferSize = DEFAULT_QUEUE_SIZE;
303
304        @PluginBuilderAttribute
305        @Required(message = "No name provided for AsyncAppender")
306        private String name;
307
308        @PluginBuilderAttribute
309        private boolean includeLocation = false;
310
311        @PluginConfiguration
312        private Configuration configuration;
313
314        @PluginBuilderAttribute
315        private boolean ignoreExceptions = true;
316
317        @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
318        private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
319
320        public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
321            this.appenderRefs = appenderRefs;
322            return this;
323        }
324
325        public Builder setErrorRef(final String errorRef) {
326            this.errorRef = errorRef;
327            return this;
328        }
329
330        public Builder setBlocking(final boolean blocking) {
331            this.blocking = blocking;
332            return this;
333        }
334
335        public Builder setShutdownTimeout(final long shutdownTimeout) {
336            this.shutdownTimeout = shutdownTimeout;
337            return this;
338        }
339
340        public Builder setBufferSize(final int bufferSize) {
341            this.bufferSize = bufferSize;
342            return this;
343        }
344
345        public Builder setName(final String name) {
346            this.name = name;
347            return this;
348        }
349
350        public Builder setIncludeLocation(final boolean includeLocation) {
351            this.includeLocation = includeLocation;
352            return this;
353        }
354
355        public Builder setConfiguration(final Configuration configuration) {
356            this.configuration = configuration;
357            return this;
358        }
359
360        public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
361            this.ignoreExceptions = ignoreExceptions;
362            return this;
363        }
364
365        public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
366            this.blockingQueueFactory = blockingQueueFactory;
367            return this;
368        }
369
370        @Override
371        public AsyncAppender build() {
372            return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
373                shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray());
374        }
375    }
376
377    /**
378     * Thread that calls the Appenders.
379     */
380    private class AsyncThread extends Log4jThread {
381
382        private volatile boolean shutdown = false;
383        private final List<AppenderControl> appenders;
384        private final BlockingQueue<LogEvent> queue;
385
386        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
387            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
388            this.appenders = appenders;
389            this.queue = queue;
390            setDaemon(true);
391        }
392
393        @Override
394        public void run() {
395            while (!shutdown) {
396                LogEvent event;
397                try {
398                    event = queue.take();
399                    if (event == SHUTDOWN_LOG_EVENT) {
400                        shutdown = true;
401                        continue;
402                    }
403                } catch (final InterruptedException ex) {
404                    break; // LOG4J2-830
405                }
406                event.setEndOfBatch(queue.isEmpty());
407                final boolean success = callAppenders(event);
408                if (!success && errorAppender != null) {
409                    try {
410                        errorAppender.callAppender(event);
411                    } catch (final Exception ex) {
412                        // Silently accept the error.
413                    }
414                }
415            }
416            // Process any remaining items in the queue.
417            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
418                queue.size());
419            int count = 0;
420            int ignored = 0;
421            while (!queue.isEmpty()) {
422                try {
423                    final LogEvent event = queue.take();
424                    if (event instanceof Log4jLogEvent) {
425                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
426                        logEvent.setEndOfBatch(queue.isEmpty());
427                        callAppenders(logEvent);
428                        count++;
429                    } else {
430                        ignored++;
431                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
432                    }
433                } catch (final InterruptedException ex) {
434                    // May have been interrupted to shut down.
435                    // Here we ignore interrupts and try to process all remaining events.
436                }
437            }
438            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
439                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
440        }
441
442        /**
443         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
444         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
445         * exceptions are silently ignored.
446         *
447         * @param event the event to forward to the registered appenders
448         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
449         */
450        boolean callAppenders(final LogEvent event) {
451            boolean success = false;
452            for (final AppenderControl control : appenders) {
453                try {
454                    control.callAppender(event);
455                    success = true;
456                } catch (final Exception ex) {
457                    // If no appender is successful the error appender will get it.
458                }
459            }
460            return success;
461        }
462
463        public void shutdown() {
464            shutdown = true;
465            if (queue.isEmpty()) {
466                queue.offer(SHUTDOWN_LOG_EVENT);
467            }
468            if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
469                this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
470            }
471        }
472    }
473
474    /**
475     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
476     *
477     * @return the names of the sink appenders
478     */
479    public String[] getAppenderRefStrings() {
480        final String[] result = new String[appenderRefs.length];
481        for (int i = 0; i < result.length; i++) {
482            result[i] = appenderRefs[i].getRef();
483        }
484        return result;
485    }
486
487    /**
488     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
489     * the class and method where the logging call was made.
490     *
491     * @return {@code true} if location is included with every event, {@code false} otherwise
492     */
493    public boolean isIncludeLocation() {
494        return includeLocation;
495    }
496
497    /**
498     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
499     * dropped when the queue is full.
500     *
501     * @return whether this AsyncAppender will block or drop events when the queue is full.
502     */
503    public boolean isBlocking() {
504        return blocking;
505    }
506
507    /**
508     * Returns the name of the appender that any errors are logged to or {@code null}.
509     *
510     * @return the name of the appender that any errors are logged to or {@code null}
511     */
512    public String getErrorRef() {
513        return errorRef;
514    }
515
516    public int getQueueCapacity() {
517        return queueSize;
518    }
519
520    public int getQueueRemainingCapacity() {
521        return queue.remainingCapacity();
522    }
523
524    /**
525     * Returns the number of elements in the queue.
526     *
527     * @return the number of elements in the queue.
528     * @since 2.11.1
529     */
530    public int getQueueSize() {
531        return queue.size();
532    }
533}