/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.core.appender;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.logging.log4j.core.AbstractLogEvent;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
import org.apache.logging.log4j.core.async.BlockingQueueFactory;
import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
import org.apache.logging.log4j.core.async.EventRoute;
import org.apache.logging.log4j.core.async.InternalAsyncUtil;
import org.apache.logging.log4j.core.config.AppenderControl;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAliases;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
import org.apache.logging.log4j.core.filter.AbstractFilterable;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.apache.logging.log4j.spi.AbstractLogger;

/**
 * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
 * Appender to append to if the queue is full. The level and filter configured on each appender reference are handed to
 * the {@link AppenderControl} that wraps the referenced Appender.
 * <p>
 * Events are copied into a queue by the logging thread and delivered to the referenced Appenders by a single daemon
 * thread owned by this appender.
 * </p>
 */
@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
public final class AsyncAppender extends AbstractAppender {

    /** Queue capacity used by the {@link Builder} when no buffer size is configured. */
    private static final int DEFAULT_QUEUE_SIZE = 1024;

    /** Sentinel placed on the queue by {@link AsyncThread#shutdown()} to wake up a thread blocked on an empty queue. */
    private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
        private static final long serialVersionUID = -1761035149477086330L;
    };

    /** Supplies the numeric suffix of the initial thread name; {@link #start()} renames the thread afterwards. */
    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);

    private final BlockingQueue<LogEvent> queue;
    private final int queueSize;
    private final boolean blocking;
    private final long shutdownTimeout;
    private final Configuration config;
    private final AppenderRef[] appenderRefs;
    private final String errorRef;
    private final boolean includeLocation;
    /** Resolved in {@link #start()}; stays {@code null} when no error appender is configured or it cannot be found. */
    private AppenderControl errorAppender;
    /** Created in {@link #start()}; {@code null} until then. */
    private AsyncThread thread;
    private AsyncQueueFullPolicy asyncQueueFullPolicy;

    private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
            final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
            final long shutdownTimeout, final Configuration config, final boolean includeLocation,
            final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) {
        super(name, filter, null, ignoreExceptions, properties);
        this.queue = blockingQueueFactory.create(queueSize);
        this.queueSize = queueSize;
        this.blocking = blocking;
        this.shutdownTimeout = shutdownTimeout;
        this.config = config;
        this.appenderRefs = appenderRefs;
        this.errorRef = errorRef;
        this.includeLocation = includeLocation;
    }

    /**
     * Resolves the referenced appenders and the optional error appender against the configuration, then starts the
     * background thread that drains the queue.
     *
     * @throws ConfigurationException if neither a primary appender nor the error appender could be resolved
     */
    @Override
    public void start() {
        final Map<String, Appender> map = config.getAppenders();
        final List<AppenderControl> appenders = new ArrayList<>();
        // The deprecated factory method lets a null reference array through; treat it as "no references".
        if (appenderRefs != null) {
            for (final AppenderRef appenderRef : appenderRefs) {
                final Appender appender = map.get(appenderRef.getRef());
                if (appender != null) {
                    appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
                } else {
                    LOGGER.error("No appender named {} was configured", appenderRef);
                }
            }
        }
        if (errorRef != null) {
            final Appender appender = map.get(errorRef);
            if (appender != null) {
                errorAppender = new AppenderControl(appender, null, null);
            } else {
                LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
            }
        }
        if (appenders.isEmpty() && errorAppender == null) {
            throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
        }
        // Always create the thread, even when only the error appender is available. With an empty primary list
        // callAppenders() reports failure, so every event is routed to the error appender instead of hitting a
        // NullPointerException on a thread that was never created.
        thread = new AsyncThread(appenders, queue);
        thread.setName("AsyncAppender-" + getName());
        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();

        thread.start();
        super.start();
    }

    /**
     * Stops this appender: signals the background thread to shut down and waits up to the configured shutdown timeout
     * (zero waits indefinitely) for it to finish.
     *
     * @param timeout  passed on to the superclass
     * @param timeUnit passed on to the superclass
     * @return always {@code true}
     */
    @Override
    public boolean stop(final long timeout, final TimeUnit timeUnit) {
        setStopping();
        super.stop(timeout, timeUnit, false);
        LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
        // The thread is null if start() was never called or failed before creating it.
        if (thread != null) {
            thread.shutdown();
            try {
                thread.join(shutdownTimeout);
            } catch (final InterruptedException ex) {
                LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
                // Catching the exception cleared the interrupted flag; restore it for the caller.
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());

        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
            LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
                    DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
        }
        setStopped();
        return true;
    }

    /**
     * Actual writing occurs here. An immutable copy of the event is offered to the queue; if the queue does not accept
     * it, the event is handled according to the {@code blocking} setting.
     *
     * @param logEvent The LogEvent.
     * @throws IllegalStateException if this appender has not been started
     */
    @Override
    public void append(final LogEvent logEvent) {
        if (!isStarted()) {
            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
        }
        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
        if (!transfer(memento)) {
            if (blocking) {
                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
                    // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
                    logMessageInCurrentThread(logEvent);
                } else {
                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
                    final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
                    route.logMessage(this, memento);
                }
            } else {
                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
                logToErrorAppenderIfNecessary(false, memento);
            }
        }
    }

    /**
     * Hands the event to the queue without blocking.
     *
     * @param memento the event to enqueue
     * @return {@code true} if the queue accepted the event, {@code false} otherwise
     */
    private boolean transfer(final LogEvent memento) {
        return queue instanceof TransferQueue
                ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
                : queue.offer(memento);
    }

    /**
     * FOR INTERNAL USE ONLY. Delivers the event to the appenders on the calling thread, bypassing the queue.
     *
     * @param logEvent the event to log
     */
    public void logMessageInCurrentThread(final LogEvent logEvent) {
        logEvent.setEndOfBatch(queue.isEmpty());
        final boolean appendSuccessful = thread.callAppenders(logEvent);
        logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
    }

    /**
     * FOR INTERNAL USE ONLY. Puts the event on the queue, waiting for a free slot if the queue is full.
     *
     * @param logEvent the event to log
     */
    public void logMessageInBackgroundThread(final LogEvent logEvent) {
        try {
            // wait for free slots in the queue
            queue.put(logEvent);
        } catch (final InterruptedException e) {
            final boolean appendSuccessful = handleInterruptedException(logEvent);
            logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
        }
    }

    // LOG4J2-1049: Some applications use Thread.interrupt() to send
    // messages between application threads. This does not necessarily
    // mean that the queue is full. To prevent dropping a log message,
    // quickly try to offer the event to the queue again.
    // (Yes, this means there is a possibility the same event is logged twice.)
    //
    // Finally, catching the InterruptedException means the
    // interrupted flag has been cleared on the current thread.
    // This may interfere with the application's expectation of
    // being interrupted, so when we are done, we set the interrupted
    // flag again.
    private boolean handleInterruptedException(final LogEvent memento) {
        final boolean appendSuccessful = queue.offer(memento);
        if (!appendSuccessful) {
            LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
                    getName());
        }
        // set the interrupted flag again.
        Thread.currentThread().interrupt();
        return appendSuccessful;
    }

    /**
     * Sends the event to the error appender when the primary delivery failed and an error appender is available.
     *
     * @param appendSuccessful whether the primary delivery succeeded
     * @param logEvent         the event to pass to the error appender
     */
    private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
        if (!appendSuccessful && errorAppender != null) {
            errorAppender.callAppender(logEvent);
        }
    }

    /**
     * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
     * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
     * pre-2.7.
     * <p>
     * The defaults mentioned below are those of the {@link Builder}; this method uses the argument values as given.
     * </p>
     *
     * @param appenderRefs The Appenders to reference.
     * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
     * @param blocking True if the Appender should wait when the queue is full. The default is true.
     * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events
     *                        in the queue on shutdown. The default is zero which means to wait forever.
     * @param size The size of the event queue. The default is 1024.
     * @param name The name of the Appender.
     * @param includeLocation whether to include location information. The default is false.
     * @param filter The Filter or null.
     * @param config The Configuration.
     * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
     *                         otherwise they are propagated to the caller.
     * @return The AsyncAppender, or {@code null} if no name was provided.
     * @deprecated use {@link Builder} instead
     */
    @Deprecated
    public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
                                               final boolean blocking, final long shutdownTimeout, final int size,
                                               final String name, final boolean includeLocation, final Filter filter,
                                               final Configuration config, final boolean ignoreExceptions) {
        if (name == null) {
            LOGGER.error("No name provided for AsyncAppender");
            return null;
        }
        if (appenderRefs == null) {
            LOGGER.error("No appender references provided to AsyncAppender {}", name);
        }

        return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
                shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null);
    }

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @return a new builder
     */
    @PluginBuilderFactory
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builds {@link AsyncAppender} instances. Field initializers hold the default values.
     *
     * @param <B> the concrete builder type
     */
    public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B>
            implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {

        @PluginElement("AppenderRef")
        @Required(message = "No appender references provided to AsyncAppender")
        private AppenderRef[] appenderRefs;

        @PluginBuilderAttribute
        @PluginAliases("error-ref")
        private String errorRef;

        @PluginBuilderAttribute
        private boolean blocking = true;

        @PluginBuilderAttribute
        private long shutdownTimeout = 0L;

        @PluginBuilderAttribute
        private int bufferSize = DEFAULT_QUEUE_SIZE;

        @PluginBuilderAttribute
        @Required(message = "No name provided for AsyncAppender")
        private String name;

        @PluginBuilderAttribute
        private boolean includeLocation = false;

        @PluginConfiguration
        private Configuration configuration;

        @PluginBuilderAttribute
        private boolean ignoreExceptions = true;

        @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
        private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();

        /** Sets the references to the appenders that receive the events. */
        public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
            this.appenderRefs = appenderRefs;
            return this;
        }

        /** Sets the name of the appender that receives events the primary appenders could not handle. */
        public Builder setErrorRef(final String errorRef) {
            this.errorRef = errorRef;
            return this;
        }

        /** Sets whether the appender uses the queue-full policy ({@code true}) or drops to the error appender. */
        public Builder setBlocking(final boolean blocking) {
            this.blocking = blocking;
            return this;
        }

        /** Sets how many milliseconds {@code stop} waits for the background thread; zero waits indefinitely. */
        public Builder setShutdownTimeout(final long shutdownTimeout) {
            this.shutdownTimeout = shutdownTimeout;
            return this;
        }

        /** Sets the capacity requested from the queue factory. */
        public Builder setBufferSize(final int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        /** Sets the name of the appender. */
        public Builder setName(final String name) {
            this.name = name;
            return this;
        }

        /** Sets whether location information is captured when an event is copied for the queue. */
        public Builder setIncludeLocation(final boolean includeLocation) {
            this.includeLocation = includeLocation;
            return this;
        }

        /** Sets the configuration used to look up the referenced appenders. */
        public Builder setConfiguration(final Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        /** Sets the {@code ignoreExceptions} flag passed to the appender's superclass. */
        public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
            this.ignoreExceptions = ignoreExceptions;
            return this;
        }

        /** Sets the factory that creates the event queue. */
        public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
            this.blockingQueueFactory = blockingQueueFactory;
            return this;
        }

        @Override
        public AsyncAppender build() {
            return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
                    shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray());
        }
    }

    /**
     * Thread that calls the Appenders.
     */
    private class AsyncThread extends Log4jThread {

        private volatile boolean shutdown = false;
        private final List<AppenderControl> appenders;
        private final BlockingQueue<LogEvent> queue;

        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
            this.appenders = appenders;
            this.queue = queue;
            setDaemon(true);
        }

        /**
         * Takes events from the queue and forwards them to the appenders until shut down or interrupted, then
         * processes whatever is still left in the queue.
         */
        @Override
        public void run() {
            while (!shutdown) {
                LogEvent event;
                try {
                    event = queue.take();
                    if (event == SHUTDOWN_LOG_EVENT) {
                        shutdown = true;
                        continue;
                    }
                } catch (final InterruptedException ex) {
                    break; // LOG4J2-830
                }
                event.setEndOfBatch(queue.isEmpty());
                final boolean success = callAppenders(event);
                if (!success && errorAppender != null) {
                    try {
                        errorAppender.callAppender(event);
                    } catch (final Exception ex) {
                        // Silently accept the error.
                    }
                }
            }
            // Process any remaining items in the queue.
            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
                    queue.size());
            int count = 0;
            int ignored = 0;
            while (!queue.isEmpty()) {
                try {
                    final LogEvent event = queue.take();
                    // Only mementos created by append() are delivered; this skips the shutdown sentinel.
                    if (event instanceof Log4jLogEvent) {
                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
                        logEvent.setEndOfBatch(queue.isEmpty());
                        callAppenders(logEvent);
                        count++;
                    } else {
                        ignored++;
                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
                    }
                } catch (final InterruptedException ex) {
                    // May have been interrupted to shut down.
                    // Here we ignore interrupts and try to process all remaining events.
                }
            }
            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
                    + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
        }

        /**
         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
         * exceptions are silently ignored.
         *
         * @param event the event to forward to the registered appenders
         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
         */
        boolean callAppenders(final LogEvent event) {
            boolean success = false;
            for (final AppenderControl control : appenders) {
                try {
                    control.callAppender(event);
                    success = true;
                } catch (final Exception ex) {
                    // If no appender is successful the error appender will get it.
                }
            }
            return success;
        }

        /**
         * Requests this thread to stop: sets the shutdown flag, enqueues the shutdown sentinel if the queue is empty
         * so a blocked {@code take()} returns, and interrupts the thread if it is currently waiting.
         */
        public void shutdown() {
            shutdown = true;
            if (queue.isEmpty()) {
                queue.offer(SHUTDOWN_LOG_EVENT);
            }
            if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
                this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
            }
        }
    }

    /**
     * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
     *
     * @return the names of the sink appenders; empty if no appender references were provided
     */
    public String[] getAppenderRefStrings() {
        if (appenderRefs == null) {
            return new String[0];
        }
        final String[] result = new String[appenderRefs.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = appenderRefs[i].getRef();
        }
        return result;
    }

    /**
     * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
     * the class and method where the logging call was made.
     *
     * @return {@code true} if location is included with every event, {@code false} otherwise
     */
    public boolean isIncludeLocation() {
        return includeLocation;
    }

    /**
     * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
     * dropped when the queue is full.
     *
     * @return whether this AsyncAppender will block or drop events when the queue is full.
     */
    public boolean isBlocking() {
        return blocking;
    }

    /**
     * Returns the name of the appender that any errors are logged to or {@code null}.
     *
     * @return the name of the appender that any errors are logged to or {@code null}
     */
    public String getErrorRef() {
        return errorRef;
    }

    /**
     * Returns the queue size this appender was configured with.
     *
     * @return the capacity requested from the queue factory
     */
    public int getQueueCapacity() {
        return queueSize;
    }

    /**
     * Returns the remaining capacity reported by the queue.
     *
     * @return the remaining capacity of the queue
     */
    public int getQueueRemainingCapacity() {
        return queue.remainingCapacity();
    }

    /**
     * Returns the number of elements in the queue.
     *
     * @return the number of elements in the queue.
     * @since 2.11.1
     */
    public int getQueueSize() {
        return queue.size();
    }
}