001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.appender;
018    
019    import java.io.Serializable;
020    import java.util.ArrayList;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.ArrayBlockingQueue;
024    import java.util.concurrent.BlockingQueue;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    import org.apache.logging.log4j.core.Appender;
028    import org.apache.logging.log4j.core.Filter;
029    import org.apache.logging.log4j.core.LogEvent;
030    import org.apache.logging.log4j.core.async.RingBufferLogEvent;
031    import org.apache.logging.log4j.core.config.AppenderControl;
032    import org.apache.logging.log4j.core.config.AppenderRef;
033    import org.apache.logging.log4j.core.config.Configuration;
034    import org.apache.logging.log4j.core.config.ConfigurationException;
035    import org.apache.logging.log4j.core.config.plugins.Plugin;
036    import org.apache.logging.log4j.core.config.plugins.PluginAliases;
037    import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
038    import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
039    import org.apache.logging.log4j.core.config.plugins.PluginElement;
040    import org.apache.logging.log4j.core.config.plugins.PluginFactory;
041    import org.apache.logging.log4j.core.impl.Log4jLogEvent;
042    
043    /**
044     * Appends to one or more Appenders asynchronously.  You can configure an
045     * AsyncAppender with one or more Appenders and an Appender to append to if the
046     * queue is full. The AsyncAppender does not allow a filter to be specified on
047     * the Appender references.
048     */
049    @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
050    public final class AsyncAppender extends AbstractAppender {
051    
052        private static final long serialVersionUID = 1L;
053        private static final int DEFAULT_QUEUE_SIZE = 128;
054        private static final String SHUTDOWN = "Shutdown";
055    
056        private final BlockingQueue<Serializable> queue;
057        private final int queueSize;
058        private final boolean blocking;
059        private final Configuration config;
060        private final AppenderRef[] appenderRefs;
061        private final String errorRef;
062        private final boolean includeLocation;
063        private AppenderControl errorAppender;
064        private AsyncThread thread;
065        private static final AtomicLong threadSequence = new AtomicLong(1);
066        private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
067    
068    
069        private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
070                               final String errorRef, final int queueSize, final boolean blocking,
071                               final boolean ignoreExceptions, final Configuration config,
072                               final boolean includeLocation) {
073            super(name, filter, null, ignoreExceptions);
074            this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
075            this.queueSize = queueSize;
076            this.blocking = blocking;
077            this.config = config;
078            this.appenderRefs = appenderRefs;
079            this.errorRef = errorRef;
080            this.includeLocation = includeLocation;
081        }
082    
083        @Override
084        public void start() {
085            final Map<String, Appender> map = config.getAppenders();
086            final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
087            for (final AppenderRef appenderRef : appenderRefs) {
088                if (map.containsKey(appenderRef.getRef())) {
089                    appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
090                        appenderRef.getFilter()));
091                } else {
092                    LOGGER.error("No appender named {} was configured", appenderRef);
093                }
094            }
095            if (errorRef != null) {
096                if (map.containsKey(errorRef)) {
097                    errorAppender = new AppenderControl(map.get(errorRef), null, null);
098                } else {
099                    LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
100                }
101            }
102            if (appenders.size() > 0) {
103                thread = new AsyncThread(appenders, queue);
104                thread.setName("AsyncAppender-" + getName());
105            } else if (errorRef == null) {
106                throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
107            }
108    
109            thread.start();
110            super.start();
111        }
112    
113        @Override
114        public void stop() {
115            super.stop();
116            LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
117            thread.shutdown();
118            try {
119                thread.join();
120            } catch (final InterruptedException ex) {
121                LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
122            }
123            LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
124        }
125    
126        /**
127         * Actual writing occurs here.
128         * 
129         * @param logEvent
130         *        The LogEvent.
131         */
132        @Override
133        public void append(LogEvent logEvent) {
134            if (!isStarted()) {
135                throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
136            }
137            if (!(logEvent instanceof Log4jLogEvent)) {
138                if (!(logEvent instanceof RingBufferLogEvent)) {
139                    return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents
140                }
141                logEvent = ((RingBufferLogEvent) logEvent).createMemento();
142            }
143            logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
144            final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
145            boolean appendSuccessful = false;
146            if (blocking) {
147                if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
148                    // LOG4J2-485: avoid deadlock that would result from trying
149                    // to add to a full queue from appender thread
150                    coreEvent.setEndOfBatch(false); // queue is definitely not empty!
151                    appendSuccessful = thread.callAppenders(coreEvent);
152                } else {
153                    try {
154                        // wait for free slots in the queue
155                        queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation));
156                        appendSuccessful = true;
157                    } catch (final InterruptedException e) {
158                        LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
159                                getName());
160                    }
161                }
162            } else {
163                appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
164                if (!appendSuccessful) {
165                    error("Appender " + getName() + " is unable to write primary appenders. queue is full");
166                }
167            }
168            if (!appendSuccessful && errorAppender != null) {
169                errorAppender.callAppender(coreEvent);
170            }
171        }
172    
173        /**
174         * Create an AsyncAppender.
175         * @param appenderRefs The Appenders to reference.
176         * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
177         * @param blocking True if the Appender should wait when the queue is full. The default is true.
178         * @param size The size of the event queue. The default is 128.
179         * @param name The name of the Appender.
180         * @param includeLocation whether to include location information. The default is false.
181         * @param filter The Filter or null.
182         * @param config The Configuration.
183         * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
184         *                         otherwise they are propagated to the caller.
185         * @return The AsyncAppender.
186         */
187        @PluginFactory
188        public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
189                @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
190                @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
191                @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
192                @PluginAttribute("name") final String name,
193                @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
194                @PluginElement("Filter") final Filter filter,
195                @PluginConfiguration final Configuration config,
196                @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
197            if (name == null) {
198                LOGGER.error("No name provided for AsyncAppender");
199                return null;
200            }
201            if (appenderRefs == null) {
202                LOGGER.error("No appender references provided to AsyncAppender {}", name);
203            }
204    
205            return new AsyncAppender(name, filter, appenderRefs, errorRef,
206                    size, blocking, ignoreExceptions, config, includeLocation);
207        }
208    
209        /**
210         * Thread that calls the Appenders.
211         */
212        private class AsyncThread extends Thread {
213    
214            private volatile boolean shutdown = false;
215            private final List<AppenderControl> appenders;
216            private final BlockingQueue<Serializable> queue;
217    
218            public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
219                this.appenders = appenders;
220                this.queue = queue;
221                setDaemon(true);
222                setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
223            }
224    
225            @Override
226            public void run() {
227                isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
228                while (!shutdown) {
229                    Serializable s;
230                    try {
231                        s = queue.take();
232                        if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
233                            shutdown = true;
234                            continue;
235                        }
236                    } catch (final InterruptedException ex) {
237                        break; // LOG4J2-830
238                    }
239                    final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
240                    event.setEndOfBatch(queue.isEmpty());
241                    final boolean success = callAppenders(event);
242                    if (!success && errorAppender != null) {
243                        try {
244                            errorAppender.callAppender(event);
245                        } catch (final Exception ex) {
246                            // Silently accept the error.
247                        }
248                    }
249                }
250                // Process any remaining items in the queue.
251                LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
252                        queue.size());
253                int count= 0;
254                int ignored = 0;
255                while (!queue.isEmpty()) {
256                    try {
257                        final Serializable s = queue.take();
258                        if (Log4jLogEvent.canDeserialize(s)) {
259                            final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
260                            event.setEndOfBatch(queue.isEmpty());
261                            callAppenders(event);
262                            count++;
263                        } else {
264                            ignored++;
265                            LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
266                        }
267                    } catch (final InterruptedException ex) {
268                        // May have been interrupted to shut down.
269                        // Here we ignore interrupts and try to process all remaining events.
270                    }
271                }
272                LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " +
273                            "Processed {} and ignored {} events since shutdown started.",
274                            queue.size(), count, ignored);
275            }
276    
277            /**
278             * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on
279             * all registered {@code AppenderControl} objects, and returns {@code true}
280             * if at least one appender call was successful, {@code false} otherwise.
281             * Any exceptions are silently ignored.
282             *
283             * @param event the event to forward to the registered appenders
284             * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
285             */
286            boolean callAppenders(final Log4jLogEvent event) {
287                boolean success = false;
288                for (final AppenderControl control : appenders) {
289                    try {
290                        control.callAppender(event);
291                        success = true;
292                    } catch (final Exception ex) {
293                        // If no appender is successful the error appender will get it.
294                    }
295                }
296                return success;
297            }
298    
299            public void shutdown() {
300                shutdown = true;
301                if (queue.isEmpty()) {
302                    queue.offer(SHUTDOWN);
303                }
304            }
305        }
306    
307        /**
308         * Returns the names of the appenders that this asyncAppender delegates to
309         * as an array of Strings.
310         * @return the names of the sink appenders
311         */
312        public String[] getAppenderRefStrings() {
313            final String[] result = new String[appenderRefs.length];
314            for (int i = 0; i < result.length; i++) {
315                result[i] = appenderRefs[i].getRef();
316            }
317            return result;
318        }
319    
320        /**
321         * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with
322         * every log event to determine the class and method where the logging call
323         * was made.
324         * @return {@code true} if location is included with every event, {@code false} otherwise
325         */
326        public boolean isIncludeLocation() {
327            return includeLocation;
328        }
329    
330        /**
331         * Returns {@code true} if this AsyncAppender will block when the queue is full,
332         * or {@code false} if events are dropped when the queue is full.
333         * @return whether this AsyncAppender will block or drop events when the queue is full.
334         */
335        public boolean isBlocking() {
336            return blocking;
337        }
338    
339        /**
340         * Returns the name of the appender that any errors are logged to or {@code null}.
341         * @return the name of the appender that any errors are logged to or {@code null}
342         */
343        public String getErrorRef() {
344            return errorRef;
345        }
346    
347        public int getQueueCapacity() {
348            return queueSize;
349        }
350    
351        public int getQueueRemainingCapacity() {
352            return queue.remainingCapacity();
353        }
354    }