001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.concurrent.ThreadFactory; 020import java.util.concurrent.TimeUnit; 021 022import org.apache.logging.log4j.Level; 023import org.apache.logging.log4j.core.AbstractLifeCycle; 024import org.apache.logging.log4j.core.LogEvent; 025import org.apache.logging.log4j.core.impl.Log4jLogEvent; 026import org.apache.logging.log4j.core.impl.LogEventFactory; 027import org.apache.logging.log4j.core.impl.MutableLogEvent; 028import org.apache.logging.log4j.core.impl.ReusableLogEventFactory; 029import org.apache.logging.log4j.core.jmx.RingBufferAdmin; 030import org.apache.logging.log4j.core.util.Log4jThreadFactory; 031import org.apache.logging.log4j.core.util.Throwables; 032import org.apache.logging.log4j.message.ReusableMessage; 033 034import com.lmax.disruptor.EventFactory; 035import com.lmax.disruptor.EventTranslatorTwoArg; 036import com.lmax.disruptor.ExceptionHandler; 037import com.lmax.disruptor.RingBuffer; 038import com.lmax.disruptor.Sequence; 039import com.lmax.disruptor.SequenceReportingEventHandler; 040import com.lmax.disruptor.TimeoutException; 041import com.lmax.disruptor.WaitStrategy; 042import com.lmax.disruptor.dsl.Disruptor; 043import com.lmax.disruptor.dsl.ProducerType; 044 045/** 046 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library. 047 * <p> 048 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or 049 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or 050 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in 051 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins 052 * definition file. 053 * <p> 054 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the 055 * {@code AsyncLoggerConfig} is actually used. 056 */ 057public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate { 058 059 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200; 060 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50; 061 062 /** 063 * RingBuffer events contain all information necessary to perform the work in a separate thread. 064 */ 065 public static class Log4jEventWrapper { 066 public Log4jEventWrapper() { 067 } 068 069 public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) { 070 event = mutableLogEvent; 071 } 072 073 private AsyncLoggerConfig loggerConfig; 074 private LogEvent event; 075 076 /** 077 * Release references held by ring buffer to allow objects to be garbage-collected. 078 */ 079 public void clear() { 080 loggerConfig = null; 081 if (event instanceof MutableLogEvent) { 082 ((MutableLogEvent) event).clear(); 083 } else { 084 event = null; 085 } 086 } 087 088 @Override 089 public String toString() { 090 return String.valueOf(event); 091 } 092 } 093 094 /** 095 * EventHandler performs the work in a separate thread. 096 */ 097 private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> { 098 private static final int NOTIFY_PROGRESS_THRESHOLD = 50; 099 private Sequence sequenceCallback; 100 private int counter; 101 102 @Override 103 public void setSequenceCallback(final Sequence sequenceCallback) { 104 this.sequenceCallback = sequenceCallback; 105 } 106 107 @Override 108 public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch) 109 throws Exception { 110 event.event.setEndOfBatch(endOfBatch); 111 event.loggerConfig.logToAsyncLoggerConfigsOnCurrentThread(event.event); 112 event.clear(); 113 114 notifyIntermediateProgress(sequence); 115 } 116 117 /** 118 * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not 119 * be progressed until the batch has completely finished. 120 */ 121 private void notifyIntermediateProgress(final long sequence) { 122 if (++counter > NOTIFY_PROGRESS_THRESHOLD) { 123 sequenceCallback.set(sequence); 124 counter = 0; 125 } 126 } 127 } 128 129 /** 130 * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the 131 * RingBuffer. 132 */ 133 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() { 134 @Override 135 public Log4jEventWrapper newInstance() { 136 return new Log4jEventWrapper(); 137 } 138 }; 139 140 /** 141 * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the 142 * RingBuffer. 143 */ 144 private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() { 145 @Override 146 public Log4jEventWrapper newInstance() { 147 return new Log4jEventWrapper(new MutableLogEvent()); 148 } 149 }; 150 151 /** 152 * Object responsible for passing on data to a specific RingBuffer event. 153 */ 154 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR = 155 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() { 156 157 @Override 158 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence, 159 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) { 160 ringBufferElement.event = logEvent; 161 ringBufferElement.loggerConfig = loggerConfig; 162 } 163 }; 164 165 /** 166 * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent. 167 */ 168 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR = 169 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() { 170 171 @Override 172 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence, 173 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) { 174 ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent); 175 ringBufferElement.loggerConfig = loggerConfig; 176 } 177 }; 178 179 private int ringBufferSize; 180 private AsyncQueueFullPolicy asyncQueueFullPolicy; 181 private Boolean mutable = Boolean.FALSE; 182 183 private volatile Disruptor<Log4jEventWrapper> disruptor; 184 private long backgroundThreadId; // LOG4J2-471 185 private EventFactory<Log4jEventWrapper> factory; 186 private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator; 187 private volatile boolean alreadyLoggedWarning = false; 188 189 private final Object queueFullEnqueueLock = new Object(); 190 191 public AsyncLoggerConfigDisruptor() { 192 } 193 194 // called from AsyncLoggerConfig constructor 195 @Override 196 public void setLogEventFactory(final LogEventFactory logEventFactory) { 197 // if any AsyncLoggerConfig uses a ReusableLogEventFactory 198 // then we need to populate our ringbuffer with MutableLogEvents 199 this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory); 200 } 201 202 /** 203 * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently 204 * exists. 205 * 206 * @see #stop() 207 */ 208 @Override 209 public synchronized void start() { 210 if (disruptor != null) { 211 LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, " 212 + "using existing object."); 213 return; 214 } 215 LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration."); 216 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize"); 217 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy"); 218 219 final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLoggerConfig", true, Thread.NORM_PRIORITY) { 220 @Override 221 public Thread newThread(final Runnable r) { 222 final Thread result = super.newThread(r); 223 backgroundThreadId = result.getId(); 224 return result; 225 } 226 }; 227 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); 228 229 translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR; 230 factory = mutable ? MUTABLE_FACTORY : FACTORY; 231 disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy); 232 233 final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler(); 234 disruptor.setDefaultExceptionHandler(errorHandler); 235 236 final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()}; 237 disruptor.handleEventsWith(handlers); 238 239 LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, " 240 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy 241 .getClass().getSimpleName(), errorHandler); 242 disruptor.start(); 243 super.start(); 244 } 245 246 /** 247 * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are 248 * shut down and their references set to {@code null}. 249 */ 250 @Override 251 public boolean stop(final long timeout, final TimeUnit timeUnit) { 252 final Disruptor<Log4jEventWrapper> temp = disruptor; 253 if (temp == null) { 254 LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down."); 255 return true; // disruptor was already shut down by another thread 256 } 257 setStopping(); 258 LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration."); 259 260 // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown(). 261 disruptor = null; // client code fails with NPE if log after stop = OK 262 263 // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, 264 // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, 265 // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. 266 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 267 try { 268 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while 269 } catch (final InterruptedException e) { // ignored 270 } 271 } 272 try { 273 // busy-spins until all events currently in the disruptor have been processed, or timeout 274 temp.shutdown(timeout, timeUnit); 275 } catch (final TimeoutException e) { 276 LOGGER.warn("AsyncLoggerConfigDisruptor: shutdown timed out after {} {}", timeout, timeUnit); 277 temp.halt(); // give up on remaining log events, if any 278 } 279 LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor has been shut down."); 280 281 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) { 282 LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy, 283 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy)); 284 } 285 setStopped(); 286 return true; 287 } 288 289 /** 290 * Returns {@code true} if the specified disruptor still has unprocessed events. 291 */ 292 private static boolean hasBacklog(final Disruptor<?> theDisruptor) { 293 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer(); 294 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize()); 295 } 296 297 @Override 298 public EventRoute getEventRoute(final Level logLevel) { 299 final int remainingCapacity = remainingDisruptorCapacity(); 300 if (remainingCapacity < 0) { 301 return EventRoute.DISCARD; 302 } 303 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel); 304 } 305 306 private int remainingDisruptorCapacity() { 307 final Disruptor<Log4jEventWrapper> temp = disruptor; 308 if (hasLog4jBeenShutDown(temp)) { 309 return -1; 310 } 311 return (int) temp.getRingBuffer().remainingCapacity(); 312 } 313 314 /** 315 * Returns {@code true} if the specified disruptor is null. 316 */ 317 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) { 318 if (aDisruptor == null) { // LOG4J2-639 319 LOGGER.warn("Ignoring log event after log4j was shut down"); 320 return true; 321 } 322 return false; 323 } 324 325 @Override 326 public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) { 327 // LOG4J2-639: catch NPE if disruptor field was set to null after our check above 328 try { 329 final LogEvent logEvent = prepareEvent(event); 330 enqueue(logEvent, asyncLoggerConfig); 331 } catch (final NullPointerException npe) { 332 // Note: NPE prevents us from adding a log event to the disruptor after it was shut down, 333 // which could cause the publishEvent method to hang and never return. 334 LOGGER.warn("Ignoring log event after log4j was shut down: {} [{}] {}", event.getLevel(), 335 event.getLoggerName(), event.getMessage().getFormattedMessage() 336 + (event.getThrown() == null ? "" : Throwables.toStringList(event.getThrown()))); 337 } 338 } 339 340 private LogEvent prepareEvent(final LogEvent event) { 341 LogEvent logEvent = ensureImmutable(event); 342 if (logEvent.getMessage() instanceof ReusableMessage) { 343 if (logEvent instanceof Log4jLogEvent) { 344 ((Log4jLogEvent) logEvent).makeMessageImmutable(); 345 } else if (logEvent instanceof MutableLogEvent) { 346 // MutableLogEvents need to be translated into the RingBuffer by the MUTABLE_TRANSLATOR. 347 // That translator calls MutableLogEvent.initFrom to copy the event, which will makeMessageImmutable the message. 348 if (translator != MUTABLE_TRANSLATOR) { // should not happen... 349 // TRANSLATOR expects an immutable LogEvent 350 logEvent = ((MutableLogEvent) logEvent).createMemento(); 351 } 352 } else { // custom log event, with a ReusableMessage 353 showWarningAboutCustomLogEventWithReusableMessage(logEvent); 354 } 355 } else { // message is not a ReusableMessage; makeMessageImmutable it to prevent ConcurrentModificationExceptions 356 InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); // LOG4J2-1988, LOG4J2-1914 357 } 358 return logEvent; 359 } 360 361 private void showWarningAboutCustomLogEventWithReusableMessage(final LogEvent logEvent) { 362 if (!alreadyLoggedWarning) { 363 LOGGER.warn("Custom log event of type {} contains a mutable message of type {}." + 364 " AsyncLoggerConfig does not know how to make an immutable copy of this message." + 365 " This may result in ConcurrentModificationExceptions or incorrect log messages" + 366 " if the application modifies objects in the message while" + 367 " the background thread is writing it to the appenders.", 368 logEvent.getClass().getName(), logEvent.getMessage().getClass().getName()); 369 alreadyLoggedWarning = true; 370 } 371 } 372 373 private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) { 374 if (synchronizeEnqueueWhenQueueFull()) { 375 synchronized (queueFullEnqueueLock) { 376 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig); 377 } 378 } else { 379 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig); 380 } 381 } 382 383 private boolean synchronizeEnqueueWhenQueueFull() { 384 return DisruptorUtil.ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL 385 // Background thread must never block 386 && backgroundThreadId != Thread.currentThread().getId(); 387 } 388 389 @Override 390 public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) { 391 final LogEvent logEvent = prepareEvent(event); 392 return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig); 393 } 394 395 private LogEvent ensureImmutable(final LogEvent event) { 396 LogEvent result = event; 397 if (event instanceof RingBufferLogEvent) { 398 // Deal with special case where both types of Async Loggers are used together: 399 // RingBufferLogEvents are created by the all-loggers-async type, but 400 // this event is also consumed by the some-loggers-async type (this class). 401 // The original event will be re-used and modified in an application thread later, 402 // so take a snapshot of it, which can be safely processed in the 403 // some-loggers-async background thread. 404 result = ((RingBufferLogEvent) event).createMemento(); 405 } 406 return result; 407 } 408 409 /* 410 * (non-Javadoc) 411 * 412 * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String, 413 * java.lang.String) 414 */ 415 @Override 416 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) { 417 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName); 418 } 419}