001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom;
019
020import java.io.Serializable;
021import java.util.Properties;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.TimeUnit;
024
025import javax.jms.Connection;
026import javax.jms.ConnectionFactory;
027import javax.jms.Destination;
028import javax.jms.JMSException;
029import javax.jms.MapMessage;
030import javax.jms.Message;
031import javax.jms.MessageConsumer;
032import javax.jms.MessageProducer;
033import javax.jms.Session;
034import javax.naming.NamingException;
035
036import org.apache.logging.log4j.core.LogEvent;
037import org.apache.logging.log4j.core.appender.AbstractManager;
038import org.apache.logging.log4j.core.appender.AppenderLoggingException;
039import org.apache.logging.log4j.core.appender.ManagerFactory;
040import org.apache.logging.log4j.core.net.JndiManager;
041import org.apache.logging.log4j.core.util.Log4jThread;
042import org.apache.logging.log4j.status.StatusLogger;
043import org.apache.logging.log4j.util.BiConsumer;
044
045/**
046 * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
047 *
048 * <p>
049 * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
050 * involving a configured ConnectionFactory and Destination.
051 * </p>
052 */
053public class JmsManager extends AbstractManager {
054
055    public static class JmsManagerConfiguration {
056        private final Properties jndiProperties;
057        private final String connectionFactoryName;
058        private final String destinationName;
059        private final String userName;
060        private final char[] password;
061        private final boolean immediateFail;
062        private final boolean retry;
063        private final long reconnectIntervalMillis;
064
065        JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
066                final String destinationName, final String userName, final char[] password, final boolean immediateFail,
067                final long reconnectIntervalMillis) {
068            this.jndiProperties = jndiProperties;
069            this.connectionFactoryName = connectionFactoryName;
070            this.destinationName = destinationName;
071            this.userName = userName;
072            this.password = password;
073            this.immediateFail = immediateFail;
074            this.reconnectIntervalMillis = reconnectIntervalMillis;
075            this.retry = reconnectIntervalMillis > 0;
076        }
077
078        public String getConnectionFactoryName() {
079            return connectionFactoryName;
080        }
081
082        public String getDestinationName() {
083            return destinationName;
084        }
085
086        public JndiManager getJndiManager() {
087            return JndiManager.getJndiManager(getJndiProperties());
088        }
089
090        public Properties getJndiProperties() {
091            return jndiProperties;
092        }
093
094        public char[] getPassword() {
095            return password;
096        }
097
098        public long getReconnectIntervalMillis() {
099            return reconnectIntervalMillis;
100        }
101
102        public String getUserName() {
103            return userName;
104        }
105
106        public boolean isImmediateFail() {
107            return immediateFail;
108        }
109
110        public boolean isRetry() {
111            return retry;
112        }
113
114        @Override
115        public String toString() {
116            return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
117                    + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
118                    + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
119                    + reconnectIntervalMillis + "]";
120        }
121
122    }
123
124    private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
125
126        @Override
127        public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
128            if (JndiManager.isJndiJmsEnabled()) {
129                try {
130                    return new JmsManager(name, data);
131                } catch (final Exception e) {
132                    logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
133                    return null;
134                }
135            }
136            logger().error("JNDI must be enabled by setting log4j2.enableJndiJms=true");
137            return null;
138        }
139    }
140
141    /**
142     * Handles reconnecting to JMS on a Thread.
143     */
144    private class Reconnector extends Log4jThread {
145
146        private final CountDownLatch latch = new CountDownLatch(1);
147
148        private volatile boolean shutdown = false;
149
150        private final Object owner;
151
152        private Reconnector(final Object owner) {
153            super("JmsManager-Reconnector");
154            this.owner = owner;
155        }
156
157        public void latch() {
158            try {
159                latch.await();
160            } catch (final InterruptedException ex) {
161                // Ignore the exception.
162            }
163        }
164
165        void reconnect() throws NamingException, JMSException {
166            final JndiManager jndiManager2 = getJndiManager();
167            final Connection connection2 = createConnection(jndiManager2);
168            final Session session2 = createSession(connection2);
169            final Destination destination2 = createDestination(jndiManager2);
170            final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
171            connection2.start();
172            synchronized (owner) {
173                jndiManager = jndiManager2;
174                connection = connection2;
175                session = session2;
176                destination = destination2;
177                messageProducer = messageProducer2;
178                reconnector = null;
179                shutdown = true;
180            }
181            logger().debug("Connection reestablished to {}", configuration);
182        }
183
184        @Override
185        public void run() {
186            while (!shutdown) {
187                try {
188                    sleep(configuration.getReconnectIntervalMillis());
189                    reconnect();
190                } catch (final InterruptedException | JMSException | NamingException e) {
191                    logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
192                            e);
193                } finally {
194                    latch.countDown();
195                }
196            }
197        }
198
199        public void shutdown() {
200            shutdown = true;
201        }
202
203    }
204
205    static final JmsManagerFactory FACTORY = new JmsManagerFactory();
206
207    /**
208     * Gets a JmsManager using the specified configuration parameters.
209     *
210     * @param name
211     *            The name to use for this JmsManager.
212     * @param connectionFactoryName
213     *            The binding name for the {@link javax.jms.ConnectionFactory}.
214     * @param destinationName
215     *            The binding name for the {@link javax.jms.Destination}.
216     * @param userName
217     *            The userName to connect with or {@code null} for no authentication.
218     * @param password
219     *            The password to use with the given userName or {@code null} for no authentication.
220     * @param immediateFail
221     *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
222     *            fails.
223     * @param reconnectIntervalMillis
224     *            How to log sleep in milliseconds before trying to reconnect to JMS.
225     * @param jndiProperties
226     *            JNDI properties.
227     * @return The JmsManager as configured.
228     */
229    public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
230            final String connectionFactoryName, final String destinationName, final String userName,
231            final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
232        final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
233                destinationName, userName, password, immediateFail, reconnectIntervalMillis);
234        return getManager(name, FACTORY, configuration);
235    }
236
237    private final JmsManagerConfiguration configuration;
238
239    private volatile Reconnector reconnector;
240    private volatile JndiManager jndiManager;
241    private volatile Connection connection;
242    private volatile Session session;
243    private volatile Destination destination;
244    private volatile MessageProducer messageProducer;
245
246    private JmsManager(final String name, final JmsManagerConfiguration configuration) {
247        super(null, name);
248        this.configuration = configuration;
249        this.jndiManager = configuration.getJndiManager();
250        try {
251            this.connection = createConnection(this.jndiManager);
252            this.session = createSession(this.connection);
253            this.destination = createDestination(this.jndiManager);
254            this.messageProducer = createMessageProducer(this.session, this.destination);
255            this.connection.start();
256        } catch (NamingException | JMSException e) {
257            this.reconnector = createReconnector();
258            this.reconnector.start();
259        }
260    }
261
262    private boolean closeConnection() {
263        if (connection == null) {
264            return true;
265        }
266        final Connection temp = connection;
267        connection = null;
268        try {
269            temp.close();
270            return true;
271        } catch (final JMSException e) {
272            StatusLogger.getLogger().debug(
273                    "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
274                    e.getLocalizedMessage(), temp, e);
275            return false;
276        }
277    }
278
279    private boolean closeJndiManager() {
280        if (jndiManager == null) {
281            return true;
282        }
283        final JndiManager tmp = jndiManager;
284        jndiManager = null;
285        tmp.close();
286        return true;
287    }
288
289    private boolean closeMessageProducer() {
290        if (messageProducer == null) {
291            return true;
292        }
293        final MessageProducer temp = messageProducer;
294        messageProducer = null;
295        try {
296            temp.close();
297            return true;
298        } catch (final JMSException e) {
299            StatusLogger.getLogger().debug(
300                    "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
301                    e.getLocalizedMessage(), temp, e);
302            return false;
303        }
304    }
305
306    private boolean closeSession() {
307        if (session == null) {
308            return true;
309        }
310        final Session temp = session;
311        session = null;
312        try {
313            temp.close();
314            return true;
315        } catch (final JMSException e) {
316            StatusLogger.getLogger().debug(
317                    "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
318                    e.getLocalizedMessage(), temp, e);
319            return false;
320        }
321    }
322
323    private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
324        final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
325        if (configuration.getUserName() != null && configuration.getPassword() != null) {
326            return connectionFactory.createConnection(configuration.getUserName(),
327                    configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
328        }
329        return connectionFactory.createConnection();
330
331    }
332
333    private Destination createDestination(final JndiManager jndiManager) throws NamingException {
334        return jndiManager.lookup(configuration.getDestinationName());
335    }
336
337    /**
338     * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
339     * <p>
340     * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
341     * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
342     * message will be serialized to a String.
343     * </p>
344     * <p>
345     * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
346     * will be serialized as a Java object.
347     * </p>
348     * <p>
349     * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
350     * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
351     * </p>
352     *
353     * @param object
354     *            The LogEvent or String message to wrap.
355     * @return A new JMS message containing the provided object.
356     * @throws JMSException if the JMS provider fails to create a message due to some internal error.
357     */
358    public Message createMessage(final Serializable object) throws JMSException {
359        if (object instanceof String) {
360            return this.session.createTextMessage((String) object);
361        } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
362            return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
363        }
364        return this.session.createObjectMessage(object);
365    }
366
367    private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
368        final Message message = createMessage(serializable);
369        message.setJMSTimestamp(event.getTimeMillis());
370        messageProducer.send(message);
371    }
372
373    /**
374     * Creates a MessageConsumer on this Destination using the current Session.
375     *
376     * @return A MessageConsumer on this Destination.
377     * @throws JMSException if the session fails to create a consumer due to some internal error.
378     */
379    public MessageConsumer createMessageConsumer() throws JMSException {
380        return this.session.createConsumer(this.destination);
381    }
382
383    /**
384     * Creates a MessageProducer on this Destination using the current Session.
385     *
386     * @param session
387     *            The JMS Session to use to create the MessageProducer
388     * @param destination
389     *            The JMS Destination for the MessageProducer
390     * @return A MessageProducer on this Destination.
391     * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
392     */
393    public MessageProducer createMessageProducer(final Session session, final Destination destination)
394            throws JMSException {
395        return session.createProducer(destination);
396    }
397
398    private Reconnector createReconnector() {
399        final Reconnector recon = new Reconnector(this);
400        recon.setDaemon(true);
401        recon.setPriority(Thread.MIN_PRIORITY);
402        return recon;
403    }
404
405    private Session createSession(final Connection connection) throws JMSException {
406        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
407    }
408
409    public JmsManagerConfiguration getJmsManagerConfiguration() {
410        return configuration;
411    }
412
413    JndiManager getJndiManager() {
414        return configuration.getJndiManager();
415    }
416
417    <T> T lookup(final String destinationName) throws NamingException {
418        return this.jndiManager.lookup(destinationName);
419    }
420
421    private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
422            final MapMessage jmsMapMessage) {
423        // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
424        log4jMapMessage.forEach(new BiConsumer<String, Object>() {
425            @Override
426            public void accept(final String key, final Object value) {
427                try {
428                    jmsMapMessage.setObject(key, value);
429                } catch (final JMSException e) {
430                    throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
431                            e.getClass(), key, value, e.getLocalizedMessage()), e);
432                }
433            }
434        });
435        return jmsMapMessage;
436    }
437
438    @Override
439    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
440        if (reconnector != null) {
441            reconnector.shutdown();
442            reconnector.interrupt();
443            reconnector = null;
444        }
445        boolean closed = false;
446        closed &= closeJndiManager();
447        closed &= closeMessageProducer();
448        closed &= closeSession();
449        closed &= closeConnection();
450        return closed && this.jndiManager.stop(timeout, timeUnit);
451    }
452
453    void send(final LogEvent event, final Serializable serializable) {
454        if (messageProducer == null) {
455            if (reconnector != null && !configuration.isImmediateFail()) {
456                reconnector.latch();
457                if (messageProducer == null) {
458                    throw new AppenderLoggingException(
459                            "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
460                }
461            }
462        }
463        synchronized (this) {
464            try {
465                createMessageAndSend(event, serializable);
466            } catch (final JMSException causeEx) {
467                if (configuration.isRetry() && reconnector == null) {
468                    reconnector = createReconnector();
469                    try {
470                        closeJndiManager();
471                        reconnector.reconnect();
472                    } catch (NamingException | JMSException reconnEx) {
473                        logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
474                                configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
475                        reconnector.start();
476                        throw new AppenderLoggingException(
477                                String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
478                    }
479                    try {
480                        createMessageAndSend(event, serializable);
481                    } catch (final JMSException e) {
482                        throw new AppenderLoggingException(
483                                String.format("Error sending to %s after reestablishing JMS connection for %s",
484                                        getName(), configuration),
485                                causeEx);
486                    }
487                }
488            }
489        }
490    }
491
492}