001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom;
019
020import java.io.Serializable;
021import java.util.Properties;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.TimeUnit;
024
025import javax.jms.Connection;
026import javax.jms.ConnectionFactory;
027import javax.jms.Destination;
028import javax.jms.JMSException;
029import javax.jms.MapMessage;
030import javax.jms.Message;
031import javax.jms.MessageConsumer;
032import javax.jms.MessageProducer;
033import javax.jms.Session;
034import javax.naming.NamingException;
035
036import org.apache.logging.log4j.core.LogEvent;
037import org.apache.logging.log4j.core.appender.AbstractManager;
038import org.apache.logging.log4j.core.appender.AppenderLoggingException;
039import org.apache.logging.log4j.core.appender.ManagerFactory;
040import org.apache.logging.log4j.core.net.JndiManager;
041import org.apache.logging.log4j.core.util.Log4jThread;
042import org.apache.logging.log4j.status.StatusLogger;
043
044/**
045 * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
046 *
047 * <p>
048 * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
049 * involving a configured ConnectionFactory and Destination.
050 * </p>
051 */
052public class JmsManager extends AbstractManager {
053
054    public static class JmsManagerConfiguration {
055        private final Properties jndiProperties;
056        private final String connectionFactoryName;
057        private final String destinationName;
058        private final String userName;
059        private final char[] password;
060        private final boolean immediateFail;
061        private final boolean retry;
062        private final long reconnectIntervalMillis;
063
064        JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
065                final String destinationName, final String userName, final char[] password, final boolean immediateFail,
066                final long reconnectIntervalMillis) {
067            this.jndiProperties = jndiProperties;
068            this.connectionFactoryName = connectionFactoryName;
069            this.destinationName = destinationName;
070            this.userName = userName;
071            this.password = password;
072            this.immediateFail = immediateFail;
073            this.reconnectIntervalMillis = reconnectIntervalMillis;
074            this.retry = reconnectIntervalMillis > 0;
075        }
076
077        public String getConnectionFactoryName() {
078            return connectionFactoryName;
079        }
080
081        public String getDestinationName() {
082            return destinationName;
083        }
084
085        public JndiManager getJndiManager() {
086            return JndiManager.getJndiManager(getJndiProperties());
087        }
088
089        public Properties getJndiProperties() {
090            return jndiProperties;
091        }
092
093        public char[] getPassword() {
094            return password;
095        }
096
097        public long getReconnectIntervalMillis() {
098            return reconnectIntervalMillis;
099        }
100
101        public String getUserName() {
102            return userName;
103        }
104
105        public boolean isImmediateFail() {
106            return immediateFail;
107        }
108
109        public boolean isRetry() {
110            return retry;
111        }
112
113        @Override
114        public String toString() {
115            return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
116                    + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
117                    + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
118                    + reconnectIntervalMillis + "]";
119        }
120
121    }
122
123    private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
124
125        @Override
126        public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
127            if (JndiManager.isJndiEnabled()) {
128                try {
129                    return new JmsManager(name, data);
130                } catch (final Exception e) {
131                    logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
132                    return null;
133                }
134            } else {
135                logger().error("JNDI has not been enabled. The log4j2.enableJndi property must be set to true");
136                return null;
137            }
138        }
139    }
140
141    /**
142     * Handles reconnecting to JMS on a Thread.
143     */
144    private class Reconnector extends Log4jThread {
145
146        private final CountDownLatch latch = new CountDownLatch(1);
147
148        private volatile boolean shutdown;
149
150        private final Object owner;
151
152        private Reconnector(final Object owner) {
153            super("JmsManager-Reconnector");
154            this.owner = owner;
155        }
156
157        public void latch() {
158            try {
159                latch.await();
160            } catch (final InterruptedException ex) {
161                // Ignore the exception.
162            }
163        }
164
165        void reconnect() throws NamingException, JMSException {
166            final JndiManager jndiManager2 = getJndiManager();
167            final Connection connection2 = createConnection(jndiManager2);
168            final Session session2 = createSession(connection2);
169            final Destination destination2 = createDestination(jndiManager2);
170            final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
171            connection2.start();
172            synchronized (owner) {
173                jndiManager = jndiManager2;
174                connection = connection2;
175                session = session2;
176                destination = destination2;
177                messageProducer = messageProducer2;
178                reconnector = null;
179                shutdown = true;
180            }
181            logger().debug("Connection reestablished to {}", configuration);
182        }
183
184        @Override
185        public void run() {
186            while (!shutdown) {
187                try {
188                    sleep(configuration.getReconnectIntervalMillis());
189                    reconnect();
190                } catch (final InterruptedException | JMSException | NamingException e) {
191                    logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
192                            e);
193                } finally {
194                    latch.countDown();
195                }
196            }
197        }
198
199        public void shutdown() {
200            shutdown = true;
201        }
202
203    }
204
205    static final JmsManagerFactory FACTORY = new JmsManagerFactory();
206
207    /**
208     * Gets a JmsManager using the specified configuration parameters.
209     *
210     * @param name
211     *            The name to use for this JmsManager.
212     * @param connectionFactoryName
213     *            The binding name for the {@link javax.jms.ConnectionFactory}.
214     * @param destinationName
215     *            The binding name for the {@link javax.jms.Destination}.
216     * @param userName
217     *            The userName to connect with or {@code null} for no authentication.
218     * @param password
219     *            The password to use with the given userName or {@code null} for no authentication.
220     * @param immediateFail
221     *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
222     *            fails.
223     * @param reconnectIntervalMillis
224     *            How to log sleep in milliseconds before trying to reconnect to JMS.
225     * @param jndiProperties
226     *            JNDI properties.
227     * @return The JmsManager as configured.
228     */
229    public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
230            final String connectionFactoryName, final String destinationName, final String userName,
231            final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
232        final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
233                destinationName, userName, password, immediateFail, reconnectIntervalMillis);
234        return getManager(name, FACTORY, configuration);
235    }
236
237    private final JmsManagerConfiguration configuration;
238
239    private volatile Reconnector reconnector;
240    private volatile JndiManager jndiManager;
241    private volatile Connection connection;
242    private volatile Session session;
243    private volatile Destination destination;
244    private volatile MessageProducer messageProducer;
245
246    private JmsManager(final String name, final JmsManagerConfiguration configuration) {
247        super(null, name);
248        this.configuration = configuration;
249        this.jndiManager = configuration.getJndiManager();
250        try {
251            this.connection = createConnection(this.jndiManager);
252            this.session = createSession(this.connection);
253            this.destination = createDestination(this.jndiManager);
254            this.messageProducer = createMessageProducer(this.session, this.destination);
255            this.connection.start();
256        } catch (NamingException | JMSException e) {
257            this.reconnector = createReconnector();
258            this.reconnector.start();
259        }
260    }
261
262    private boolean closeConnection() {
263        if (connection == null) {
264            return true;
265        }
266        final Connection temp = connection;
267        connection = null;
268        try {
269            temp.close();
270            return true;
271        } catch (final JMSException e) {
272            StatusLogger.getLogger().debug(
273                    "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
274                    e.getLocalizedMessage(), temp, e);
275            return false;
276        }
277    }
278
279    private boolean closeJndiManager() {
280        if (jndiManager == null) {
281            return true;
282        }
283        final JndiManager tmp = jndiManager;
284        jndiManager = null;
285        tmp.close();
286        return true;
287    }
288
289    private boolean closeMessageProducer() {
290        if (messageProducer == null) {
291            return true;
292        }
293        final MessageProducer temp = messageProducer;
294        messageProducer = null;
295        try {
296            temp.close();
297            return true;
298        } catch (final JMSException e) {
299            StatusLogger.getLogger().debug(
300                    "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
301                    e.getLocalizedMessage(), temp, e);
302            return false;
303        }
304    }
305
306    private boolean closeSession() {
307        if (session == null) {
308            return true;
309        }
310        final Session temp = session;
311        session = null;
312        try {
313            temp.close();
314            return true;
315        } catch (final JMSException e) {
316            StatusLogger.getLogger().debug(
317                    "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
318                    e.getLocalizedMessage(), temp, e);
319            return false;
320        }
321    }
322
323    private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
324        final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
325        if (configuration.getUserName() != null && configuration.getPassword() != null) {
326            return connectionFactory.createConnection(configuration.getUserName(),
327                    configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
328        }
329        return connectionFactory.createConnection();
330
331    }
332
333    private Destination createDestination(final JndiManager jndiManager) throws NamingException {
334        return jndiManager.lookup(configuration.getDestinationName());
335    }
336
337    /**
338     * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
339     * <p>
340     * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
341     * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
342     * message will be serialized to a String.
343     * </p>
344     * <p>
345     * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
346     * will be serialized as a Java object.
347     * </p>
348     * <p>
349     * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
350     * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
351     * </p>
352     *
353     * @param object
354     *            The LogEvent or String message to wrap.
355     * @return A new JMS message containing the provided object.
356     * @throws JMSException
357     */
358    public Message createMessage(final Serializable object) throws JMSException {
359        if (object instanceof String) {
360            return this.session.createTextMessage((String) object);
361        } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
362            return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
363        }
364        return this.session.createObjectMessage(object);
365    }
366
367    private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
368        final Message message = createMessage(serializable);
369        message.setJMSTimestamp(event.getTimeMillis());
370        messageProducer.send(message);
371    }
372
373    /**
374     * Creates a MessageConsumer on this Destination using the current Session.
375     *
376     * @return A MessageConsumer on this Destination.
377     * @throws JMSException
378     */
379    public MessageConsumer createMessageConsumer() throws JMSException {
380        return this.session.createConsumer(this.destination);
381    }
382
383    /**
384     * Creates a MessageProducer on this Destination using the current Session.
385     *
386     * @param session
387     *            The JMS Session to use to create the MessageProducer
388     * @param destination
389     *            The JMS Destination for the MessageProducer
390     * @return A MessageProducer on this Destination.
391     * @throws JMSException
392     */
393    public MessageProducer createMessageProducer(final Session session, final Destination destination)
394            throws JMSException {
395        return session.createProducer(destination);
396    }
397
398    private Reconnector createReconnector() {
399        final Reconnector recon = new Reconnector(this);
400        recon.setDaemon(true);
401        recon.setPriority(Thread.MIN_PRIORITY);
402        return recon;
403    }
404
405    private Session createSession(final Connection connection) throws JMSException {
406        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
407    }
408
409    public JmsManagerConfiguration getJmsManagerConfiguration() {
410        return configuration;
411    }
412
413    JndiManager getJndiManager() {
414        return configuration.getJndiManager();
415    }
416
417    <T> T lookup(final String destinationName) throws NamingException {
418        return this.jndiManager.lookup(destinationName);
419    }
420
421    private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
422            final MapMessage jmsMapMessage) {
423        // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
424        log4jMapMessage.forEach((key, value) -> {
425            try {
426                jmsMapMessage.setObject(key, value);
427            } catch (final JMSException e) {
428                throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
429                        e.getClass(), key, value, e.getLocalizedMessage()), e);
430            }
431        });
432        return jmsMapMessage;
433    }
434
435    @Override
436    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
437        if (reconnector != null) {
438            reconnector.shutdown();
439            reconnector.interrupt();
440            reconnector = null;
441        }
442        boolean closed = false;
443        closed &= closeJndiManager();
444        closed &= closeMessageProducer();
445        closed &= closeSession();
446        closed &= closeConnection();
447        return closed && this.jndiManager.stop(timeout, timeUnit);
448    }
449
450    void send(final LogEvent event, final Serializable serializable) {
451        if (messageProducer == null) {
452            if (reconnector != null && !configuration.isImmediateFail()) {
453                reconnector.latch();
454                if (messageProducer == null) {
455                    throw new AppenderLoggingException(
456                            "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
457                }
458            }
459        }
460        synchronized (this) {
461            try {
462                createMessageAndSend(event, serializable);
463            } catch (final JMSException causeEx) {
464                if (configuration.isRetry() && reconnector == null) {
465                    reconnector = createReconnector();
466                    try {
467                        closeJndiManager();
468                        reconnector.reconnect();
469                    } catch (NamingException | JMSException reconnEx) {
470                        logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
471                                configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
472                        reconnector.start();
473                        throw new AppenderLoggingException(
474                                String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
475                    }
476                    try {
477                        createMessageAndSend(event, serializable);
478                    } catch (final JMSException e) {
479                        throw new AppenderLoggingException(
480                                String.format("Error sending to %s after reestablishing JMS connection for %s",
481                                        getName(), configuration),
482                                causeEx);
483                    }
484                }
485            }
486        }
487    }
488
489}