View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom;
19  
20  import java.io.Serializable;
21  import java.util.Properties;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.TimeUnit;
24  
25  import javax.jms.Connection;
26  import javax.jms.ConnectionFactory;
27  import javax.jms.Destination;
28  import javax.jms.JMSException;
29  import javax.jms.MapMessage;
30  import javax.jms.Message;
31  import javax.jms.MessageConsumer;
32  import javax.jms.MessageProducer;
33  import javax.jms.Session;
34  import javax.naming.NamingException;
35  
36  import org.apache.logging.log4j.core.LogEvent;
37  import org.apache.logging.log4j.core.appender.AbstractManager;
38  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
39  import org.apache.logging.log4j.core.appender.ManagerFactory;
40  import org.apache.logging.log4j.core.net.JndiManager;
41  import org.apache.logging.log4j.core.util.Log4jThread;
42  import org.apache.logging.log4j.status.StatusLogger;
43  import org.apache.logging.log4j.util.BiConsumer;
44  
45  /**
46   * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
47   *
48   * <p>
49   * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
50   * involving a configured ConnectionFactory and Destination.
51   * </p>
52   */
53  public class JmsManager extends AbstractManager {
54  
55      public static class JmsManagerConfiguration {
56          private final Properties jndiProperties;
57          private final String connectionFactoryName;
58          private final String destinationName;
59          private final String userName;
60          private final char[] password;
61          private final boolean immediateFail;
62          private final boolean retry;
63          private final long reconnectIntervalMillis;
64  
65          JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
66                  final String destinationName, final String userName, final char[] password, final boolean immediateFail,
67                  final long reconnectIntervalMillis) {
68              this.jndiProperties = jndiProperties;
69              this.connectionFactoryName = connectionFactoryName;
70              this.destinationName = destinationName;
71              this.userName = userName;
72              this.password = password;
73              this.immediateFail = immediateFail;
74              this.reconnectIntervalMillis = reconnectIntervalMillis;
75              this.retry = reconnectIntervalMillis > 0;
76          }
77  
78          public String getConnectionFactoryName() {
79              return connectionFactoryName;
80          }
81  
82          public String getDestinationName() {
83              return destinationName;
84          }
85  
86          public JndiManager getJndiManager() {
87              return JndiManager.getJndiManager(getJndiProperties());
88          }
89  
90          public Properties getJndiProperties() {
91              return jndiProperties;
92          }
93  
94          public char[] getPassword() {
95              return password;
96          }
97  
98          public long getReconnectIntervalMillis() {
99              return reconnectIntervalMillis;
100         }
101 
102         public String getUserName() {
103             return userName;
104         }
105 
106         public boolean isImmediateFail() {
107             return immediateFail;
108         }
109 
110         public boolean isRetry() {
111             return retry;
112         }
113 
114         @Override
115         public String toString() {
116             return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
117                     + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
118                     + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
119                     + reconnectIntervalMillis + "]";
120         }
121 
122     }
123 
124     private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
125 
126         @Override
127         public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
128             if (JndiManager.isJndiJmsEnabled()) {
129                 try {
130                     return new JmsManager(name, data);
131                 } catch (final Exception e) {
132                     logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
133                     return null;
134                 }
135             }
136             logger().error("JNDI must be enabled by setting log4j2.enableJndiJms=true");
137             return null;
138         }
139     }
140 
141     /**
142      * Handles reconnecting to JMS on a Thread.
143      */
144     private class Reconnector extends Log4jThread {
145 
146         private final CountDownLatch latch = new CountDownLatch(1);
147 
148         private volatile boolean shutdown = false;
149 
150         private final Object owner;
151 
152         private Reconnector(final Object owner) {
153             super("JmsManager-Reconnector");
154             this.owner = owner;
155         }
156 
157         public void latch() {
158             try {
159                 latch.await();
160             } catch (final InterruptedException ex) {
161                 // Ignore the exception.
162             }
163         }
164 
165         void reconnect() throws NamingException, JMSException {
166             final JndiManager jndiManager2 = getJndiManager();
167             final Connection connection2 = createConnection(jndiManager2);
168             final Session session2 = createSession(connection2);
169             final Destination destination2 = createDestination(jndiManager2);
170             final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
171             connection2.start();
172             synchronized (owner) {
173                 jndiManager = jndiManager2;
174                 connection = connection2;
175                 session = session2;
176                 destination = destination2;
177                 messageProducer = messageProducer2;
178                 reconnector = null;
179                 shutdown = true;
180             }
181             logger().debug("Connection reestablished to {}", configuration);
182         }
183 
184         @Override
185         public void run() {
186             while (!shutdown) {
187                 try {
188                     sleep(configuration.getReconnectIntervalMillis());
189                     reconnect();
190                 } catch (final InterruptedException | JMSException | NamingException e) {
191                     logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
192                             e);
193                 } finally {
194                     latch.countDown();
195                 }
196             }
197         }
198 
199         public void shutdown() {
200             shutdown = true;
201         }
202 
203     }
204 
205     static final JmsManagerFactory FACTORY = new JmsManagerFactory();
206 
207     /**
208      * Gets a JmsManager using the specified configuration parameters.
209      *
210      * @param name
211      *            The name to use for this JmsManager.
212      * @param connectionFactoryName
213      *            The binding name for the {@link javax.jms.ConnectionFactory}.
214      * @param destinationName
215      *            The binding name for the {@link javax.jms.Destination}.
216      * @param userName
217      *            The userName to connect with or {@code null} for no authentication.
218      * @param password
219      *            The password to use with the given userName or {@code null} for no authentication.
220      * @param immediateFail
221      *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
222      *            fails.
223      * @param reconnectIntervalMillis
224      *            How to log sleep in milliseconds before trying to reconnect to JMS.
225      * @param jndiProperties
226      *            JNDI properties.
227      * @return The JmsManager as configured.
228      */
229     public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
230             final String connectionFactoryName, final String destinationName, final String userName,
231             final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
232         final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
233                 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
234         return getManager(name, FACTORY, configuration);
235     }
236 
237     private final JmsManagerConfiguration configuration;
238 
239     private volatile Reconnector reconnector;
240     private volatile JndiManager jndiManager;
241     private volatile Connection connection;
242     private volatile Session session;
243     private volatile Destination destination;
244     private volatile MessageProducer messageProducer;
245 
246     private JmsManager(final String name, final JmsManagerConfiguration configuration) {
247         super(null, name);
248         this.configuration = configuration;
249         this.jndiManager = configuration.getJndiManager();
250         try {
251             this.connection = createConnection(this.jndiManager);
252             this.session = createSession(this.connection);
253             this.destination = createDestination(this.jndiManager);
254             this.messageProducer = createMessageProducer(this.session, this.destination);
255             this.connection.start();
256         } catch (NamingException | JMSException e) {
257             this.reconnector = createReconnector();
258             this.reconnector.start();
259         }
260     }
261 
262     private boolean closeConnection() {
263         if (connection == null) {
264             return true;
265         }
266         final Connection temp = connection;
267         connection = null;
268         try {
269             temp.close();
270             return true;
271         } catch (final JMSException e) {
272             StatusLogger.getLogger().debug(
273                     "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
274                     e.getLocalizedMessage(), temp, e);
275             return false;
276         }
277     }
278 
279     private boolean closeJndiManager() {
280         if (jndiManager == null) {
281             return true;
282         }
283         final JndiManager tmp = jndiManager;
284         jndiManager = null;
285         tmp.close();
286         return true;
287     }
288 
289     private boolean closeMessageProducer() {
290         if (messageProducer == null) {
291             return true;
292         }
293         final MessageProducer temp = messageProducer;
294         messageProducer = null;
295         try {
296             temp.close();
297             return true;
298         } catch (final JMSException e) {
299             StatusLogger.getLogger().debug(
300                     "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
301                     e.getLocalizedMessage(), temp, e);
302             return false;
303         }
304     }
305 
306     private boolean closeSession() {
307         if (session == null) {
308             return true;
309         }
310         final Session temp = session;
311         session = null;
312         try {
313             temp.close();
314             return true;
315         } catch (final JMSException e) {
316             StatusLogger.getLogger().debug(
317                     "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
318                     e.getLocalizedMessage(), temp, e);
319             return false;
320         }
321     }
322 
323     private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
324         final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
325         if (configuration.getUserName() != null && configuration.getPassword() != null) {
326             return connectionFactory.createConnection(configuration.getUserName(),
327                     configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
328         }
329         return connectionFactory.createConnection();
330 
331     }
332 
333     private Destination createDestination(final JndiManager jndiManager) throws NamingException {
334         return jndiManager.lookup(configuration.getDestinationName());
335     }
336 
337     /**
338      * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
339      * <p>
340      * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
341      * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
342      * message will be serialized to a String.
343      * </p>
344      * <p>
345      * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
346      * will be serialized as a Java object.
347      * </p>
348      * <p>
349      * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
350      * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
351      * </p>
352      *
353      * @param object
354      *            The LogEvent or String message to wrap.
355      * @return A new JMS message containing the provided object.
356      * @throws JMSException if the JMS provider fails to create a message due to some internal error.
357      */
358     public Message createMessage(final Serializable object) throws JMSException {
359         if (object instanceof String) {
360             return this.session.createTextMessage((String) object);
361         } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
362             return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
363         }
364         return this.session.createObjectMessage(object);
365     }
366 
367     private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
368         final Message message = createMessage(serializable);
369         message.setJMSTimestamp(event.getTimeMillis());
370         messageProducer.send(message);
371     }
372 
373     /**
374      * Creates a MessageConsumer on this Destination using the current Session.
375      *
376      * @return A MessageConsumer on this Destination.
377      * @throws JMSException if the session fails to create a consumer due to some internal error.
378      */
379     public MessageConsumer createMessageConsumer() throws JMSException {
380         return this.session.createConsumer(this.destination);
381     }
382 
383     /**
384      * Creates a MessageProducer on this Destination using the current Session.
385      *
386      * @param session
387      *            The JMS Session to use to create the MessageProducer
388      * @param destination
389      *            The JMS Destination for the MessageProducer
390      * @return A MessageProducer on this Destination.
391      * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
392      */
393     public MessageProducer createMessageProducer(final Session session, final Destination destination)
394             throws JMSException {
395         return session.createProducer(destination);
396     }
397 
398     private Reconnector createReconnector() {
399         final Reconnector recon = new Reconnector(this);
400         recon.setDaemon(true);
401         recon.setPriority(Thread.MIN_PRIORITY);
402         return recon;
403     }
404 
405     private Session createSession(final Connection connection) throws JMSException {
406         return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
407     }
408 
409     public JmsManagerConfiguration getJmsManagerConfiguration() {
410         return configuration;
411     }
412 
413     JndiManager getJndiManager() {
414         return configuration.getJndiManager();
415     }
416 
417     <T> T lookup(final String destinationName) throws NamingException {
418         return this.jndiManager.lookup(destinationName);
419     }
420 
421     private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
422             final MapMessage jmsMapMessage) {
423         // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
424         log4jMapMessage.forEach(new BiConsumer<String, Object>() {
425             @Override
426             public void accept(final String key, final Object value) {
427                 try {
428                     jmsMapMessage.setObject(key, value);
429                 } catch (final JMSException e) {
430                     throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
431                             e.getClass(), key, value, e.getLocalizedMessage()), e);
432                 }
433             }
434         });
435         return jmsMapMessage;
436     }
437 
438     @Override
439     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
440         if (reconnector != null) {
441             reconnector.shutdown();
442             reconnector.interrupt();
443             reconnector = null;
444         }
445         boolean closed = false;
446         closed &= closeJndiManager();
447         closed &= closeMessageProducer();
448         closed &= closeSession();
449         closed &= closeConnection();
450         return closed && this.jndiManager.stop(timeout, timeUnit);
451     }
452 
453     void send(final LogEvent event, final Serializable serializable) {
454         if (messageProducer == null) {
455             if (reconnector != null && !configuration.isImmediateFail()) {
456                 reconnector.latch();
457                 if (messageProducer == null) {
458                     throw new AppenderLoggingException(
459                             "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
460                 }
461             }
462         }
463         synchronized (this) {
464             try {
465                 createMessageAndSend(event, serializable);
466             } catch (final JMSException causeEx) {
467                 if (configuration.isRetry() && reconnector == null) {
468                     reconnector = createReconnector();
469                     try {
470                         closeJndiManager();
471                         reconnector.reconnect();
472                     } catch (NamingException | JMSException reconnEx) {
473                         logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
474                                 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
475                         reconnector.start();
476                         throw new AppenderLoggingException(
477                                 String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
478                     }
479                     try {
480                         createMessageAndSend(event, serializable);
481                     } catch (final JMSException e) {
482                         throw new AppenderLoggingException(
483                                 String.format("Error sending to %s after reestablishing JMS connection for %s",
484                                         getName(), configuration),
485                                 causeEx);
486                     }
487                 }
488             }
489         }
490     }
491 
492 }