001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TransferQueue;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.logging.log4j.core.AbstractLogEvent;
028import org.apache.logging.log4j.core.Appender;
029import org.apache.logging.log4j.core.Filter;
030import org.apache.logging.log4j.core.LogEvent;
031import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
032import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
033import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
034import org.apache.logging.log4j.core.async.BlockingQueueFactory;
035import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
036import org.apache.logging.log4j.core.async.EventRoute;
037import org.apache.logging.log4j.core.config.AppenderControl;
038import org.apache.logging.log4j.core.config.AppenderRef;
039import org.apache.logging.log4j.core.config.Configuration;
040import org.apache.logging.log4j.core.config.ConfigurationException;
041import org.apache.logging.log4j.core.config.plugins.Plugin;
042import org.apache.logging.log4j.core.config.plugins.PluginAliases;
043import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
044import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
045import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
046import org.apache.logging.log4j.core.config.plugins.PluginElement;
047import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
048import org.apache.logging.log4j.core.impl.Log4jLogEvent;
049import org.apache.logging.log4j.core.util.Constants;
050import org.apache.logging.log4j.core.util.Log4jThread;
051
052/**
053 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
054 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
055 * references.
056 */
057@Plugin(name = "Async", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
058public final class AsyncAppender extends AbstractAppender {
059
060    private static final int DEFAULT_QUEUE_SIZE = 128;
061    private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
062    };
063
064    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
065
066    private final BlockingQueue<LogEvent> queue;
067    private final int queueSize;
068    private final boolean blocking;
069    private final long shutdownTimeout;
070    private final Configuration config;
071    private final AppenderRef[] appenderRefs;
072    private final String errorRef;
073    private final boolean includeLocation;
074    private AppenderControl errorAppender;
075    private AsyncThread thread;
076    private AsyncQueueFullPolicy asyncQueueFullPolicy;
077
078    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
079                          final String errorRef, final int queueSize, final boolean blocking,
080                          final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
081                          final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
082        super(name, filter, null, ignoreExceptions);
083        this.queue = blockingQueueFactory.create(queueSize);
084        this.queueSize = queueSize;
085        this.blocking = blocking;
086        this.shutdownTimeout = shutdownTimeout;
087        this.config = config;
088        this.appenderRefs = appenderRefs;
089        this.errorRef = errorRef;
090        this.includeLocation = includeLocation;
091    }
092
093    @Override
094    public void start() {
095        final Map<String, Appender> map = config.getAppenders();
096        final List<AppenderControl> appenders = new ArrayList<>();
097        for (final AppenderRef appenderRef : appenderRefs) {
098            final Appender appender = map.get(appenderRef.getRef());
099            if (appender != null) {
100                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
101            } else {
102                LOGGER.error("No appender named {} was configured", appenderRef);
103            }
104        }
105        if (errorRef != null) {
106            final Appender appender = map.get(errorRef);
107            if (appender != null) {
108                errorAppender = new AppenderControl(appender, null, null);
109            } else {
110                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
111            }
112        }
113        if (appenders.size() > 0) {
114            thread = new AsyncThread(appenders, queue);
115            thread.setName("AsyncAppender-" + getName());
116        } else if (errorRef == null) {
117            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
118        }
119        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
120
121        thread.start();
122        super.start();
123    }
124
125    @Override
126    public boolean stop(final long timeout, final TimeUnit timeUnit) {
127        setStopping();
128        super.stop(timeout, timeUnit, false);
129        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
130        thread.shutdown();
131        try {
132            thread.join(shutdownTimeout);
133        } catch (final InterruptedException ex) {
134            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
135        }
136        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
137
138        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
139            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
140                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
141        }
142        setStopped();
143        return true;
144    }
145
146    /**
147     * Actual writing occurs here.
148     *
149     * @param logEvent The LogEvent.
150     */
151    @Override
152    public void append(final LogEvent logEvent) {
153        if (!isStarted()) {
154            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
155        }
156        if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
157            logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
158        }
159        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
160        if (!transfer(memento)) {
161            if (blocking) {
162                // delegate to the event router (which may discard, enqueue and block, or log in current thread)
163                final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
164                route.logMessage(this, memento);
165            } else {
166                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
167                logToErrorAppenderIfNecessary(false, memento);
168            }
169        }
170    }
171
172    private boolean transfer(final LogEvent memento) {
173        return queue instanceof TransferQueue
174            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
175            : queue.offer(memento);
176    }
177
178    /**
179     * FOR INTERNAL USE ONLY.
180     *
181     * @param logEvent the event to log
182     */
183    public void logMessageInCurrentThread(final LogEvent logEvent) {
184        logEvent.setEndOfBatch(queue.isEmpty());
185        final boolean appendSuccessful = thread.callAppenders(logEvent);
186        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
187    }
188
189    /**
190     * FOR INTERNAL USE ONLY.
191     *
192     * @param logEvent the event to log
193     */
194    public void logMessageInBackgroundThread(final LogEvent logEvent) {
195        try {
196            // wait for free slots in the queue
197            queue.put(logEvent);
198        } catch (final InterruptedException e) {
199            final boolean appendSuccessful = handleInterruptedException(logEvent);
200            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
201        }
202    }
203
204    // LOG4J2-1049: Some applications use Thread.interrupt() to send
205    // messages between application threads. This does not necessarily
206    // mean that the queue is full. To prevent dropping a log message,
207    // quickly try to offer the event to the queue again.
208    // (Yes, this means there is a possibility the same event is logged twice.)
209    //
210    // Finally, catching the InterruptedException means the
211    // interrupted flag has been cleared on the current thread.
212    // This may interfere with the application's expectation of
213    // being interrupted, so when we are done, we set the interrupted
214    // flag again.
215    private boolean handleInterruptedException(final LogEvent memento) {
216        final boolean appendSuccessful = queue.offer(memento);
217        if (!appendSuccessful) {
218            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
219                getName());
220        }
221        // set the interrupted flag again.
222        Thread.currentThread().interrupt();
223        return appendSuccessful;
224    }
225
226    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
227        if (!appendSuccessful && errorAppender != null) {
228            errorAppender.callAppender(logEvent);
229        }
230    }
231
232    /**
233     * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
234     * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
235     * pre-2.7.
236     *
237     * @param appenderRefs     The Appenders to reference.
238     * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
239     * @param blocking         True if the Appender should wait when the queue is full. The default is true.
240     * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
241     *                         in the queue on shutdown. The default is zero which means to wait forever.
242     * @param size             The size of the event queue. The default is 128.
243     * @param name             The name of the Appender.
244     * @param includeLocation  whether to include location information. The default is false.
245     * @param filter           The Filter or null.
246     * @param config           The Configuration.
247     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
248     *                         otherwise they are propagated to the caller.
249     * @return The AsyncAppender.
250     * @deprecated use {@link Builder} instead
251     */
252    @Deprecated
253    public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
254                                               final boolean blocking, final long shutdownTimeout, final int size,
255                                               final String name, final boolean includeLocation, final Filter filter,
256                                               final Configuration config, final boolean ignoreExceptions) {
257        if (name == null) {
258            LOGGER.error("No name provided for AsyncAppender");
259            return null;
260        }
261        if (appenderRefs == null) {
262            LOGGER.error("No appender references provided to AsyncAppender {}", name);
263        }
264
265        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
266            shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
267    }
268
269    @PluginBuilderFactory
270    public static Builder newBuilder() {
271        return new Builder();
272    }
273
274    public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
275
276        @PluginElement("AppenderRef")
277        @Required(message = "No appender references provided to AsyncAppender")
278        private AppenderRef[] appenderRefs;
279
280        @PluginBuilderAttribute
281        @PluginAliases("error-ref")
282        private String errorRef;
283
284        @PluginBuilderAttribute
285        private boolean blocking = true;
286
287        @PluginBuilderAttribute
288        private long shutdownTimeout = 0L;
289
290        @PluginBuilderAttribute
291        private int bufferSize = DEFAULT_QUEUE_SIZE;
292
293        @PluginBuilderAttribute
294        @Required(message = "No name provided for AsyncAppender")
295        private String name;
296
297        @PluginBuilderAttribute
298        private boolean includeLocation = false;
299
300        @PluginElement("Filter")
301        private Filter filter;
302
303        @PluginConfiguration
304        private Configuration configuration;
305
306        @PluginBuilderAttribute
307        private boolean ignoreExceptions = true;
308
309        @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
310        private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
311
312        public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
313            this.appenderRefs = appenderRefs;
314            return this;
315        }
316
317        public Builder setErrorRef(final String errorRef) {
318            this.errorRef = errorRef;
319            return this;
320        }
321
322        public Builder setBlocking(final boolean blocking) {
323            this.blocking = blocking;
324            return this;
325        }
326
327        public Builder setShutdownTimeout(final long shutdownTimeout) {
328            this.shutdownTimeout = shutdownTimeout;
329            return this;
330        }
331
332        public Builder setBufferSize(final int bufferSize) {
333            this.bufferSize = bufferSize;
334            return this;
335        }
336
337        public Builder setName(final String name) {
338            this.name = name;
339            return this;
340        }
341
342        public Builder setIncludeLocation(final boolean includeLocation) {
343            this.includeLocation = includeLocation;
344            return this;
345        }
346
347        public Builder setFilter(final Filter filter) {
348            this.filter = filter;
349            return this;
350        }
351
352        public Builder setConfiguration(final Configuration configuration) {
353            this.configuration = configuration;
354            return this;
355        }
356
357        public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
358            this.ignoreExceptions = ignoreExceptions;
359            return this;
360        }
361
362        public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
363            this.blockingQueueFactory = blockingQueueFactory;
364            return this;
365        }
366
367        @Override
368        public AsyncAppender build() {
369            return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
370                shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
371        }
372    }
373
374    /**
375     * Thread that calls the Appenders.
376     */
377    private class AsyncThread extends Log4jThread {
378
379        private volatile boolean shutdown = false;
380        private final List<AppenderControl> appenders;
381        private final BlockingQueue<LogEvent> queue;
382
383        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
384            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
385            this.appenders = appenders;
386            this.queue = queue;
387            setDaemon(true);
388        }
389
390        @Override
391        public void run() {
392            while (!shutdown) {
393                LogEvent event;
394                try {
395                    event = queue.take();
396                    if (event == SHUTDOWN) {
397                        shutdown = true;
398                        continue;
399                    }
400                } catch (final InterruptedException ex) {
401                    break; // LOG4J2-830
402                }
403                event.setEndOfBatch(queue.isEmpty());
404                final boolean success = callAppenders(event);
405                if (!success && errorAppender != null) {
406                    try {
407                        errorAppender.callAppender(event);
408                    } catch (final Exception ex) {
409                        // Silently accept the error.
410                    }
411                }
412            }
413            // Process any remaining items in the queue.
414            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
415                queue.size());
416            int count = 0;
417            int ignored = 0;
418            while (!queue.isEmpty()) {
419                try {
420                    final LogEvent event = queue.take();
421                    if (event instanceof Log4jLogEvent) {
422                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
423                        logEvent.setEndOfBatch(queue.isEmpty());
424                        callAppenders(logEvent);
425                        count++;
426                    } else {
427                        ignored++;
428                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
429                    }
430                } catch (final InterruptedException ex) {
431                    // May have been interrupted to shut down.
432                    // Here we ignore interrupts and try to process all remaining events.
433                }
434            }
435            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
436                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
437        }
438
439        /**
440         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
441         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
442         * exceptions are silently ignored.
443         *
444         * @param event the event to forward to the registered appenders
445         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
446         */
447        boolean callAppenders(final LogEvent event) {
448            boolean success = false;
449            for (final AppenderControl control : appenders) {
450                try {
451                    control.callAppender(event);
452                    success = true;
453                } catch (final Exception ex) {
454                    // If no appender is successful the error appender will get it.
455                }
456            }
457            return success;
458        }
459
460        public void shutdown() {
461            shutdown = true;
462            if (queue.isEmpty()) {
463                queue.offer(SHUTDOWN);
464            }
465            if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
466                this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
467            }
468        }
469    }
470
471    /**
472     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
473     *
474     * @return the names of the sink appenders
475     */
476    public String[] getAppenderRefStrings() {
477        final String[] result = new String[appenderRefs.length];
478        for (int i = 0; i < result.length; i++) {
479            result[i] = appenderRefs[i].getRef();
480        }
481        return result;
482    }
483
484    /**
485     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
486     * the class and method where the logging call was made.
487     *
488     * @return {@code true} if location is included with every event, {@code false} otherwise
489     */
490    public boolean isIncludeLocation() {
491        return includeLocation;
492    }
493
494    /**
495     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
496     * dropped when the queue is full.
497     *
498     * @return whether this AsyncAppender will block or drop events when the queue is full.
499     */
500    public boolean isBlocking() {
501        return blocking;
502    }
503
504    /**
505     * Returns the name of the appender that any errors are logged to or {@code null}.
506     *
507     * @return the name of the appender that any errors are logged to or {@code null}
508     */
509    public String getErrorRef() {
510        return errorRef;
511    }
512
513    public int getQueueCapacity() {
514        return queueSize;
515    }
516
517    public int getQueueRemainingCapacity() {
518        return queue.remainingCapacity();
519    }
520}