001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.appender; 018 019import org.apache.logging.log4j.core.Appender; 020import org.apache.logging.log4j.core.Core; 021import org.apache.logging.log4j.core.Filter; 022import org.apache.logging.log4j.core.LogEvent; 023import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory; 024import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil; 025import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy; 026import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory; 027import org.apache.logging.log4j.core.async.BlockingQueueFactory; 028import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy; 029import org.apache.logging.log4j.core.async.EventRoute; 030import org.apache.logging.log4j.core.async.InternalAsyncUtil; 031import org.apache.logging.log4j.core.config.AppenderControl; 032import org.apache.logging.log4j.core.config.AppenderRef; 033import org.apache.logging.log4j.core.config.Configuration; 034import org.apache.logging.log4j.core.config.ConfigurationException; 035import org.apache.logging.log4j.core.config.Property; 036import org.apache.logging.log4j.core.config.plugins.Plugin; 037import org.apache.logging.log4j.core.config.plugins.PluginAliases; 038import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; 039import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; 040import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; 041import org.apache.logging.log4j.core.config.plugins.PluginElement; 042import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; 043import org.apache.logging.log4j.core.filter.AbstractFilterable; 044import org.apache.logging.log4j.core.impl.Log4jLogEvent; 045import org.apache.logging.log4j.spi.AbstractLogger; 046 047import java.util.ArrayList; 048import java.util.List; 049import java.util.Map; 050import java.util.concurrent.BlockingQueue; 051import java.util.concurrent.TimeUnit; 052import java.util.concurrent.TransferQueue; 053import java.util.stream.Collectors; 054 055/** 056 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an 057 * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender 058 * references. 059 */ 060@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true) 061public final class AsyncAppender extends AbstractAppender { 062 063 private static final int DEFAULT_QUEUE_SIZE = 1024; 064 065 private final BlockingQueue<LogEvent> queue; 066 private final int queueSize; 067 private final boolean blocking; 068 private final long shutdownTimeout; 069 private final Configuration config; 070 private final AppenderRef[] appenderRefs; 071 private final String errorRef; 072 private final boolean includeLocation; 073 private AppenderControl errorAppender; 074 private AsyncAppenderEventDispatcher dispatcher; 075 private AsyncQueueFullPolicy asyncQueueFullPolicy; 076 077 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, 078 final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions, 079 final long shutdownTimeout, final Configuration config, final boolean includeLocation, 080 final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) { 081 super(name, filter, null, ignoreExceptions, properties); 082 this.queue = blockingQueueFactory.create(queueSize); 083 this.queueSize = queueSize; 084 this.blocking = blocking; 085 this.shutdownTimeout = shutdownTimeout; 086 this.config = config; 087 this.appenderRefs = appenderRefs; 088 this.errorRef = errorRef; 089 this.includeLocation = includeLocation; 090 } 091 092 @Override 093 public void start() { 094 final Map<String, Appender> map = config.getAppenders(); 095 final List<AppenderControl> appenders = new ArrayList<>(); 096 for (final AppenderRef appenderRef : appenderRefs) { 097 final Appender appender = map.get(appenderRef.getRef()); 098 if (appender != null) { 099 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter())); 100 } else { 101 LOGGER.error("No appender named {} was configured", appenderRef); 102 } 103 } 104 if (errorRef != null) { 105 final Appender appender = map.get(errorRef); 106 if (appender != null) { 107 errorAppender = new AppenderControl(appender, null, null); 108 } else { 109 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); 110 } 111 } 112 if (appenders.size() > 0) { 113 dispatcher = new AsyncAppenderEventDispatcher( 114 getName(), errorAppender, appenders, queue); 115 } else if (errorRef == null) { 116 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); 117 } 118 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); 119 120 dispatcher.start(); 121 super.start(); 122 } 123 124 @Override 125 public boolean stop(final long timeout, final TimeUnit timeUnit) { 126 setStopping(); 127 super.stop(timeout, timeUnit, false); 128 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size()); 129 try { 130 dispatcher.stop(shutdownTimeout); 131 } catch (final InterruptedException ignored) { 132 // Restore the interrupted flag cleared when the exception is caught. 133 Thread.currentThread().interrupt(); 134 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); 135 } 136 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size()); 137 138 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) { 139 LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy, 140 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy)); 141 } 142 setStopped(); 143 return true; 144 } 145 146 /** 147 * Actual writing occurs here. 148 * 149 * @param logEvent The LogEvent. 150 */ 151 @Override 152 public void append(final LogEvent logEvent) { 153 if (!isStarted()) { 154 throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); 155 } 156 final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); 157 InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); 158 if (!transfer(memento)) { 159 if (blocking) { 160 if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 161 // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock 162 AsyncQueueFullMessageUtil.logWarningToStatusLogger(); 163 logMessageInCurrentThread(logEvent); 164 } else { 165 // delegate to the event router (which may discard, enqueue and block, or log in current thread) 166 final EventRoute route = asyncQueueFullPolicy.getRoute(dispatcher.getId(), memento.getLevel()); 167 route.logMessage(this, memento); 168 } 169 } else { 170 error("Appender " + getName() + " is unable to write primary appenders. queue is full"); 171 logToErrorAppenderIfNecessary(false, memento); 172 } 173 } 174 } 175 176 private boolean transfer(final LogEvent memento) { 177 return queue instanceof TransferQueue 178 ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento) 179 : queue.offer(memento); 180 } 181 182 /** 183 * FOR INTERNAL USE ONLY. 184 * 185 * @param logEvent the event to log 186 */ 187 public void logMessageInCurrentThread(final LogEvent logEvent) { 188 logEvent.setEndOfBatch(queue.isEmpty()); 189 dispatcher.dispatch(logEvent); 190 } 191 192 /** 193 * FOR INTERNAL USE ONLY. 194 * 195 * @param logEvent the event to log 196 */ 197 public void logMessageInBackgroundThread(final LogEvent logEvent) { 198 try { 199 // wait for free slots in the queue 200 queue.put(logEvent); 201 } catch (final InterruptedException ignored) { 202 final boolean appendSuccessful = handleInterruptedException(logEvent); 203 logToErrorAppenderIfNecessary(appendSuccessful, logEvent); 204 } 205 } 206 207 // LOG4J2-1049: Some applications use Thread.interrupt() to send 208 // messages between application threads. This does not necessarily 209 // mean that the queue is full. To prevent dropping a log message, 210 // quickly try to offer the event to the queue again. 211 // (Yes, this means there is a possibility the same event is logged twice.) 212 // 213 // Finally, catching the InterruptedException means the 214 // interrupted flag has been cleared on the current thread. 215 // This may interfere with the application's expectation of 216 // being interrupted, so when we are done, we set the interrupted 217 // flag again. 218 private boolean handleInterruptedException(final LogEvent memento) { 219 final boolean appendSuccessful = queue.offer(memento); 220 if (!appendSuccessful) { 221 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}", 222 getName()); 223 } 224 // set the interrupted flag again. 225 Thread.currentThread().interrupt(); 226 return appendSuccessful; 227 } 228 229 private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) { 230 if (!appendSuccessful && errorAppender != null) { 231 errorAppender.callAppender(logEvent); 232 } 233 } 234 235 /** 236 * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the 237 * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior 238 * pre-2.7. 239 * 240 * @param appenderRefs The Appenders to reference. 241 * @param errorRef An optional Appender to write to if the queue is full or other errors occur. 242 * @param blocking True if the Appender should wait when the queue is full. The default is true. 243 * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events 244 * in the queue on shutdown. The default is zero which means to wait forever. 245 * @param size The size of the event queue. The default is 128. 246 * @param name The name of the Appender. 247 * @param includeLocation whether to include location information. The default is false. 248 * @param filter The Filter or null. 249 * @param config The Configuration. 250 * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged; 251 * otherwise they are propagated to the caller. 252 * @return The AsyncAppender. 253 * @deprecated use {@link Builder} instead 254 */ 255 @Deprecated 256 public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef, 257 final boolean blocking, final long shutdownTimeout, final int size, 258 final String name, final boolean includeLocation, final Filter filter, 259 final Configuration config, final boolean ignoreExceptions) { 260 if (name == null) { 261 LOGGER.error("No name provided for AsyncAppender"); 262 return null; 263 } 264 if (appenderRefs == null) { 265 LOGGER.error("No appender references provided to AsyncAppender {}", name); 266 } 267 268 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions, 269 shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null); 270 } 271 272 @PluginBuilderFactory 273 public static Builder newBuilder() { 274 return new Builder(); 275 } 276 277 public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B> 278 implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> { 279 280 @PluginElement("AppenderRef") 281 @Required(message = "No appender references provided to AsyncAppender") 282 private AppenderRef[] appenderRefs; 283 284 @PluginBuilderAttribute 285 @PluginAliases("error-ref") 286 private String errorRef; 287 288 @PluginBuilderAttribute 289 private boolean blocking = true; 290 291 @PluginBuilderAttribute 292 private long shutdownTimeout = 0L; 293 294 @PluginBuilderAttribute 295 private int bufferSize = DEFAULT_QUEUE_SIZE; 296 297 @PluginBuilderAttribute 298 @Required(message = "No name provided for AsyncAppender") 299 private String name; 300 301 @PluginBuilderAttribute 302 private boolean includeLocation = false; 303 304 @PluginConfiguration 305 private Configuration configuration; 306 307 @PluginBuilderAttribute 308 private boolean ignoreExceptions = true; 309 310 @PluginElement(BlockingQueueFactory.ELEMENT_TYPE) 311 private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>(); 312 313 public Builder setAppenderRefs(final AppenderRef[] appenderRefs) { 314 this.appenderRefs = appenderRefs; 315 return this; 316 } 317 318 public Builder setErrorRef(final String errorRef) { 319 this.errorRef = errorRef; 320 return this; 321 } 322 323 public Builder setBlocking(final boolean blocking) { 324 this.blocking = blocking; 325 return this; 326 } 327 328 public Builder setShutdownTimeout(final long shutdownTimeout) { 329 this.shutdownTimeout = shutdownTimeout; 330 return this; 331 } 332 333 public Builder setBufferSize(final int bufferSize) { 334 this.bufferSize = bufferSize; 335 return this; 336 } 337 338 public Builder setName(final String name) { 339 this.name = name; 340 return this; 341 } 342 343 public Builder setIncludeLocation(final boolean includeLocation) { 344 this.includeLocation = includeLocation; 345 return this; 346 } 347 348 public Builder setConfiguration(final Configuration configuration) { 349 this.configuration = configuration; 350 return this; 351 } 352 353 public Builder setIgnoreExceptions(final boolean ignoreExceptions) { 354 this.ignoreExceptions = ignoreExceptions; 355 return this; 356 } 357 358 public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) { 359 this.blockingQueueFactory = blockingQueueFactory; 360 return this; 361 } 362 363 @Override 364 public AsyncAppender build() { 365 return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions, 366 shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray()); 367 } 368 } 369 370 /** 371 * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings. 372 * 373 * @return the names of the sink appenders 374 */ 375 public String[] getAppenderRefStrings() { 376 final String[] result = new String[appenderRefs.length]; 377 for (int i = 0; i < result.length; i++) { 378 result[i] = appenderRefs[i].getRef(); 379 } 380 return result; 381 } 382 383 /** 384 * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine 385 * the class and method where the logging call was made. 386 * 387 * @return {@code true} if location is included with every event, {@code false} otherwise 388 */ 389 public boolean isIncludeLocation() { 390 return includeLocation; 391 } 392 393 /** 394 * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are 395 * dropped when the queue is full. 396 * 397 * @return whether this AsyncAppender will block or drop events when the queue is full. 398 */ 399 public boolean isBlocking() { 400 return blocking; 401 } 402 403 /** 404 * Gets all Appenders. 405 * 406 * @return a list of Appenders. 407 */ 408 public List<Appender> getAppenders() { 409 return dispatcher.getAppenders(); 410 } 411 412 /** 413 * Returns the name of the appender that any errors are logged to or {@code null}. 414 * 415 * @return the name of the appender that any errors are logged to or {@code null} 416 */ 417 public String getErrorRef() { 418 return errorRef; 419 } 420 421 public int getQueueCapacity() { 422 return queueSize; 423 } 424 425 public int getQueueRemainingCapacity() { 426 return queue.remainingCapacity(); 427 } 428 429 /** 430 * Returns the number of elements in the queue. 431 * 432 * @return the number of elements in the queue. 433 * @since 2.11.1 434 */ 435 public int getQueueSize() { 436 return queue.size(); 437 } 438 439}