001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TransferQueue;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.logging.log4j.core.AbstractLogEvent;
028import org.apache.logging.log4j.core.Appender;
029import org.apache.logging.log4j.core.Core;
030import org.apache.logging.log4j.core.Filter;
031import org.apache.logging.log4j.core.LogEvent;
032import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
033import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
034import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
035import org.apache.logging.log4j.core.async.BlockingQueueFactory;
036import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
037import org.apache.logging.log4j.core.async.EventRoute;
038import org.apache.logging.log4j.core.async.InternalAsyncUtil;
039import org.apache.logging.log4j.core.config.AppenderControl;
040import org.apache.logging.log4j.core.config.AppenderRef;
041import org.apache.logging.log4j.core.config.Configuration;
042import org.apache.logging.log4j.core.config.ConfigurationException;
043import org.apache.logging.log4j.core.config.plugins.Plugin;
044import org.apache.logging.log4j.core.config.plugins.PluginAliases;
045import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
046import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
047import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
048import org.apache.logging.log4j.core.config.plugins.PluginElement;
049import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
050import org.apache.logging.log4j.core.impl.Log4jLogEvent;
051import org.apache.logging.log4j.core.util.Log4jThread;
052
053/**
054 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
055 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
056 * references.
057 */
058@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
059public final class AsyncAppender extends AbstractAppender {
060
061    private static final int DEFAULT_QUEUE_SIZE = 1024;
062    private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
063    };
064
065    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
066
067    private final BlockingQueue<LogEvent> queue;
068    private final int queueSize;
069    private final boolean blocking;
070    private final long shutdownTimeout;
071    private final Configuration config;
072    private final AppenderRef[] appenderRefs;
073    private final String errorRef;
074    private final boolean includeLocation;
075    private AppenderControl errorAppender;
076    private AsyncThread thread;
077    private AsyncQueueFullPolicy asyncQueueFullPolicy;
078
079    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
080                          final String errorRef, final int queueSize, final boolean blocking,
081                          final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
082                          final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
083        super(name, filter, null, ignoreExceptions);
084        this.queue = blockingQueueFactory.create(queueSize);
085        this.queueSize = queueSize;
086        this.blocking = blocking;
087        this.shutdownTimeout = shutdownTimeout;
088        this.config = config;
089        this.appenderRefs = appenderRefs;
090        this.errorRef = errorRef;
091        this.includeLocation = includeLocation;
092    }
093
094    @Override
095    public void start() {
096        final Map<String, Appender> map = config.getAppenders();
097        final List<AppenderControl> appenders = new ArrayList<>();
098        for (final AppenderRef appenderRef : appenderRefs) {
099            final Appender appender = map.get(appenderRef.getRef());
100            if (appender != null) {
101                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
102            } else {
103                LOGGER.error("No appender named {} was configured", appenderRef);
104            }
105        }
106        if (errorRef != null) {
107            final Appender appender = map.get(errorRef);
108            if (appender != null) {
109                errorAppender = new AppenderControl(appender, null, null);
110            } else {
111                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
112            }
113        }
114        if (appenders.size() > 0) {
115            thread = new AsyncThread(appenders, queue);
116            thread.setName("AsyncAppender-" + getName());
117        } else if (errorRef == null) {
118            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
119        }
120        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
121
122        thread.start();
123        super.start();
124    }
125
126    @Override
127    public boolean stop(final long timeout, final TimeUnit timeUnit) {
128        setStopping();
129        super.stop(timeout, timeUnit, false);
130        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
131        thread.shutdown();
132        try {
133            thread.join(shutdownTimeout);
134        } catch (final InterruptedException ex) {
135            LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
136        }
137        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
138
139        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
140            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
141                DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
142        }
143        setStopped();
144        return true;
145    }
146
147    /**
148     * Actual writing occurs here.
149     *
150     * @param logEvent The LogEvent.
151     */
152    @Override
153    public void append(final LogEvent logEvent) {
154        if (!isStarted()) {
155            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
156        }
157        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
158        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
159        if (!transfer(memento)) {
160            if (blocking) {
161                // delegate to the event router (which may discard, enqueue and block, or log in current thread)
162                final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
163                route.logMessage(this, memento);
164            } else {
165                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
166                logToErrorAppenderIfNecessary(false, memento);
167            }
168        }
169    }
170
171    private boolean transfer(final LogEvent memento) {
172        return queue instanceof TransferQueue
173            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
174            : queue.offer(memento);
175    }
176
177    /**
178     * FOR INTERNAL USE ONLY.
179     *
180     * @param logEvent the event to log
181     */
182    public void logMessageInCurrentThread(final LogEvent logEvent) {
183        logEvent.setEndOfBatch(queue.isEmpty());
184        final boolean appendSuccessful = thread.callAppenders(logEvent);
185        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
186    }
187
188    /**
189     * FOR INTERNAL USE ONLY.
190     *
191     * @param logEvent the event to log
192     */
193    public void logMessageInBackgroundThread(final LogEvent logEvent) {
194        try {
195            // wait for free slots in the queue
196            queue.put(logEvent);
197        } catch (final InterruptedException e) {
198            final boolean appendSuccessful = handleInterruptedException(logEvent);
199            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
200        }
201    }
202
203    // LOG4J2-1049: Some applications use Thread.interrupt() to send
204    // messages between application threads. This does not necessarily
205    // mean that the queue is full. To prevent dropping a log message,
206    // quickly try to offer the event to the queue again.
207    // (Yes, this means there is a possibility the same event is logged twice.)
208    //
209    // Finally, catching the InterruptedException means the
210    // interrupted flag has been cleared on the current thread.
211    // This may interfere with the application's expectation of
212    // being interrupted, so when we are done, we set the interrupted
213    // flag again.
214    private boolean handleInterruptedException(final LogEvent memento) {
215        final boolean appendSuccessful = queue.offer(memento);
216        if (!appendSuccessful) {
217            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
218                getName());
219        }
220        // set the interrupted flag again.
221        Thread.currentThread().interrupt();
222        return appendSuccessful;
223    }
224
225    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
226        if (!appendSuccessful && errorAppender != null) {
227            errorAppender.callAppender(logEvent);
228        }
229    }
230
231    /**
232     * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
233     * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
234     * pre-2.7.
235     *
236     * @param appenderRefs     The Appenders to reference.
237     * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
238     * @param blocking         True if the Appender should wait when the queue is full. The default is true.
239     * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
240     *                         in the queue on shutdown. The default is zero which means to wait forever.
241     * @param size             The size of the event queue. The default is 128.
242     * @param name             The name of the Appender.
243     * @param includeLocation  whether to include location information. The default is false.
244     * @param filter           The Filter or null.
245     * @param config           The Configuration.
246     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
247     *                         otherwise they are propagated to the caller.
248     * @return The AsyncAppender.
249     * @deprecated use {@link Builder} instead
250     */
251    @Deprecated
252    public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
253                                               final boolean blocking, final long shutdownTimeout, final int size,
254                                               final String name, final boolean includeLocation, final Filter filter,
255                                               final Configuration config, final boolean ignoreExceptions) {
256        if (name == null) {
257            LOGGER.error("No name provided for AsyncAppender");
258            return null;
259        }
260        if (appenderRefs == null) {
261            LOGGER.error("No appender references provided to AsyncAppender {}", name);
262        }
263
264        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
265            shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
266    }
267
268    @PluginBuilderFactory
269    public static Builder newBuilder() {
270        return new Builder();
271    }
272
273    public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
274
275        @PluginElement("AppenderRef")
276        @Required(message = "No appender references provided to AsyncAppender")
277        private AppenderRef[] appenderRefs;
278
279        @PluginBuilderAttribute
280        @PluginAliases("error-ref")
281        private String errorRef;
282
283        @PluginBuilderAttribute
284        private boolean blocking = true;
285
286        @PluginBuilderAttribute
287        private long shutdownTimeout = 0L;
288
289        @PluginBuilderAttribute
290        private int bufferSize = DEFAULT_QUEUE_SIZE;
291
292        @PluginBuilderAttribute
293        @Required(message = "No name provided for AsyncAppender")
294        private String name;
295
296        @PluginBuilderAttribute
297        private boolean includeLocation = false;
298
299        @PluginElement("Filter")
300        private Filter filter;
301
302        @PluginConfiguration
303        private Configuration configuration;
304
305        @PluginBuilderAttribute
306        private boolean ignoreExceptions = true;
307
308        @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
309        private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
310
311        public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
312            this.appenderRefs = appenderRefs;
313            return this;
314        }
315
316        public Builder setErrorRef(final String errorRef) {
317            this.errorRef = errorRef;
318            return this;
319        }
320
321        public Builder setBlocking(final boolean blocking) {
322            this.blocking = blocking;
323            return this;
324        }
325
326        public Builder setShutdownTimeout(final long shutdownTimeout) {
327            this.shutdownTimeout = shutdownTimeout;
328            return this;
329        }
330
331        public Builder setBufferSize(final int bufferSize) {
332            this.bufferSize = bufferSize;
333            return this;
334        }
335
336        public Builder setName(final String name) {
337            this.name = name;
338            return this;
339        }
340
341        public Builder setIncludeLocation(final boolean includeLocation) {
342            this.includeLocation = includeLocation;
343            return this;
344        }
345
346        public Builder setFilter(final Filter filter) {
347            this.filter = filter;
348            return this;
349        }
350
351        public Builder setConfiguration(final Configuration configuration) {
352            this.configuration = configuration;
353            return this;
354        }
355
356        public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
357            this.ignoreExceptions = ignoreExceptions;
358            return this;
359        }
360
361        public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
362            this.blockingQueueFactory = blockingQueueFactory;
363            return this;
364        }
365
366        @Override
367        public AsyncAppender build() {
368            return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
369                shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
370        }
371    }
372
373    /**
374     * Thread that calls the Appenders.
375     */
376    private class AsyncThread extends Log4jThread {
377
378        private volatile boolean shutdown = false;
379        private final List<AppenderControl> appenders;
380        private final BlockingQueue<LogEvent> queue;
381
382        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
383            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
384            this.appenders = appenders;
385            this.queue = queue;
386            setDaemon(true);
387        }
388
389        @Override
390        public void run() {
391            while (!shutdown) {
392                LogEvent event;
393                try {
394                    event = queue.take();
395                    if (event == SHUTDOWN_LOG_EVENT) {
396                        shutdown = true;
397                        continue;
398                    }
399                } catch (final InterruptedException ex) {
400                    break; // LOG4J2-830
401                }
402                event.setEndOfBatch(queue.isEmpty());
403                final boolean success = callAppenders(event);
404                if (!success && errorAppender != null) {
405                    try {
406                        errorAppender.callAppender(event);
407                    } catch (final Exception ex) {
408                        // Silently accept the error.
409                    }
410                }
411            }
412            // Process any remaining items in the queue.
413            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
414                queue.size());
415            int count = 0;
416            int ignored = 0;
417            while (!queue.isEmpty()) {
418                try {
419                    final LogEvent event = queue.take();
420                    if (event instanceof Log4jLogEvent) {
421                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
422                        logEvent.setEndOfBatch(queue.isEmpty());
423                        callAppenders(logEvent);
424                        count++;
425                    } else {
426                        ignored++;
427                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
428                    }
429                } catch (final InterruptedException ex) {
430                    // May have been interrupted to shut down.
431                    // Here we ignore interrupts and try to process all remaining events.
432                }
433            }
434            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
435                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
436        }
437
438        /**
439         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
440         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
441         * exceptions are silently ignored.
442         *
443         * @param event the event to forward to the registered appenders
444         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
445         */
446        boolean callAppenders(final LogEvent event) {
447            boolean success = false;
448            for (final AppenderControl control : appenders) {
449                try {
450                    control.callAppender(event);
451                    success = true;
452                } catch (final Exception ex) {
453                    // If no appender is successful the error appender will get it.
454                }
455            }
456            return success;
457        }
458
459        public void shutdown() {
460            shutdown = true;
461            if (queue.isEmpty()) {
462                queue.offer(SHUTDOWN_LOG_EVENT);
463            }
464            if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
465                this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
466            }
467        }
468    }
469
470    /**
471     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
472     *
473     * @return the names of the sink appenders
474     */
475    public String[] getAppenderRefStrings() {
476        final String[] result = new String[appenderRefs.length];
477        for (int i = 0; i < result.length; i++) {
478            result[i] = appenderRefs[i].getRef();
479        }
480        return result;
481    }
482
483    /**
484     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
485     * the class and method where the logging call was made.
486     *
487     * @return {@code true} if location is included with every event, {@code false} otherwise
488     */
489    public boolean isIncludeLocation() {
490        return includeLocation;
491    }
492
493    /**
494     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
495     * dropped when the queue is full.
496     *
497     * @return whether this AsyncAppender will block or drop events when the queue is full.
498     */
499    public boolean isBlocking() {
500        return blocking;
501    }
502
503    /**
504     * Returns the name of the appender that any errors are logged to or {@code null}.
505     *
506     * @return the name of the appender that any errors are logged to or {@code null}
507     */
508    public String getErrorRef() {
509        return errorRef;
510    }
511
512    public int getQueueCapacity() {
513        return queueSize;
514    }
515
516    public int getQueueRemainingCapacity() {
517        return queue.remainingCapacity();
518    }
519}