001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import org.apache.logging.log4j.core.Appender;
020import org.apache.logging.log4j.core.Core;
021import org.apache.logging.log4j.core.Filter;
022import org.apache.logging.log4j.core.LogEvent;
023import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
024import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
025import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
026import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
027import org.apache.logging.log4j.core.async.BlockingQueueFactory;
028import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
029import org.apache.logging.log4j.core.async.EventRoute;
030import org.apache.logging.log4j.core.async.InternalAsyncUtil;
031import org.apache.logging.log4j.core.config.AppenderControl;
032import org.apache.logging.log4j.core.config.AppenderRef;
033import org.apache.logging.log4j.core.config.Configuration;
034import org.apache.logging.log4j.core.config.ConfigurationException;
035import org.apache.logging.log4j.core.config.Property;
036import org.apache.logging.log4j.core.config.plugins.Plugin;
037import org.apache.logging.log4j.core.config.plugins.PluginAliases;
038import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
039import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
040import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
041import org.apache.logging.log4j.core.config.plugins.PluginElement;
042import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
043import org.apache.logging.log4j.core.filter.AbstractFilterable;
044import org.apache.logging.log4j.core.impl.Log4jLogEvent;
045import org.apache.logging.log4j.spi.AbstractLogger;
046
047import java.util.ArrayList;
048import java.util.List;
049import java.util.Map;
050import java.util.concurrent.BlockingQueue;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.TransferQueue;
053import java.util.stream.Collectors;
054
055/**
056 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
057 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
058 * references.
059 */
060@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
061public final class AsyncAppender extends AbstractAppender {
062
063    private static final int DEFAULT_QUEUE_SIZE = 1024;
064
065    private final BlockingQueue<LogEvent> queue;
066    private final int queueSize;
067    private final boolean blocking;
068    private final long shutdownTimeout;
069    private final Configuration config;
070    private final AppenderRef[] appenderRefs;
071    private final String errorRef;
072    private final boolean includeLocation;
073    private AppenderControl errorAppender;
074    private AsyncAppenderEventDispatcher dispatcher;
075    private AsyncQueueFullPolicy asyncQueueFullPolicy;
076
077    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
078            final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
079            final long shutdownTimeout, final Configuration config, final boolean includeLocation,
080            final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) {
081        super(name, filter, null, ignoreExceptions, properties);
082        this.queue = blockingQueueFactory.create(queueSize);
083        this.queueSize = queueSize;
084        this.blocking = blocking;
085        this.shutdownTimeout = shutdownTimeout;
086        this.config = config;
087        this.appenderRefs = appenderRefs;
088        this.errorRef = errorRef;
089        this.includeLocation = includeLocation;
090    }
091
092    @Override
093    public void start() {
094        final Map<String, Appender> map = config.getAppenders();
095        final List<AppenderControl> appenders = new ArrayList<>();
096        for (final AppenderRef appenderRef : appenderRefs) {
097            final Appender appender = map.get(appenderRef.getRef());
098            if (appender != null) {
099                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
100            } else {
101                LOGGER.error("No appender named {} was configured", appenderRef);
102            }
103        }
104        if (errorRef != null) {
105            final Appender appender = map.get(errorRef);
106            if (appender != null) {
107                errorAppender = new AppenderControl(appender, null, null);
108            } else {
109                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
110            }
111        }
112        if (appenders.size() > 0) {
113            dispatcher = new AsyncAppenderEventDispatcher(
114                    getName(), errorAppender, appenders, queue);
115        } else if (errorRef == null) {
116            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
117        }
118        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
119
120        dispatcher.start();
121        super.start();
122    }
123
124    @Override
125    public boolean stop(final long timeout, final TimeUnit timeUnit) {
126        setStopping();
127        super.stop(timeout, timeUnit, false);
128        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
129        try {
130            dispatcher.stop(shutdownTimeout);
131        } catch (final InterruptedException ignored) {
132            // Restore the interrupted flag cleared when the exception is caught.
133            Thread.currentThread().interrupt();
134            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
135        }
136        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
137
138        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
139            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
140                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
141        }
142        setStopped();
143        return true;
144    }
145
146    /**
147     * Actual writing occurs here.
148     *
149     * @param logEvent The LogEvent.
150     */
151    @Override
152    public void append(final LogEvent logEvent) {
153        if (!isStarted()) {
154            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
155        }
156        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
157        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
158        if (!transfer(memento)) {
159            if (blocking) {
160                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
161                    // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
162                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
163                    logMessageInCurrentThread(logEvent);
164                } else {
165                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
166                    final EventRoute route = asyncQueueFullPolicy.getRoute(dispatcher.getId(), memento.getLevel());
167                    route.logMessage(this, memento);
168                }
169            } else {
170                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
171                logToErrorAppenderIfNecessary(false, memento);
172            }
173        }
174    }
175
176    private boolean transfer(final LogEvent memento) {
177        return queue instanceof TransferQueue
178            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
179            : queue.offer(memento);
180    }
181
182    /**
183     * FOR INTERNAL USE ONLY.
184     *
185     * @param logEvent the event to log
186     */
187    public void logMessageInCurrentThread(final LogEvent logEvent) {
188        logEvent.setEndOfBatch(queue.isEmpty());
189        dispatcher.dispatch(logEvent);
190    }
191
192    /**
193     * FOR INTERNAL USE ONLY.
194     *
195     * @param logEvent the event to log
196     */
197    public void logMessageInBackgroundThread(final LogEvent logEvent) {
198        try {
199            // wait for free slots in the queue
200            queue.put(logEvent);
201        } catch (final InterruptedException ignored) {
202            final boolean appendSuccessful = handleInterruptedException(logEvent);
203            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
204        }
205    }
206
207    // LOG4J2-1049: Some applications use Thread.interrupt() to send
208    // messages between application threads. This does not necessarily
209    // mean that the queue is full. To prevent dropping a log message,
210    // quickly try to offer the event to the queue again.
211    // (Yes, this means there is a possibility the same event is logged twice.)
212    //
213    // Finally, catching the InterruptedException means the
214    // interrupted flag has been cleared on the current thread.
215    // This may interfere with the application's expectation of
216    // being interrupted, so when we are done, we set the interrupted
217    // flag again.
218    private boolean handleInterruptedException(final LogEvent memento) {
219        final boolean appendSuccessful = queue.offer(memento);
220        if (!appendSuccessful) {
221            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
222                getName());
223        }
224        // set the interrupted flag again.
225        Thread.currentThread().interrupt();
226        return appendSuccessful;
227    }
228
229    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
230        if (!appendSuccessful && errorAppender != null) {
231            errorAppender.callAppender(logEvent);
232        }
233    }
234
235    /**
236     * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
237     * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
238     * pre-2.7.
239     *
240     * @param appenderRefs     The Appenders to reference.
241     * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
242     * @param blocking         True if the Appender should wait when the queue is full. The default is true.
243     * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
244     *                         in the queue on shutdown. The default is zero which means to wait forever.
245     * @param size             The size of the event queue. The default is 128.
246     * @param name             The name of the Appender.
247     * @param includeLocation  whether to include location information. The default is false.
248     * @param filter           The Filter or null.
249     * @param config           The Configuration.
250     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
251     *                         otherwise they are propagated to the caller.
252     * @return The AsyncAppender.
253     * @deprecated use {@link Builder} instead
254     */
255    @Deprecated
256    public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
257                                               final boolean blocking, final long shutdownTimeout, final int size,
258                                               final String name, final boolean includeLocation, final Filter filter,
259                                               final Configuration config, final boolean ignoreExceptions) {
260        if (name == null) {
261            LOGGER.error("No name provided for AsyncAppender");
262            return null;
263        }
264        if (appenderRefs == null) {
265            LOGGER.error("No appender references provided to AsyncAppender {}", name);
266        }
267
268        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
269            shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null);
270    }
271
272    @PluginBuilderFactory
273    public static Builder newBuilder() {
274        return new Builder();
275    }
276
277    public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B>
278            implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
279
280        @PluginElement("AppenderRef")
281        @Required(message = "No appender references provided to AsyncAppender")
282        private AppenderRef[] appenderRefs;
283
284        @PluginBuilderAttribute
285        @PluginAliases("error-ref")
286        private String errorRef;
287
288        @PluginBuilderAttribute
289        private boolean blocking = true;
290
291        @PluginBuilderAttribute
292        private long shutdownTimeout = 0L;
293
294        @PluginBuilderAttribute
295        private int bufferSize = DEFAULT_QUEUE_SIZE;
296
297        @PluginBuilderAttribute
298        @Required(message = "No name provided for AsyncAppender")
299        private String name;
300
301        @PluginBuilderAttribute
302        private boolean includeLocation = false;
303
304        @PluginConfiguration
305        private Configuration configuration;
306
307        @PluginBuilderAttribute
308        private boolean ignoreExceptions = true;
309
310        @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
311        private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
312
313        public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
314            this.appenderRefs = appenderRefs;
315            return this;
316        }
317
318        public Builder setErrorRef(final String errorRef) {
319            this.errorRef = errorRef;
320            return this;
321        }
322
323        public Builder setBlocking(final boolean blocking) {
324            this.blocking = blocking;
325            return this;
326        }
327
328        public Builder setShutdownTimeout(final long shutdownTimeout) {
329            this.shutdownTimeout = shutdownTimeout;
330            return this;
331        }
332
333        public Builder setBufferSize(final int bufferSize) {
334            this.bufferSize = bufferSize;
335            return this;
336        }
337
338        public Builder setName(final String name) {
339            this.name = name;
340            return this;
341        }
342
343        public Builder setIncludeLocation(final boolean includeLocation) {
344            this.includeLocation = includeLocation;
345            return this;
346        }
347
348        public Builder setConfiguration(final Configuration configuration) {
349            this.configuration = configuration;
350            return this;
351        }
352
353        public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
354            this.ignoreExceptions = ignoreExceptions;
355            return this;
356        }
357
358        public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
359            this.blockingQueueFactory = blockingQueueFactory;
360            return this;
361        }
362
363        @Override
364        public AsyncAppender build() {
365            return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
366                shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray());
367        }
368    }
369
370    /**
371     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
372     *
373     * @return the names of the sink appenders
374     */
375    public String[] getAppenderRefStrings() {
376        final String[] result = new String[appenderRefs.length];
377        for (int i = 0; i < result.length; i++) {
378            result[i] = appenderRefs[i].getRef();
379        }
380        return result;
381    }
382
383    /**
384     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
385     * the class and method where the logging call was made.
386     *
387     * @return {@code true} if location is included with every event, {@code false} otherwise
388     */
389    public boolean isIncludeLocation() {
390        return includeLocation;
391    }
392
393    /**
394     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
395     * dropped when the queue is full.
396     *
397     * @return whether this AsyncAppender will block or drop events when the queue is full.
398     */
399    public boolean isBlocking() {
400        return blocking;
401    }
402
403    /**
404     * Gets all Appenders.
405     *
406     * @return a list of Appenders.
407     */
408    public List<Appender> getAppenders() {
409        return dispatcher.getAppenders();
410    }
411    
412    /**
413     * Returns the name of the appender that any errors are logged to or {@code null}.
414     *
415     * @return the name of the appender that any errors are logged to or {@code null}
416     */
417    public String getErrorRef() {
418        return errorRef;
419    }
420
421    public int getQueueCapacity() {
422        return queueSize;
423    }
424
425    public int getQueueRemainingCapacity() {
426        return queue.remainingCapacity();
427    }
428
429    /**
430     * Returns the number of elements in the queue.
431     *
432     * @return the number of elements in the queue.
433     * @since 2.11.1
434     */
435    public int getQueueSize() {
436        return queue.size();
437    }
438
439}