001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.async; 018 019import java.util.Map; 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.Executors; 022 023import org.apache.logging.log4j.Level; 024import org.apache.logging.log4j.Marker; 025import org.apache.logging.log4j.ThreadContext; 026import org.apache.logging.log4j.core.Logger; 027import org.apache.logging.log4j.core.LoggerContext; 028import org.apache.logging.log4j.core.config.Property; 029import org.apache.logging.log4j.core.impl.Log4jLogEvent; 030import org.apache.logging.log4j.core.jmx.RingBufferAdmin; 031import org.apache.logging.log4j.core.util.Clock; 032import org.apache.logging.log4j.core.util.ClockFactory; 033import org.apache.logging.log4j.core.util.Integers; 034import org.apache.logging.log4j.core.util.Loader; 035import org.apache.logging.log4j.message.Message; 036import org.apache.logging.log4j.message.MessageFactory; 037import org.apache.logging.log4j.message.TimestampMessage; 038import org.apache.logging.log4j.status.StatusLogger; 039import org.apache.logging.log4j.util.PropertiesUtil; 040 041import com.lmax.disruptor.BlockingWaitStrategy; 042import com.lmax.disruptor.ExceptionHandler; 043import com.lmax.disruptor.RingBuffer; 044import com.lmax.disruptor.SleepingWaitStrategy; 045import com.lmax.disruptor.WaitStrategy; 046import com.lmax.disruptor.YieldingWaitStrategy; 047import com.lmax.disruptor.dsl.Disruptor; 048import com.lmax.disruptor.dsl.ProducerType; 049 050/** 051 * AsyncLogger is a logger designed for high throughput and low latency logging. 052 * It does not perform any I/O in the calling (application) thread, but instead 053 * hands off the work to another thread as soon as possible. The actual logging 054 * is performed in the background thread. It uses the LMAX Disruptor library for 055 * inter-thread communication. (<a 056 * href="http://lmax-exchange.github.com/disruptor/" 057 * >http://lmax-exchange.github.com/disruptor/</a>) 058 * <p> 059 * To use AsyncLogger, specify the System property 060 * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector} 061 * before you obtain a Logger, and all Loggers returned by LogManager.getLogger 062 * will be AsyncLoggers. 063 * <p> 064 * Note that for performance reasons, this logger does not include source 065 * location by default. You need to specify {@code includeLocation="true"} in 066 * the configuration or any %class, %location or %line conversion patterns in 067 * your log4j.xml configuration will produce either a "?" character or no output 068 * at all. 069 * <p> 070 * For best performance, use AsyncLogger with the RandomAccessFileAppender or 071 * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders 072 * have built-in support for the batching mechanism used by the Disruptor 073 * library, and they will flush to disk at the end of each batch. This means 074 * that even with immediateFlush=false, there will never be any items left in 075 * the buffer; all log events will all be written to disk in a very efficient 076 * manner. 077 */ 078public class AsyncLogger extends Logger { 079 private static final long serialVersionUID = 1L; 080 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50; 081 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200; 082 private static final int RINGBUFFER_MIN_SIZE = 128; 083 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024; 084 private static final StatusLogger LOGGER = StatusLogger.getLogger(); 085 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create(); 086 private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>(); 087 088 static enum ThreadNameStrategy { // LOG4J2-467 089 CACHED { 090 @Override 091 public String getThreadName(final Info info) { 092 return info.cachedThreadName; 093 } 094 }, 095 UNCACHED { 096 @Override 097 public String getThreadName(final Info info) { 098 return Thread.currentThread().getName(); 099 } 100 }; 101 abstract String getThreadName(Info info); 102 103 static ThreadNameStrategy create() { 104 final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name()); 105 try { 106 return ThreadNameStrategy.valueOf(name); 107 } catch (final Exception ex) { 108 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString()); 109 return CACHED; 110 } 111 } 112 } 113 private static volatile Disruptor<RingBufferLogEvent> disruptor; 114 private static final Clock clock = ClockFactory.getClock(); 115 116 private static final ExecutorService executor = Executors 117 .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-")); 118 119 static { 120 initInfoForExecutorThread(); 121 LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY); 122 final int ringBufferSize = calculateRingBufferSize(); 123 124 final WaitStrategy waitStrategy = createWaitStrategy(); 125 disruptor = new Disruptor<RingBufferLogEvent>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, 126 ProducerType.MULTI, waitStrategy); 127 disruptor.handleExceptionsWith(getExceptionHandler()); 128 disruptor.handleEventsWith(new RingBufferLogEventHandler()); 129 130 LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer() 131 .getBufferSize()); 132 disruptor.start(); 133 } 134 135 private static int calculateRingBufferSize() { 136 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE; 137 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize", 138 String.valueOf(ringBufferSize)); 139 try { 140 int size = Integer.parseInt(userPreferredRBSize); 141 if (size < RINGBUFFER_MIN_SIZE) { 142 size = RINGBUFFER_MIN_SIZE; 143 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize, 144 RINGBUFFER_MIN_SIZE); 145 } 146 ringBufferSize = size; 147 } catch (final Exception ex) { 148 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize); 149 } 150 return Integers.ceilingNextPowerOfTwo(ringBufferSize); 151 } 152 153 /** 154 * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread. 155 * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}. 156 * All other Info objects will have this attribute set to {@code false}. 157 * This allows us to detect Logger.log() calls initiated from the appender thread, 158 * which may cause deadlock when the RingBuffer is full. (LOG4J2-471) 159 */ 160 private static void initInfoForExecutorThread() { 161 executor.submit(new Runnable(){ 162 @Override 163 public void run() { 164 final boolean isAppenderThread = true; 165 final Info info = new Info(new RingBufferLogEventTranslator(), // 166 Thread.currentThread().getName(), isAppenderThread); 167 threadlocalInfo.set(info); 168 } 169 }); 170 } 171 172 private static WaitStrategy createWaitStrategy() { 173 final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy"); 174 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy); 175 if ("Sleep".equals(strategy)) { 176 return new SleepingWaitStrategy(); 177 } else if ("Yield".equals(strategy)) { 178 return new YieldingWaitStrategy(); 179 } else if ("Block".equals(strategy)) { 180 return new BlockingWaitStrategy(); 181 } 182 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy"); 183 return new BlockingWaitStrategy(); 184 } 185 186 private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() { 187 final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler"); 188 if (cls == null) { 189 LOGGER.debug("No AsyncLogger.ExceptionHandler specified"); 190 return null; 191 } 192 try { 193 final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class); 194 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result); 195 return result; 196 } catch (final Exception ignored) { 197 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored); 198 return null; 199 } 200 } 201 202 /** 203 * Constructs an {@code AsyncLogger} with the specified context, name and 204 * message factory. 205 * 206 * @param context context of this logger 207 * @param name name of this logger 208 * @param messageFactory message factory of this logger 209 */ 210 public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) { 211 super(context, name, messageFactory); 212 } 213 214 /** 215 * Tuple with the event translator and thread name for a thread. 216 */ 217 static class Info { 218 private final RingBufferLogEventTranslator translator; 219 private final String cachedThreadName; 220 private final boolean isAppenderThread; 221 public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) { 222 this.translator = translator; 223 this.cachedThreadName = threadName; 224 this.isAppenderThread = appenderThread; 225 } 226 } 227 228 @Override 229 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) { 230 // TODO refactor to reduce size to <= 35 bytecodes to allow JVM to inline it 231 Info info = threadlocalInfo.get(); 232 if (info == null) { 233 info = new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false); 234 threadlocalInfo.set(info); 235 } 236 237 final Disruptor<RingBufferLogEvent> temp = disruptor; 238 if (temp == null) { // LOG4J2-639 239 LOGGER.fatal("Ignoring log event after log4j was shut down"); 240 return; 241 } 242 243 // LOG4J2-471: prevent deadlock when RingBuffer is full and object 244 // being logged calls Logger.log() from its toString() method 245 if (info.isAppenderThread && temp.getRingBuffer().remainingCapacity() == 0) { 246 // bypass RingBuffer and invoke Appender directly 247 config.loggerConfig.log(getName(), fqcn, marker, level, message, thrown); 248 return; 249 } 250 message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters 251 final boolean includeLocation = config.loggerConfig.isIncludeLocation(); 252 info.translator.setValues(this, getName(), marker, fqcn, level, message, // 253 // don't construct ThrowableProxy until required 254 thrown, // 255 256 // config properties are taken care of in the EventHandler 257 // thread in the #actualAsyncLog method 258 259 // needs shallow copy to be fast (LOG4J2-154) 260 ThreadContext.getImmutableContext(), // 261 262 // needs shallow copy to be fast (LOG4J2-154) 263 ThreadContext.getImmutableStack(), // 264 265 // Thread.currentThread().getName(), // 266 // info.cachedThreadName, // 267 THREAD_NAME_STRATEGY.getThreadName(info), // LOG4J2-467 268 269 // location: very expensive operation. LOG4J2-153: 270 // Only include if "includeLocation=true" is specified, 271 // exclude if not specified or if "false" was specified. 272 includeLocation ? location(fqcn) : null, 273 274 // System.currentTimeMillis()); 275 // CoarseCachedClock: 20% faster than system clock, 16ms gaps 276 // CachedClock: 10% faster than system clock, smaller gaps 277 // LOG4J2-744 avoid calling clock altogether if message has the timestamp 278 message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() : 279 clock.currentTimeMillis()); 280 281 // LOG4J2-639: catch NPE if disruptor field was set to null after our check above 282 try { 283 // Note: do NOT use the temp variable above! 284 // That could result in adding a log event to the disruptor after it was shut down, 285 // which could cause the publishEvent method to hang and never return. 286 disruptor.publishEvent(info.translator); 287 } catch (final NullPointerException npe) { 288 LOGGER.fatal("Ignoring log event after log4j was shut down."); 289 } 290 } 291 292 private static StackTraceElement location(final String fqcnOfLogger) { 293 return Log4jLogEvent.calcLocation(fqcnOfLogger); 294 } 295 296 /** 297 * This method is called by the EventHandler that processes the 298 * RingBufferLogEvent in a separate thread. 299 * 300 * @param event the event to log 301 */ 302 public void actualAsyncLog(final RingBufferLogEvent event) { 303 final Map<Property, Boolean> properties = config.loggerConfig.getProperties(); 304 event.mergePropertiesIntoContextMap(properties, config.config.getStrSubstitutor()); 305 config.logEvent(event); 306 } 307 308 public static void stop() { 309 final Disruptor<RingBufferLogEvent> temp = disruptor; 310 311 // Must guarantee that publishing to the RingBuffer has stopped 312 // before we call disruptor.shutdown() 313 disruptor = null; // client code fails with NPE if log after stop = OK 314 if (temp == null) { 315 return; // stop() has already been called 316 } 317 318 // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, 319 // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, 320 // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. 321 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { 322 try { 323 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while 324 } catch (final InterruptedException e) { // ignored 325 } 326 } 327 temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed 328 executor.shutdown(); // finally, kill the processor thread 329 threadlocalInfo.remove(); // LOG4J2-323 330 } 331 332 /** 333 * Returns {@code true} if the specified disruptor still has unprocessed events. 334 */ 335 private static boolean hasBacklog(final Disruptor<?> disruptor) { 336 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer(); 337 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize()); 338 } 339 340 /** 341 * Creates and returns a new {@code RingBufferAdmin} that instruments the 342 * ringbuffer of the {@code AsyncLogger}. 343 * 344 * @param contextName name of the global {@code AsyncLoggerContext} 345 */ 346 public static RingBufferAdmin createRingBufferAdmin(final String contextName) { 347 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName); 348 } 349}