001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017
018package org.apache.logging.log4j.core.appender.mom;
019
020import java.io.Serializable;
021import java.util.Properties;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.TimeUnit;
024
025import javax.jms.Connection;
026import javax.jms.ConnectionFactory;
027import javax.jms.Destination;
028import javax.jms.JMSException;
029import javax.jms.MapMessage;
030import javax.jms.Message;
031import javax.jms.MessageConsumer;
032import javax.jms.MessageProducer;
033import javax.jms.Session;
034import javax.naming.NamingException;
035
036import org.apache.logging.log4j.core.LogEvent;
037import org.apache.logging.log4j.core.appender.AbstractManager;
038import org.apache.logging.log4j.core.appender.AppenderLoggingException;
039import org.apache.logging.log4j.core.appender.ManagerFactory;
040import org.apache.logging.log4j.core.net.JndiManager;
041import org.apache.logging.log4j.core.util.Log4jThread;
042import org.apache.logging.log4j.status.StatusLogger;
043import org.apache.logging.log4j.util.BiConsumer;
044
045/**
046 * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
047 *
048 * <p>
049 * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
050 * involving a configured ConnectionFactory and Destination.
051 * </p>
052 */
053public class JmsManager extends AbstractManager {
054
055    public static class JmsManagerConfiguration {
056        private final Properties jndiProperties;
057        private final String connectionFactoryName;
058        private final String destinationName;
059        private final String userName;
060        private final char[] password;
061        private final boolean immediateFail;
062        private final boolean retry;
063        private final long reconnectIntervalMillis;
064
065        JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
066                final String destinationName, final String userName, final char[] password, final boolean immediateFail,
067                final long reconnectIntervalMillis) {
068            this.jndiProperties = jndiProperties;
069            this.connectionFactoryName = connectionFactoryName;
070            this.destinationName = destinationName;
071            this.userName = userName;
072            this.password = password;
073            this.immediateFail = immediateFail;
074            this.reconnectIntervalMillis = reconnectIntervalMillis;
075            this.retry = reconnectIntervalMillis > 0;
076        }
077
078        public String getConnectionFactoryName() {
079            return connectionFactoryName;
080        }
081
082        public String getDestinationName() {
083            return destinationName;
084        }
085
086        public JndiManager getJndiManager() {
087            return JndiManager.getJndiManager(getJndiProperties());
088        }
089
090        public Properties getJndiProperties() {
091            return jndiProperties;
092        }
093
094        public char[] getPassword() {
095            return password;
096        }
097
098        public long getReconnectIntervalMillis() {
099            return reconnectIntervalMillis;
100        }
101
102        public String getUserName() {
103            return userName;
104        }
105
106        public boolean isImmediateFail() {
107            return immediateFail;
108        }
109
110        public boolean isRetry() {
111            return retry;
112        }
113
114        @Override
115        public String toString() {
116            return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
117                    + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
118                    + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
119                    + reconnectIntervalMillis + "]";
120        }
121
122    }
123
124    private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
125
126        @Override
127        public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
128            try {
129                return new JmsManager(name, data);
130            } catch (final Exception e) {
131                logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
132                return null;
133            }
134        }
135    }
136
137    /**
138     * Handles reconnecting to JMS on a Thread.
139     */
140    private class Reconnector extends Log4jThread {
141
142        private final CountDownLatch latch = new CountDownLatch(1);
143
144        private volatile boolean shutdown = false;
145
146        private final Object owner;
147
148        private Reconnector(final Object owner) {
149            super("JmsManager-Reconnector");
150            this.owner = owner;
151        }
152
153        public void latch() {
154            try {
155                latch.await();
156            } catch (final InterruptedException ex) {
157                // Ignore the exception.
158            }
159        }
160
161        void reconnect() throws NamingException, JMSException {
162            final JndiManager jndiManager2 = getJndiManager();
163            final Connection connection2 = createConnection(jndiManager2);
164            final Session session2 = createSession(connection2);
165            final Destination destination2 = createDestination(jndiManager2);
166            final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
167            connection2.start();
168            synchronized (owner) {
169                jndiManager = jndiManager2;
170                connection = connection2;
171                session = session2;
172                destination = destination2;
173                messageProducer = messageProducer2;
174                reconnector = null;
175                shutdown = true;
176            }
177            logger().debug("Connection reestablished to {}", configuration);
178        }
179
180        @Override
181        public void run() {
182            while (!shutdown) {
183                try {
184                    sleep(configuration.getReconnectIntervalMillis());
185                    reconnect();
186                } catch (final InterruptedException | JMSException | NamingException e) {
187                    logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
188                            e);
189                } finally {
190                    latch.countDown();
191                }
192            }
193        }
194
195        public void shutdown() {
196            shutdown = true;
197        }
198
199    }
200
201    static final JmsManagerFactory FACTORY = new JmsManagerFactory();
202
203    /**
204     * Gets a JmsManager using the specified configuration parameters.
205     *
206     * @param name
207     *            The name to use for this JmsManager.
208     * @param connectionFactoryName
209     *            The binding name for the {@link javax.jms.ConnectionFactory}.
210     * @param destinationName
211     *            The binding name for the {@link javax.jms.Destination}.
212     * @param userName
213     *            The userName to connect with or {@code null} for no authentication.
214     * @param password
215     *            The password to use with the given userName or {@code null} for no authentication.
216     * @param immediateFail
217     *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
218     *            fails.
219     * @param reconnectIntervalMillis
220     *            How to log sleep in milliseconds before trying to reconnect to JMS.
221     * @param jndiProperties
222     *            JNDI properties.
223     * @return The JmsManager as configured.
224     */
225    public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
226            final String connectionFactoryName, final String destinationName, final String userName,
227            final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
228        final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
229                destinationName, userName, password, immediateFail, reconnectIntervalMillis);
230        return getManager(name, FACTORY, configuration);
231    }
232
233    private final JmsManagerConfiguration configuration;
234
235    private volatile Reconnector reconnector;
236    private volatile JndiManager jndiManager;
237    private volatile Connection connection;
238    private volatile Session session;
239    private volatile Destination destination;
240    private volatile MessageProducer messageProducer;
241
242    private JmsManager(final String name, final JmsManagerConfiguration configuration) {
243        super(null, name);
244        this.configuration = configuration;
245        this.jndiManager = configuration.getJndiManager();
246        try {
247            this.connection = createConnection(this.jndiManager);
248            this.session = createSession(this.connection);
249            this.destination = createDestination(this.jndiManager);
250            this.messageProducer = createMessageProducer(this.session, this.destination);
251            this.connection.start();
252        } catch (NamingException | JMSException e) {
253            this.reconnector = createReconnector();
254            this.reconnector.start();
255        }
256    }
257
258    private boolean closeConnection() {
259        if (connection == null) {
260            return true;
261        }
262        final Connection temp = connection;
263        connection = null;
264        try {
265            temp.close();
266            return true;
267        } catch (final JMSException e) {
268            StatusLogger.getLogger().debug(
269                    "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
270                    e.getLocalizedMessage(), temp, e);
271            return false;
272        }
273    }
274
275    private boolean closeJndiManager() {
276        if (jndiManager == null) {
277            return true;
278        }
279        final JndiManager tmp = jndiManager;
280        jndiManager = null;
281        tmp.close();
282        return true;
283    }
284
285    private boolean closeMessageProducer() {
286        if (messageProducer == null) {
287            return true;
288        }
289        final MessageProducer temp = messageProducer;
290        messageProducer = null;
291        try {
292            temp.close();
293            return true;
294        } catch (final JMSException e) {
295            StatusLogger.getLogger().debug(
296                    "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
297                    e.getLocalizedMessage(), temp, e);
298            return false;
299        }
300    }
301
302    private boolean closeSession() {
303        if (session == null) {
304            return true;
305        }
306        final Session temp = session;
307        session = null;
308        try {
309            temp.close();
310            return true;
311        } catch (final JMSException e) {
312            StatusLogger.getLogger().debug(
313                    "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
314                    e.getLocalizedMessage(), temp, e);
315            return false;
316        }
317    }
318
319    private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
320        final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
321        if (configuration.getUserName() != null && configuration.getPassword() != null) {
322            return connectionFactory.createConnection(configuration.getUserName(),
323                    configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
324        }
325        return connectionFactory.createConnection();
326
327    }
328
329    private Destination createDestination(final JndiManager jndiManager) throws NamingException {
330        return jndiManager.lookup(configuration.getDestinationName());
331    }
332
333    /**
334     * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
335     * <p>
336     * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
337     * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
338     * message will be serialized to a String.
339     * </p>
340     * <p>
341     * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
342     * will be serialized as a Java object.
343     * </p>
344     * <p>
345     * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
346     * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
347     * </p>
348     *
349     * @param object
350     *            The LogEvent or String message to wrap.
351     * @return A new JMS message containing the provided object.
352     * @throws JMSException
353     */
354    public Message createMessage(final Serializable object) throws JMSException {
355        if (object instanceof String) {
356            return this.session.createTextMessage((String) object);
357        } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
358            return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
359        }
360        return this.session.createObjectMessage(object);
361    }
362
363    private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
364        final Message message = createMessage(serializable);
365        message.setJMSTimestamp(event.getTimeMillis());
366        messageProducer.send(message);
367    }
368
369    /**
370     * Creates a MessageConsumer on this Destination using the current Session.
371     *
372     * @return A MessageConsumer on this Destination.
373     * @throws JMSException
374     */
375    public MessageConsumer createMessageConsumer() throws JMSException {
376        return this.session.createConsumer(this.destination);
377    }
378
379    /**
380     * Creates a MessageProducer on this Destination using the current Session.
381     *
382     * @param session
383     *            The JMS Session to use to create the MessageProducer
384     * @param destination
385     *            The JMS Destination for the MessageProducer
386     * @return A MessageProducer on this Destination.
387     * @throws JMSException
388     */
389    public MessageProducer createMessageProducer(final Session session, final Destination destination)
390            throws JMSException {
391        return session.createProducer(destination);
392    }
393
394    private Reconnector createReconnector() {
395        final Reconnector recon = new Reconnector(this);
396        recon.setDaemon(true);
397        recon.setPriority(Thread.MIN_PRIORITY);
398        return recon;
399    }
400
401    private Session createSession(final Connection connection) throws JMSException {
402        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
403    }
404
405    public JmsManagerConfiguration getJmsManagerConfiguration() {
406        return configuration;
407    }
408
409    JndiManager getJndiManager() {
410        return configuration.getJndiManager();
411    }
412
413    <T> T lookup(final String destinationName) throws NamingException {
414        return this.jndiManager.lookup(destinationName);
415    }
416
417    private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
418            final MapMessage jmsMapMessage) {
419        // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
420        log4jMapMessage.forEach(new BiConsumer<String, Object>() {
421            @Override
422            public void accept(final String key, final Object value) {
423                try {
424                    jmsMapMessage.setObject(key, value);
425                } catch (final JMSException e) {
426                    throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
427                            e.getClass(), key, value, e.getLocalizedMessage()), e);
428                }
429            }
430        });
431        return jmsMapMessage;
432    }
433
434    @Override
435    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
436        if (reconnector != null) {
437            reconnector.shutdown();
438            reconnector.interrupt();
439            reconnector = null;
440        }
441        boolean closed = false;
442        closed &= closeJndiManager();
443        closed &= closeMessageProducer();
444        closed &= closeSession();
445        closed &= closeConnection();
446        return closed && this.jndiManager.stop(timeout, timeUnit);
447    }
448
449    void send(final LogEvent event, final Serializable serializable) {
450        if (messageProducer == null) {
451            if (reconnector != null && !configuration.isImmediateFail()) {
452                reconnector.latch();
453                if (messageProducer == null) {
454                    throw new AppenderLoggingException(
455                            "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
456                }
457            }
458        }
459        synchronized (this) {
460            try {
461                createMessageAndSend(event, serializable);
462            } catch (final JMSException causeEx) {
463                if (configuration.isRetry() && reconnector == null) {
464                    reconnector = createReconnector();
465                    try {
466                        closeJndiManager();
467                        reconnector.reconnect();
468                    } catch (NamingException | JMSException reconnEx) {
469                        logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
470                                configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
471                        reconnector.start();
472                        throw new AppenderLoggingException(
473                                String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
474                    }
475                    try {
476                        createMessageAndSend(event, serializable);
477                    } catch (final JMSException e) {
478                        throw new AppenderLoggingException(
479                                String.format("Error sending to %s after reestablishing JMS connection for %s",
480                                        getName(), configuration),
481                                causeEx);
482                    }
483                }
484            }
485        }
486    }
487
488}