001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.concurrent.ThreadFactory;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.logging.log4j.Level;
023import org.apache.logging.log4j.core.AbstractLifeCycle;
024import org.apache.logging.log4j.core.LogEvent;
025import org.apache.logging.log4j.core.impl.Log4jLogEvent;
026import org.apache.logging.log4j.core.impl.LogEventFactory;
027import org.apache.logging.log4j.core.impl.MutableLogEvent;
028import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
029import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
030import org.apache.logging.log4j.core.util.Log4jThreadFactory;
031import org.apache.logging.log4j.core.util.Throwables;
032import org.apache.logging.log4j.message.ReusableMessage;
033
034import com.lmax.disruptor.EventFactory;
035import com.lmax.disruptor.EventTranslatorTwoArg;
036import com.lmax.disruptor.ExceptionHandler;
037import com.lmax.disruptor.RingBuffer;
038import com.lmax.disruptor.Sequence;
039import com.lmax.disruptor.SequenceReportingEventHandler;
040import com.lmax.disruptor.TimeoutException;
041import com.lmax.disruptor.WaitStrategy;
042import com.lmax.disruptor.dsl.Disruptor;
043import com.lmax.disruptor.dsl.ProducerType;
044
045/**
046 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
047 * <p>
048 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
049 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
050 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
051 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
052 * definition file.
053 * <p>
054 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
055 * {@code AsyncLoggerConfig} is actually used.
056 */
057public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate {
058
059    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
060    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
061
062    /**
063     * RingBuffer events contain all information necessary to perform the work in a separate thread.
064     */
065    public static class Log4jEventWrapper {
066        public Log4jEventWrapper() {
067        }
068
069        public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) {
070            event = mutableLogEvent;
071        }
072
073        private AsyncLoggerConfig loggerConfig;
074        private LogEvent event;
075
076        /**
077         * Release references held by ring buffer to allow objects to be garbage-collected.
078         */
079        public void clear() {
080            loggerConfig = null;
081            if (event instanceof MutableLogEvent) {
082                ((MutableLogEvent) event).clear();
083            } else {
084                event = null;
085            }
086        }
087
088        @Override
089        public String toString() {
090            return String.valueOf(event);
091        }
092    }
093
094    /**
095     * EventHandler performs the work in a separate thread.
096     */
097    private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
098        private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
099        private Sequence sequenceCallback;
100        private int counter;
101
102        @Override
103        public void setSequenceCallback(final Sequence sequenceCallback) {
104            this.sequenceCallback = sequenceCallback;
105        }
106
107        @Override
108        public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
109                throws Exception {
110            event.event.setEndOfBatch(endOfBatch);
111            event.loggerConfig.logToAsyncLoggerConfigsOnCurrentThread(event.event);
112            event.clear();
113
114            notifyIntermediateProgress(sequence);
115        }
116
117        /**
118         * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
119         * be progressed until the batch has completely finished.
120         */
121        private void notifyIntermediateProgress(final long sequence) {
122            if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
123                sequenceCallback.set(sequence);
124                counter = 0;
125            }
126        }
127    }
128
129    /**
130     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
131     * RingBuffer.
132     */
133    private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
134        @Override
135        public Log4jEventWrapper newInstance() {
136            return new Log4jEventWrapper();
137        }
138    };
139
140    /**
141     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
142     * RingBuffer.
143     */
144    private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() {
145        @Override
146        public Log4jEventWrapper newInstance() {
147            return new Log4jEventWrapper(new MutableLogEvent());
148        }
149    };
150
151    /**
152     * Object responsible for passing on data to a specific RingBuffer event.
153     */
154    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
155            new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
156
157        @Override
158        public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
159                final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
160            ringBufferElement.event = logEvent;
161            ringBufferElement.loggerConfig = loggerConfig;
162        }
163    };
164
165    /**
166     * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent.
167     */
168    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
169            new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
170
171        @Override
172        public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
173                final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
174            ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
175            ringBufferElement.loggerConfig = loggerConfig;
176        }
177    };
178
179    private int ringBufferSize;
180    private AsyncQueueFullPolicy asyncQueueFullPolicy;
181    private Boolean mutable = Boolean.FALSE;
182
183    private volatile Disruptor<Log4jEventWrapper> disruptor;
184    private long backgroundThreadId; // LOG4J2-471
185    private EventFactory<Log4jEventWrapper> factory;
186    private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;
187    private volatile boolean alreadyLoggedWarning = false;
188
189    private final Object queueFullEnqueueLock = new Object();
190
191    public AsyncLoggerConfigDisruptor() {
192    }
193
194    // called from AsyncLoggerConfig constructor
195    @Override
196    public void setLogEventFactory(final LogEventFactory logEventFactory) {
197        // if any AsyncLoggerConfig uses a ReusableLogEventFactory
198        // then we need to populate our ringbuffer with MutableLogEvents
199        this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
200    }
201
202    /**
203     * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
204     * exists.
205     *
206     * @see #stop()
207     */
208    @Override
209    public synchronized void start() {
210        if (disruptor != null) {
211            LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
212                    + "using existing object.");
213            return;
214        }
215        LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
216        ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
217        final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
218
219        final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLoggerConfig", true, Thread.NORM_PRIORITY) {
220            @Override
221            public Thread newThread(final Runnable r) {
222                final Thread result = super.newThread(r);
223                backgroundThreadId = result.getId();
224                return result;
225            }
226        };
227        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
228
229        translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
230        factory = mutable ? MUTABLE_FACTORY : FACTORY;
231        disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
232
233        final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
234        disruptor.setDefaultExceptionHandler(errorHandler);
235
236        final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
237        disruptor.handleEventsWith(handlers);
238
239        LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
240                + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
241                .getClass().getSimpleName(), errorHandler);
242        disruptor.start();
243        super.start();
244    }
245
246    /**
247     * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
248     * shut down and their references set to {@code null}.
249     */
250    @Override
251    public boolean stop(final long timeout, final TimeUnit timeUnit) {
252        final Disruptor<Log4jEventWrapper> temp = disruptor;
253        if (temp == null) {
254            LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
255            return true; // disruptor was already shut down by another thread
256        }
257        setStopping();
258        LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");
259
260        // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
261        disruptor = null; // client code fails with NPE if log after stop = OK
262
263        // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
264        // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
265        // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
266        for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
267            try {
268                Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
269            } catch (final InterruptedException e) { // ignored
270            }
271        }
272        try {
273            // busy-spins until all events currently in the disruptor have been processed, or timeout
274            temp.shutdown(timeout, timeUnit);
275        } catch (final TimeoutException e) {
276            LOGGER.warn("AsyncLoggerConfigDisruptor: shutdown timed out after {} {}", timeout, timeUnit);
277            temp.halt(); // give up on remaining log events, if any
278        }
279        LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor has been shut down.");
280
281        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
282            LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
283                    DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
284        }
285        setStopped();
286        return true;
287    }
288
289    /**
290     * Returns {@code true} if the specified disruptor still has unprocessed events.
291     */
292    private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
293        final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
294        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
295    }
296
297    @Override
298    public EventRoute getEventRoute(final Level logLevel) {
299        final int remainingCapacity = remainingDisruptorCapacity();
300        if (remainingCapacity < 0) {
301            return EventRoute.DISCARD;
302        }
303        return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
304    }
305
306    private int remainingDisruptorCapacity() {
307        final Disruptor<Log4jEventWrapper> temp = disruptor;
308        if (hasLog4jBeenShutDown(temp)) {
309            return -1;
310        }
311        return (int) temp.getRingBuffer().remainingCapacity();
312    }
313
314    /**
315     * Returns {@code true} if the specified disruptor is null.
316     */
317    private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
318        if (aDisruptor == null) { // LOG4J2-639
319            LOGGER.warn("Ignoring log event after log4j was shut down");
320            return true;
321        }
322        return false;
323    }
324
325    @Override
326    public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
327        // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
328        try {
329            final LogEvent logEvent = prepareEvent(event);
330            enqueue(logEvent, asyncLoggerConfig);
331        } catch (final NullPointerException npe) {
332            // Note: NPE prevents us from adding a log event to the disruptor after it was shut down,
333            // which could cause the publishEvent method to hang and never return.
334            LOGGER.warn("Ignoring log event after log4j was shut down: {} [{}] {}", event.getLevel(),
335                    event.getLoggerName(), event.getMessage().getFormattedMessage()
336                            + (event.getThrown() == null ? "" : Throwables.toStringList(event.getThrown())));
337        }
338    }
339
340    private LogEvent prepareEvent(final LogEvent event) {
341        LogEvent logEvent = ensureImmutable(event);
342        if (logEvent.getMessage() instanceof ReusableMessage) {
343            if (logEvent instanceof Log4jLogEvent) {
344                ((Log4jLogEvent) logEvent).makeMessageImmutable();
345            } else if (logEvent instanceof MutableLogEvent) {
346                // MutableLogEvents need to be translated into the RingBuffer by the MUTABLE_TRANSLATOR.
347                // That translator calls MutableLogEvent.initFrom to copy the event, which will makeMessageImmutable the message.
348                if (translator != MUTABLE_TRANSLATOR) { // should not happen...
349                    // TRANSLATOR expects an immutable LogEvent
350                    logEvent = ((MutableLogEvent) logEvent).createMemento();
351                }
352            } else { // custom log event, with a ReusableMessage
353                showWarningAboutCustomLogEventWithReusableMessage(logEvent);
354            }
355        } else { // message is not a ReusableMessage; makeMessageImmutable it to prevent ConcurrentModificationExceptions
356            InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); // LOG4J2-1988, LOG4J2-1914
357        }
358        return logEvent;
359    }
360
361    private void showWarningAboutCustomLogEventWithReusableMessage(final LogEvent logEvent) {
362        if (!alreadyLoggedWarning) {
363            LOGGER.warn("Custom log event of type {} contains a mutable message of type {}." +
364                            " AsyncLoggerConfig does not know how to make an immutable copy of this message." +
365                            " This may result in ConcurrentModificationExceptions or incorrect log messages" +
366                            " if the application modifies objects in the message while" +
367                            " the background thread is writing it to the appenders.",
368                    logEvent.getClass().getName(), logEvent.getMessage().getClass().getName());
369            alreadyLoggedWarning = true;
370        }
371    }
372
373    private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
374        if (synchronizeEnqueueWhenQueueFull()) {
375            synchronized (queueFullEnqueueLock) {
376                disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
377            }
378        } else {
379            disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
380        }
381    }
382
383    private boolean synchronizeEnqueueWhenQueueFull() {
384        return DisruptorUtil.ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
385                // Background thread must never block
386                && backgroundThreadId != Thread.currentThread().getId();
387    }
388
389    @Override
390    public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
391        final LogEvent logEvent = prepareEvent(event);
392        return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
393    }
394
395    private LogEvent ensureImmutable(final LogEvent event) {
396        LogEvent result = event;
397        if (event instanceof RingBufferLogEvent) {
398            // Deal with special case where both types of Async Loggers are used together:
399            // RingBufferLogEvents are created by the all-loggers-async type, but
400            // this event is also consumed by the some-loggers-async type (this class).
401            // The original event will be re-used and modified in an application thread later,
402            // so take a snapshot of it, which can be safely processed in the
403            // some-loggers-async background thread.
404            result = ((RingBufferLogEvent) event).createMemento();
405        }
406        return result;
407    }
408
409    /*
410     * (non-Javadoc)
411     *
412     * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
413     * java.lang.String)
414     */
415    @Override
416    public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
417        return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
418    }
419}