001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.appender; 018 019import java.io.Serializable; 020import java.util.ArrayList; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ArrayBlockingQueue; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.logging.log4j.core.Appender; 028import org.apache.logging.log4j.core.Filter; 029import org.apache.logging.log4j.core.LogEvent; 030import org.apache.logging.log4j.core.async.RingBufferLogEvent; 031import org.apache.logging.log4j.core.config.AppenderControl; 032import org.apache.logging.log4j.core.config.AppenderRef; 033import org.apache.logging.log4j.core.config.Configuration; 034import org.apache.logging.log4j.core.config.ConfigurationException; 035import org.apache.logging.log4j.core.config.plugins.Plugin; 036import org.apache.logging.log4j.core.config.plugins.PluginAliases; 037import org.apache.logging.log4j.core.config.plugins.PluginAttribute; 038import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 039import org.apache.logging.log4j.core.config.plugins.PluginElement; 040import org.apache.logging.log4j.core.config.plugins.PluginFactory; 041import org.apache.logging.log4j.core.impl.Log4jLogEvent; 042 043/** 044 * Appends to one or more Appenders asynchronously. You can configure an 045 * AsyncAppender with one or more Appenders and an Appender to append to if the 046 * queue is full. The AsyncAppender does not allow a filter to be specified on 047 * the Appender references. 048 */ 049@Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true) 050public final class AsyncAppender extends AbstractAppender { 051 052 private static final long serialVersionUID = 1L; 053 private static final int DEFAULT_QUEUE_SIZE = 128; 054 private static final String SHUTDOWN = "Shutdown"; 055 056 private final BlockingQueue<Serializable> queue; 057 private final int queueSize; 058 private final boolean blocking; 059 private final Configuration config; 060 private final AppenderRef[] appenderRefs; 061 private final String errorRef; 062 private final boolean includeLocation; 063 private AppenderControl errorAppender; 064 private AsyncThread thread; 065 private static final AtomicLong threadSequence = new AtomicLong(1); 066 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>(); 067 068 069 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 070 final String errorRef, final int queueSize, final boolean blocking, 071 final boolean ignoreExceptions, final Configuration config, 072 final boolean includeLocation) { 073 super(name, filter, null, ignoreExceptions); 074 this.queue = new ArrayBlockingQueue<Serializable>(queueSize); 075 this.queueSize = queueSize; 076 this.blocking = blocking; 077 this.config = config; 078 this.appenderRefs = appenderRefs; 079 this.errorRef = errorRef; 080 this.includeLocation = includeLocation; 081 } 082 083 @Override 084 public void start() { 085 final Map<String, Appender> map = config.getAppenders(); 086 final List<AppenderControl> appenders = new ArrayList<AppenderControl>(); 087 for (final AppenderRef appenderRef : appenderRefs) { 088 if (map.containsKey(appenderRef.getRef())) { 089 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(), 090 appenderRef.getFilter())); 091 } else { 092 LOGGER.error("No appender named {} was configured", appenderRef); 093 } 094 } 095 if (errorRef != null) { 096 if (map.containsKey(errorRef)) { 097 errorAppender = new AppenderControl(map.get(errorRef), null, null); 098 } else { 099 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 100 } 101 } 102 if (appenders.size() > 0) { 103 thread = new AsyncThread(appenders, queue); 104 thread.setName("AsyncAppender-" + getName()); 105 } else if (errorRef == null) { 106 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); 107 } 108 109 thread.start(); 110 super.start(); 111 } 112 113 @Override 114 public void stop() { 115 super.stop(); 116 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size()); 117 thread.shutdown(); 118 try { 119 thread.join(); 120 } catch (final InterruptedException ex) { 121 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); 122 } 123 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size()); 124 } 125 126 /** 127 * Actual writing occurs here. 128 * 129 * @param logEvent 130 * The LogEvent. 131 */ 132 @Override 133 public void append(LogEvent logEvent) { 134 if (!isStarted()) { 135 throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); 136 } 137 if (!(logEvent instanceof Log4jLogEvent)) { 138 if (!(logEvent instanceof RingBufferLogEvent)) { 139 return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents 140 } 141 logEvent = ((RingBufferLogEvent) logEvent).createMemento(); 142 } 143 logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters 144 final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent; 145 boolean appendSuccessful = false; 146 if (blocking) { 147 if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) { 148 // LOG4J2-485: avoid deadlock that would result from trying 149 // to add to a full queue from appender thread 150 coreEvent.setEndOfBatch(false); // queue is definitely not empty! 151 appendSuccessful = thread.callAppenders(coreEvent); 152 } else { 153 try { 154 // wait for free slots in the queue 155 queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation)); 156 appendSuccessful = true; 157 } catch (final InterruptedException e) { 158 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}", 159 getName()); 160 } 161 } 162 } else { 163 appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation)); 164 if (!appendSuccessful) { 165 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 166 } 167 } 168 if (!appendSuccessful && errorAppender != null) { 169 errorAppender.callAppender(coreEvent); 170 } 171 } 172 173 /** 174 * Create an AsyncAppender. 175 * @param appenderRefs The Appenders to reference. 176 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 177 * @param blocking True if the Appender should wait when the queue is full. The default is true. 178 * @param size The size of the event queue. The default is 128. 179 * @param name The name of the Appender. 180 * @param includeLocation whether to include location information. The default is false. 181 * @param filter The Filter or null. 182 * @param config The Configuration. 183 * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged; 184 * otherwise they are propagated to the caller. 185 * @return The AsyncAppender. 186 */ 187 @PluginFactory 188 public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs, 189 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef, 190 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking, 191 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size, 192 @PluginAttribute("name") final String name, 193 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation, 194 @PluginElement("Filter") final Filter filter, 195 @PluginConfiguration final Configuration config, 196 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) { 197 if (name == null) { 198 LOGGER.error("No name provided for AsyncAppender"); 199 return null; 200 } 201 if (appenderRefs == null) { 202 LOGGER.error("No appender references provided to AsyncAppender {}", name); 203 } 204 205 return new AsyncAppender(name, filter, appenderRefs, errorRef, 206 size, blocking, ignoreExceptions, config, includeLocation); 207 } 208 209 /** 210 * Thread that calls the Appenders. 211 */ 212 private class AsyncThread extends Thread { 213 214 private volatile boolean shutdown = false; 215 private final List<AppenderControl> appenders; 216 private final BlockingQueue<Serializable> queue; 217 218 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) { 219 this.appenders = appenders; 220 this.queue = queue; 221 setDaemon(true); 222 setName("AsyncAppenderThread" + threadSequence.getAndIncrement()); 223 } 224 225 @Override 226 public void run() { 227 isAppenderThread.set(Boolean.TRUE); // LOG4J2-485 228 while (!shutdown) { 229 Serializable s; 230 try { 231 s = queue.take(); 232 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) { 233 shutdown = true; 234 continue; 235 } 236 } catch (final InterruptedException ex) { 237 break; // LOG4J2-830 238 } 239 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 240 event.setEndOfBatch(queue.isEmpty()); 241 final boolean success = callAppenders(event); 242 if (!success && errorAppender != null) { 243 try { 244 errorAppender.callAppender(event); 245 } catch (final Exception ex) { 246 // Silently accept the error. 247 } 248 } 249 } 250 // Process any remaining items in the queue. 251 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", 252 queue.size()); 253 int count= 0; 254 int ignored = 0; 255 while (!queue.isEmpty()) { 256 try { 257 final Serializable s = queue.take(); 258 if (Log4jLogEvent.canDeserialize(s)) { 259 final Log4jLogEvent event = Log4jLogEvent.deserialize(s); 260 event.setEndOfBatch(queue.isEmpty()); 261 callAppenders(event); 262 count++; 263 } else { 264 ignored++; 265 LOGGER.trace("Ignoring event of class {}", s.getClass().getName()); 266 } 267 } catch (final InterruptedException ex) { 268 // May have been interrupted to shut down. 269 // Here we ignore interrupts and try to process all remaining events. 270 } 271 } 272 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " + 273 "Processed {} and ignored {} events since shutdown started.", 274 queue.size(), count, ignored); 275 } 276 277 /** 278 * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on 279 * all registered {@code AppenderControl} objects, and returns {@code true} 280 * if at least one appender call was successful, {@code false} otherwise. 281 * Any exceptions are silently ignored. 282 * 283 * @param event the event to forward to the registered appenders 284 * @return {@code true} if at least one appender call succeeded, {@code false} otherwise 285 */ 286 boolean callAppenders(final Log4jLogEvent event) { 287 boolean success = false; 288 for (final AppenderControl control : appenders) { 289 try { 290 control.callAppender(event); 291 success = true; 292 } catch (final Exception ex) { 293 // If no appender is successful the error appender will get it. 294 } 295 } 296 return success; 297 } 298 299 public void shutdown() { 300 shutdown = true; 301 if (queue.isEmpty()) { 302 queue.offer(SHUTDOWN); 303 } 304 } 305 } 306 307 /** 308 * Returns the names of the appenders that this asyncAppender delegates to 309 * as an array of Strings. 310 * @return the names of the sink appenders 311 */ 312 public String[] getAppenderRefStrings() { 313 final String[] result = new String[appenderRefs.length]; 314 for (int i = 0; i < result.length; i++) { 315 result[i] = appenderRefs[i].getRef(); 316 } 317 return result; 318 } 319 320 /** 321 * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with 322 * every log event to determine the class and method where the logging call 323 * was made. 324 * @return {@code true} if location is included with every event, {@code false} otherwise 325 */ 326 public boolean isIncludeLocation() { 327 return includeLocation; 328 } 329 330 /** 331 * Returns {@code true} if this AsyncAppender will block when the queue is full, 332 * or {@code false} if events are dropped when the queue is full. 333 * @return whether this AsyncAppender will block or drop events when the queue is full. 334 */ 335 public boolean isBlocking() { 336 return blocking; 337 } 338 339 /** 340 * Returns the name of the appender that any errors are logged to or {@code null}. 341 * @return the name of the appender that any errors are logged to or {@code null} 342 */ 343 public String getErrorRef() { 344 return errorRef; 345 } 346 347 public int getQueueCapacity() { 348 return queueSize; 349 } 350 351 public int getQueueRemainingCapacity() { 352 return queue.remainingCapacity(); 353 } 354}