001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
016 */
017package org.apache.logging.log4j.core.async;
018
019import java.util.concurrent.ThreadFactory;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.logging.log4j.Level;
023import org.apache.logging.log4j.core.AbstractLifeCycle;
024import org.apache.logging.log4j.core.LogEvent;
025import org.apache.logging.log4j.core.impl.Log4jLogEvent;
026import org.apache.logging.log4j.core.impl.LogEventFactory;
027import org.apache.logging.log4j.core.impl.MutableLogEvent;
028import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
029import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
030import org.apache.logging.log4j.core.util.Log4jThreadFactory;
031import org.apache.logging.log4j.core.util.Throwables;
032import org.apache.logging.log4j.message.ReusableMessage;
033
034import com.lmax.disruptor.EventFactory;
035import com.lmax.disruptor.EventTranslatorTwoArg;
036import com.lmax.disruptor.ExceptionHandler;
037import com.lmax.disruptor.RingBuffer;
038import com.lmax.disruptor.Sequence;
039import com.lmax.disruptor.SequenceReportingEventHandler;
040import com.lmax.disruptor.TimeoutException;
041import com.lmax.disruptor.WaitStrategy;
042import com.lmax.disruptor.dsl.Disruptor;
043import com.lmax.disruptor.dsl.ProducerType;
044
045/**
046 * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
047 * <p>
048 * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
049 * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
050 * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
051 * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
052 * definition file.
053 * <p>
054 * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
055 * {@code AsyncLoggerConfig} is actually used.
056 */
057public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate {
058
059    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
060    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
061
062    /**
063     * RingBuffer events contain all information necessary to perform the work in a separate thread.
064     */
065    public static class Log4jEventWrapper {
066        public Log4jEventWrapper() {
067        }
068
069        public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) {
070            event = mutableLogEvent;
071        }
072
073        private AsyncLoggerConfig loggerConfig;
074        private LogEvent event;
075
076        /**
077         * Release references held by ring buffer to allow objects to be garbage-collected.
078         */
079        public void clear() {
080            loggerConfig = null;
081            if (event instanceof MutableLogEvent) {
082                ((MutableLogEvent) event).clear();
083            } else {
084                event = null;
085            }
086        }
087
088        @Override
089        public String toString() {
090            return String.valueOf(event);
091        }
092    }
093
094    /**
095     * EventHandler performs the work in a separate thread.
096     */
097    private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
098        private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
099        private Sequence sequenceCallback;
100        private int counter;
101
102        @Override
103        public void setSequenceCallback(final Sequence sequenceCallback) {
104            this.sequenceCallback = sequenceCallback;
105        }
106
107        @Override
108        public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
109                throws Exception {
110            event.event.setEndOfBatch(endOfBatch);
111            event.loggerConfig.logToAsyncLoggerConfigsOnCurrentThread(event.event);
112            event.clear();
113
114            notifyIntermediateProgress(sequence);
115        }
116
117        /**
118         * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
119         * be progressed until the batch has completely finished.
120         */
121        private void notifyIntermediateProgress(final long sequence) {
122            if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
123                sequenceCallback.set(sequence);
124                counter = 0;
125            }
126        }
127    }
128
129    /**
130     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
131     * RingBuffer.
132     */
133    private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
134        @Override
135        public Log4jEventWrapper newInstance() {
136            return new Log4jEventWrapper();
137        }
138    };
139
140    /**
141     * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
142     * RingBuffer.
143     */
144    private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() {
145        @Override
146        public Log4jEventWrapper newInstance() {
147            return new Log4jEventWrapper(new MutableLogEvent());
148        }
149    };
150
151    /**
152     * Object responsible for passing on data to a specific RingBuffer event.
153     */
154    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
155            new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
156
157        @Override
158        public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
159                final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
160            ringBufferElement.event = logEvent;
161            ringBufferElement.loggerConfig = loggerConfig;
162        }
163    };
164
165    /**
166     * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent.
167     */
168    private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
169            new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
170
171        @Override
172        public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
173                final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
174            ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
175            ringBufferElement.loggerConfig = loggerConfig;
176        }
177    };
178
179    private int ringBufferSize;
180    private AsyncQueueFullPolicy asyncQueueFullPolicy;
181    private Boolean mutable = Boolean.FALSE;
182
183    private volatile Disruptor<Log4jEventWrapper> disruptor;
184    private long backgroundThreadId; // LOG4J2-471
185    private EventFactory<Log4jEventWrapper> factory;
186    private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;
187    private volatile boolean alreadyLoggedWarning = false;
188
189    public AsyncLoggerConfigDisruptor() {
190    }
191
192    // called from AsyncLoggerConfig constructor
193    @Override
194    public void setLogEventFactory(final LogEventFactory logEventFactory) {
195        // if any AsyncLoggerConfig uses a ReusableLogEventFactory
196        // then we need to populate our ringbuffer with MutableLogEvents
197        this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
198    }
199
200    /**
201     * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
202     * exists.
203     *
204     * @see #stop()
205     */
206    @Override
207    public synchronized void start() {
208        if (disruptor != null) {
209            LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
210                    + "using existing object.");
211            return;
212        }
213        LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
214        ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
215        final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
216
217        final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLoggerConfig", true, Thread.NORM_PRIORITY) {
218            @Override
219            public Thread newThread(final Runnable r) {
220                final Thread result = super.newThread(r);
221                backgroundThreadId = result.getId();
222                return result;
223            }
224        };
225        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
226
227        translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
228        factory = mutable ? MUTABLE_FACTORY : FACTORY;
229        disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
230
231        final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
232        disruptor.setDefaultExceptionHandler(errorHandler);
233
234        final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
235        disruptor.handleEventsWith(handlers);
236
237        LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
238                + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
239                .getClass().getSimpleName(), errorHandler);
240        disruptor.start();
241        super.start();
242    }
243
244    /**
245     * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
246     * shut down and their references set to {@code null}.
247     */
248    @Override
249    public boolean stop(final long timeout, final TimeUnit timeUnit) {
250        final Disruptor<Log4jEventWrapper> temp = disruptor;
251        if (temp == null) {
252            LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
253            return true; // disruptor was already shut down by another thread
254        }
255        setStopping();
256        LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");
257
258        // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
259        disruptor = null; // client code fails with NPE if log after stop = OK
260
261        // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
262        // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
263        // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
264        for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
265            try {
266                Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
267            } catch (final InterruptedException e) { // ignored
268            }
269        }
270        try {
271            // busy-spins until all events currently in the disruptor have been processed, or timeout
272            temp.shutdown(timeout, timeUnit);
273        } catch (final TimeoutException e) {
274            LOGGER.warn("AsyncLoggerConfigDisruptor: shutdown timed out after {} {}", timeout, timeUnit);
275            temp.halt(); // give up on remaining log events, if any
276        }
277        LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor has been shut down.");
278
279        if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
280            LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
281                    DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
282        }
283        setStopped();
284        return true;
285    }
286
287    /**
288     * Returns {@code true} if the specified disruptor still has unprocessed events.
289     */
290    private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
291        final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
292        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
293    }
294
295    @Override
296    public EventRoute getEventRoute(final Level logLevel) {
297        final int remainingCapacity = remainingDisruptorCapacity();
298        if (remainingCapacity < 0) {
299            return EventRoute.DISCARD;
300        }
301        return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
302    }
303
304    private int remainingDisruptorCapacity() {
305        final Disruptor<Log4jEventWrapper> temp = disruptor;
306        if (hasLog4jBeenShutDown(temp)) {
307            return -1;
308        }
309        return (int) temp.getRingBuffer().remainingCapacity();
310    }
311
312    /**
313     * Returns {@code true} if the specified disruptor is null.
314     */
315    private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
316        if (aDisruptor == null) { // LOG4J2-639
317            LOGGER.warn("Ignoring log event after log4j was shut down");
318            return true;
319        }
320        return false;
321    }
322
323    @Override
324    public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
325        // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
326        try {
327            final LogEvent logEvent = prepareEvent(event);
328            enqueue(logEvent, asyncLoggerConfig);
329        } catch (final NullPointerException npe) {
330            // Note: NPE prevents us from adding a log event to the disruptor after it was shut down,
331            // which could cause the publishEvent method to hang and never return.
332            LOGGER.warn("Ignoring log event after log4j was shut down: {} [{}] {}", event.getLevel(),
333                    event.getLoggerName(), event.getMessage().getFormattedMessage()
334                            + (event.getThrown() == null ? "" : Throwables.toStringList(event.getThrown())));
335        }
336    }
337
338    private LogEvent prepareEvent(final LogEvent event) {
339        LogEvent logEvent = ensureImmutable(event);
340        if (logEvent.getMessage() instanceof ReusableMessage) {
341            if (logEvent instanceof Log4jLogEvent) {
342                ((Log4jLogEvent) logEvent).makeMessageImmutable();
343            } else if (logEvent instanceof MutableLogEvent) {
344                // MutableLogEvents need to be translated into the RingBuffer by the MUTABLE_TRANSLATOR.
345                // That translator calls MutableLogEvent.initFrom to copy the event, which will makeMessageImmutable the message.
346                if (translator != MUTABLE_TRANSLATOR) { // should not happen...
347                    // TRANSLATOR expects an immutable LogEvent
348                    logEvent = ((MutableLogEvent) logEvent).createMemento();
349                }
350            } else { // custom log event, with a ReusableMessage
351                showWarningAboutCustomLogEventWithReusableMessage(logEvent);
352            }
353        } else { // message is not a ReusableMessage; makeMessageImmutable it to prevent ConcurrentModificationExceptions
354            InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); // LOG4J2-1988, LOG4J2-1914
355        }
356        return logEvent;
357    }
358
359    private void showWarningAboutCustomLogEventWithReusableMessage(final LogEvent logEvent) {
360        if (!alreadyLoggedWarning) {
361            LOGGER.warn("Custom log event of type {} contains a mutable message of type {}." +
362                            " AsyncLoggerConfig does not know how to make an immutable copy of this message." +
363                            " This may result in ConcurrentModificationExceptions or incorrect log messages" +
364                            " if the application modifies objects in the message while" +
365                            " the background thread is writing it to the appenders.",
366                    logEvent.getClass().getName(), logEvent.getMessage().getClass().getName());
367            alreadyLoggedWarning = true;
368        }
369    }
370
371    private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
372        disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
373    }
374
375    @Override
376    public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
377        final LogEvent logEvent = prepareEvent(event);
378        return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
379    }
380
381    private LogEvent ensureImmutable(final LogEvent event) {
382        LogEvent result = event;
383        if (event instanceof RingBufferLogEvent) {
384            // Deal with special case where both types of Async Loggers are used together:
385            // RingBufferLogEvents are created by the all-loggers-async type, but
386            // this event is also consumed by the some-loggers-async type (this class).
387            // The original event will be re-used and modified in an application thread later,
388            // so take a snapshot of it, which can be safely processed in the
389            // some-loggers-async background thread.
390            result = ((RingBufferLogEvent) event).createMemento();
391        }
392        return result;
393    }
394
395    /*
396     * (non-Javadoc)
397     *
398     * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
399     * java.lang.String)
400     */
401    @Override
402    public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
403        return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
404    }
405}