001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.appender;
018    
019    import java.io.Serializable;
020    import java.util.ArrayList;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.ArrayBlockingQueue;
024    import java.util.concurrent.BlockingQueue;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    import org.apache.logging.log4j.core.Appender;
028    import org.apache.logging.log4j.core.Filter;
029    import org.apache.logging.log4j.core.LogEvent;
030    import org.apache.logging.log4j.core.async.RingBufferLogEvent;
031    import org.apache.logging.log4j.core.config.AppenderControl;
032    import org.apache.logging.log4j.core.config.AppenderRef;
033    import org.apache.logging.log4j.core.config.Configuration;
034    import org.apache.logging.log4j.core.config.ConfigurationException;
035    import org.apache.logging.log4j.core.config.plugins.Plugin;
036    import org.apache.logging.log4j.core.config.plugins.PluginAliases;
037    import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
038    import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
039    import org.apache.logging.log4j.core.config.plugins.PluginElement;
040    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
041    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
042    
043    /**
044     * Appends to one or more Appenders asynchronously.  You can configure an
045     * AsyncAppender with one or more Appenders and an Appender to append to if the
046     * queue is full. The AsyncAppender does not allow a filter to be specified on
047     * the Appender references.
048     */
049    @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
050    public final class AsyncAppender extends AbstractAppender {
051    
052        private static final int DEFAULT_QUEUE_SIZE = 128;
053        private static final String SHUTDOWN = "Shutdown";
054    
055        private final BlockingQueue<Serializable> queue;
056        private final int queueSize;
057        private final boolean blocking;
058        private final Configuration config;
059        private final AppenderRef[] appenderRefs;
060        private final String errorRef;
061        private final boolean includeLocation;
062        private AppenderControl errorAppender;
063        private AsyncThread thread;
064        private static final AtomicLong threadSequence = new AtomicLong(1);
065        private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
066    
067    
068        private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
069                               final String errorRef, final int queueSize, final boolean blocking,
070                               final boolean ignoreExceptions, final Configuration config,
071                               final boolean includeLocation) {
072            super(name, filter, null, ignoreExceptions);
073            this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
074            this.queueSize = queueSize;
075            this.blocking = blocking;
076            this.config = config;
077            this.appenderRefs = appenderRefs;
078            this.errorRef = errorRef;
079            this.includeLocation = includeLocation;
080        }
081    
082        @Override
083        public void start() {
084            final Map<String, Appender> map = config.getAppenders();
085            final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
086            for (final AppenderRef appenderRef : appenderRefs) {
087                if (map.containsKey(appenderRef.getRef())) {
088                    appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
089                        appenderRef.getFilter()));
090                } else {
091                    LOGGER.error("No appender named {} was configured", appenderRef);
092                }
093            }
094            if (errorRef != null) {
095                if (map.containsKey(errorRef)) {
096                    errorAppender = new AppenderControl(map.get(errorRef), null, null);
097                } else {
098                    LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
099                }
100            }
101            if (appenders.size() > 0) {
102                thread = new AsyncThread(appenders, queue);
103                thread.setName("AsyncAppender-" + getName());
104            } else if (errorRef == null) {
105                throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
106            }
107    
108            thread.start();
109            super.start();
110        }
111    
112        @Override
113        public void stop() {
114            super.stop();
115            LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
116            thread.shutdown();
117            try {
118                thread.join();
119            } catch (final InterruptedException ex) {
120                LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
121            }
122            LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
123        }
124    
125        /**
126         * Actual writing occurs here.
127         * <p/>
128         * @param logEvent The LogEvent.
129         */
130        @Override
131        public void append(LogEvent logEvent) {
132            if (!isStarted()) {
133                throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
134            }
135            if (!(logEvent instanceof Log4jLogEvent)) {
136                if (!(logEvent instanceof RingBufferLogEvent)) {
137                    return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents
138                }
139                logEvent = ((RingBufferLogEvent) logEvent).createMemento();
140            }
141            logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
142            final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
143            boolean appendSuccessful = false;
144            if (blocking) {
145                if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
146                    // LOG4J2-485: avoid deadlock that would result from trying
147                    // to add to a full queue from appender thread
148                    coreEvent.setEndOfBatch(false); // queue is definitely not empty!
149                    appendSuccessful = thread.callAppenders(coreEvent);
150                } else {
151                    try {
152                        // wait for free slots in the queue
153                        queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation));
154                        appendSuccessful = true;
155                    } catch (final InterruptedException e) {
156                        LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
157                                getName());
158                    }
159                }
160            } else {
161                appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
162                if (!appendSuccessful) {
163                    error("Appender " + getName() + " is unable to write primary appenders. queue is full");
164                }
165            }
166            if (!appendSuccessful && errorAppender != null) {
167                errorAppender.callAppender(coreEvent);
168            }
169        }
170    
171        /**
172         * Create an AsyncAppender.
173         * @param appenderRefs The Appenders to reference.
174         * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
175         * @param blocking True if the Appender should wait when the queue is full. The default is true.
176         * @param size The size of the event queue. The default is 128.
177         * @param name The name of the Appender.
178         * @param includeLocation whether to include location information. The default is false.
179         * @param filter The Filter or null.
180         * @param config The Configuration.
181         * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
182         *                         otherwise they are propagated to the caller.
183         * @return The AsyncAppender.
184         */
185        @PluginFactory
186        public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
187                @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
188                @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
189                @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
190                @PluginAttribute("name") final String name,
191                @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
192                @PluginElement("Filter") final Filter filter,
193                @PluginConfiguration final Configuration config,
194                @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
195            if (name == null) {
196                LOGGER.error("No name provided for AsyncAppender");
197                return null;
198            }
199            if (appenderRefs == null) {
200                LOGGER.error("No appender references provided to AsyncAppender {}", name);
201            }
202    
203            return new AsyncAppender(name, filter, appenderRefs, errorRef,
204                    size, blocking, ignoreExceptions, config, includeLocation);
205        }
206    
207        /**
208         * Thread that calls the Appenders.
209         */
210        private class AsyncThread extends Thread {
211    
212            private volatile boolean shutdown = false;
213            private final List<AppenderControl> appenders;
214            private final BlockingQueue<Serializable> queue;
215    
216            public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
217                this.appenders = appenders;
218                this.queue = queue;
219                setDaemon(true);
220                setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
221            }
222    
223            @Override
224            public void run() {
225                isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
226                while (!shutdown) {
227                    Serializable s;
228                    try {
229                        s = queue.take();
230                        if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
231                            shutdown = true;
232                            continue;
233                        }
234                    } catch (final InterruptedException ex) {
235                        // No good reason for this.
236                        continue;
237                    }
238                    final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
239                    event.setEndOfBatch(queue.isEmpty());
240                    final boolean success = callAppenders(event);
241                    if (!success && errorAppender != null) {
242                        try {
243                            errorAppender.callAppender(event);
244                        } catch (final Exception ex) {
245                            // Silently accept the error.
246                        }
247                    }
248                }
249                // Process any remaining items in the queue.
250                LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
251                        queue.size());
252                int count= 0;
253                int ignored = 0;
254                while (!queue.isEmpty()) {
255                    try {
256                        final Serializable s = queue.take();
257                        if (Log4jLogEvent.canDeserialize(s)) {
258                            final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
259                            event.setEndOfBatch(queue.isEmpty());
260                            callAppenders(event);
261                            count++;
262                        } else {
263                            ignored++;
264                            LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
265                        }
266                    } catch (final InterruptedException ex) {
267                        // May have been interrupted to shut down.
268                    }
269                }
270                LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " +
271                            "Processed {} and ignored {} events since shutdown started.",
272                            queue.size(), count, ignored);
273            }
274    
275            /**
276             * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on
277             * all registered {@code AppenderControl} objects, and returns {@code true}
278             * if at least one appender call was successful, {@code false} otherwise.
279             * Any exceptions are silently ignored.
280             *
281             * @param event the event to forward to the registered appenders
282             * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
283             */
284            boolean callAppenders(final Log4jLogEvent event) {
285                boolean success = false;
286                for (final AppenderControl control : appenders) {
287                    try {
288                        control.callAppender(event);
289                        success = true;
290                    } catch (final Exception ex) {
291                        // If no appender is successful the error appender will get it.
292                    }
293                }
294                return success;
295            }
296    
297            public void shutdown() {
298                shutdown = true;
299                if (queue.isEmpty()) {
300                    queue.offer(SHUTDOWN);
301                }
302            }
303        }
304    
305        /**
306         * Returns the names of the appenders that this asyncAppender delegates to
307         * as an array of Strings.
308         * @return the names of the sink appenders
309         */
310        public String[] getAppenderRefStrings() {
311            final String[] result = new String[appenderRefs.length];
312            for (int i = 0; i < result.length; i++) {
313                result[i] = appenderRefs[i].getRef();
314            }
315            return result;
316        }
317    
318        /**
319         * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with
320         * every log event to determine the class and method where the logging call
321         * was made.
322         * @return {@code true} if location is included with every event, {@code false} otherwise
323         */
324        public boolean isIncludeLocation() {
325            return includeLocation;
326        }
327    
328        /**
329         * Returns {@code true} if this AsyncAppender will block when the queue is full,
330         * or {@code false} if events are dropped when the queue is full.
331         * @return whether this AsyncAppender will block or drop events when the queue is full.
332         */
333        public boolean isBlocking() {
334            return blocking;
335        }
336    
337        /**
338         * Returns the name of the appender that any errors are logged to or {@code null}.
339         * @return the name of the appender that any errors are logged to or {@code null}
340         */
341        public String getErrorRef() {
342            return errorRef;
343        }
344    
345        public int getQueueCapacity() {
346            return queueSize;
347        }
348    
349        public int getQueueRemainingCapacity() {
350            return queue.remainingCapacity();
351        }
352    }