/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.logging.log4j.core.appender.mom;

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.NamingException;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractManager;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.net.JndiManager;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.logging.log4j.util.BiConsumer;

/**
 * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests.
 *
 * <p>
 * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects
 * involving a configured ConnectionFactory and Destination.
 * </p>
 */
public class JmsManager extends AbstractManager {

    /**
     * Immutable bundle of the settings needed to (re)establish a JMS connection.
     */
    public static class JmsManagerConfiguration {
        private final Properties jndiProperties;
        private final String connectionFactoryName;
        private final String destinationName;
        private final String userName;
        private final char[] password;
        private final boolean immediateFail;
        private final boolean retry;
        private final long reconnectIntervalMillis;

        JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
                final String destinationName, final String userName, final char[] password,
                final boolean immediateFail, final long reconnectIntervalMillis) {
            this.jndiProperties = jndiProperties;
            this.connectionFactoryName = connectionFactoryName;
            this.destinationName = destinationName;
            this.userName = userName;
            this.password = password;
            this.immediateFail = immediateFail;
            this.reconnectIntervalMillis = reconnectIntervalMillis;
            // Retrying is enabled only when a positive reconnect interval was configured.
            this.retry = reconnectIntervalMillis > 0;
        }

        public String getConnectionFactoryName() {
            return connectionFactoryName;
        }

        public String getDestinationName() {
            return destinationName;
        }

        /**
         * Obtains a JndiManager for the configured JNDI properties.
         *
         * @return the JndiManager returned by {@link JndiManager#getJndiManager(Properties)}.
         */
        public JndiManager getJndiManager() {
            return JndiManager.getJndiManager(getJndiProperties());
        }

        public Properties getJndiProperties() {
            return jndiProperties;
        }

        public char[] getPassword() {
            return password;
        }

        public long getReconnectIntervalMillis() {
            return reconnectIntervalMillis;
        }

        public String getUserName() {
            return userName;
        }

        public boolean isImmediateFail() {
            return immediateFail;
        }

        public boolean isRetry() {
            return retry;
        }

        /**
         * Describes this configuration; the password is deliberately left out.
         */
        @Override
        public String toString() {
            return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
                    + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
                    + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
                    + reconnectIntervalMillis + "]";
        }

    }

    /**
     * Creates JmsManager instances; returns {@code null} (after logging) when construction fails.
     */
    private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {

        @Override
        public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
            try {
                return new JmsManager(name, data);
            } catch (final Exception e) {
                logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
                return null;
            }
        }
    }

    /**
     * Handles reconnecting to JMS on a Thread.
     */
    private class Reconnector extends Log4jThread {

        // Released after every pass of run(), whether or not the reconnect attempt succeeded.
        private final CountDownLatch latch = new CountDownLatch(1);

        private volatile boolean shutdown = false;

        // Monitor guarding the swap of the enclosing manager's JMS fields.
        private final Object owner;

        private Reconnector(final Object owner) {
            super("JmsManager-Reconnector");
            this.owner = owner;
        }

        /**
         * Blocks until this thread has finished its first reconnect attempt. If the waiting thread is interrupted,
         * the wait ends early and the interrupt status is restored for the caller.
         */
        public void latch() {
            try {
                latch.await();
            } catch (final InterruptedException ex) {
                // Do not swallow the interrupt: let code further up the stack observe it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Builds a fresh connection, session, destination and producer and, once all of them exist, installs them
         * on the enclosing manager and marks this reconnector as done.
         *
         * @throws NamingException if a JNDI lookup fails.
         * @throws JMSException if a JMS object cannot be created or the connection cannot be started.
         */
        void reconnect() throws NamingException, JMSException {
            final JndiManager jndiManager2 = getJndiManager();
            final Connection connection2 = createConnection(jndiManager2);
            Session session2;
            Destination destination2;
            MessageProducer messageProducer2;
            try {
                session2 = createSession(connection2);
                destination2 = createDestination(jndiManager2);
                messageProducer2 = createMessageProducer(session2, destination2);
                connection2.start();
            } catch (final NamingException | JMSException e) {
                // The connection exists but is unusable; close it so repeated retries do not pile up connections.
                closeQuietly(connection2);
                throw e;
            }
            synchronized (owner) {
                jndiManager = jndiManager2;
                connection = connection2;
                session = session2;
                destination = destination2;
                messageProducer = messageProducer2;
                reconnector = null;
                shutdown = true;
            }
            logger().debug("Connection reestablished to {}", configuration);
        }

        /**
         * Sleeps for the configured interval and tries to reconnect, repeating until a reconnect succeeds or
         * {@link #shutdown()} is called.
         */
        @Override
        public void run() {
            while (!shutdown) {
                try {
                    sleep(configuration.getReconnectIntervalMillis());
                    reconnect();
                } catch (final InterruptedException | JMSException | NamingException e) {
                    logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
                            e);
                } finally {
                    // Wake up senders blocked in latch() after each attempt.
                    latch.countDown();
                }
            }
        }

        public void shutdown() {
            shutdown = true;
        }

    }

    static final JmsManagerFactory FACTORY = new JmsManagerFactory();

    /**
     * Gets a JmsManager using the specified configuration parameters.
     *
     * @param name
     *            The name to use for this JmsManager.
     * @param jndiProperties
     *            JNDI properties.
     * @param connectionFactoryName
     *            The binding name for the {@link javax.jms.ConnectionFactory}.
     * @param destinationName
     *            The binding name for the {@link javax.jms.Destination}.
     * @param userName
     *            The userName to connect with or {@code null} for no authentication.
     * @param password
     *            The password to use with the given userName or {@code null} for no authentication.
     * @param immediateFail
     *            Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS
     *            fails.
     * @param reconnectIntervalMillis
     *            How long to sleep in milliseconds before trying to reconnect to JMS.
     * @return The JmsManager as configured.
     */
    public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
            final String connectionFactoryName, final String destinationName, final String userName,
            final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
        final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties,
                connectionFactoryName, destinationName, userName, password, immediateFail, reconnectIntervalMillis);
        return getManager(name, FACTORY, configuration);
    }

    /**
     * Closes a connection that was never handed over to the manager, logging instead of propagating any failure.
     */
    private static void closeQuietly(final Connection connection) {
        try {
            connection.close();
        } catch (final JMSException e) {
            StatusLogger.getLogger().debug("Caught exception closing unused JMS Connection: {} ({})",
                    e.getLocalizedMessage(), connection, e);
        }
    }

    private final JmsManagerConfiguration configuration;

    private volatile Reconnector reconnector;
    private volatile JndiManager jndiManager;
    private volatile Connection connection;
    private volatile Session session;
    private volatile Destination destination;
    private volatile MessageProducer messageProducer;

    private JmsManager(final String name, final JmsManagerConfiguration configuration) {
        super(null, name);
        this.configuration = configuration;
        this.jndiManager = configuration.getJndiManager();
        try {
            this.connection = createConnection(this.jndiManager);
            this.session = createSession(this.connection);
            this.destination = createDestination(this.jndiManager);
            this.messageProducer = createMessageProducer(this.session, this.destination);
            this.connection.start();
        } catch (NamingException | JMSException e) {
            // Leave a trace of why the initial connect failed before handing over to the background thread.
            logger().debug("Cannot establish JMS connection to {}: {}; starting reconnector thread", configuration,
                    e.getLocalizedMessage(), e);
            this.reconnector = createReconnector();
            this.reconnector.start();
        }
    }

    private boolean closeConnection() {
        if (connection == null) {
            return true;
        }
        final Connection temp = connection;
        connection = null;
        try {
            temp.close();
            return true;
        } catch (final JMSException e) {
            StatusLogger.getLogger().debug(
                    "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
                    e.getLocalizedMessage(), temp, e);
            return false;
        }
    }

    private boolean closeJndiManager() {
        if (jndiManager == null) {
            return true;
        }
        final JndiManager tmp = jndiManager;
        jndiManager = null;
        tmp.close();
        return true;
    }

    private boolean closeMessageProducer() {
        if (messageProducer == null) {
            return true;
        }
        final MessageProducer temp = messageProducer;
        messageProducer = null;
        try {
            temp.close();
            return true;
        } catch (final JMSException e) {
            StatusLogger.getLogger().debug(
                    "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
                    e.getLocalizedMessage(), temp, e);
            return false;
        }
    }

    private boolean closeSession() {
        if (session == null) {
            return true;
        }
        final Session temp = session;
        session = null;
        try {
            temp.close();
            return true;
        } catch (final JMSException e) {
            StatusLogger.getLogger().debug(
                    "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
                    e.getLocalizedMessage(), temp, e);
            return false;
        }
    }

    /**
     * Looks up the configured ConnectionFactory and creates a Connection from it, authenticating only when both a
     * user name and a password are configured.
     */
    private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
        final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
        final String userName = configuration.getUserName();
        final char[] password = configuration.getPassword();
        if (userName != null && password != null) {
            return connectionFactory.createConnection(userName, String.valueOf(password));
        }
        return connectionFactory.createConnection();
    }

    private Destination createDestination(final JndiManager jndiManager) throws NamingException {
        return jndiManager.lookup(configuration.getDestinationName());
    }

    /**
     * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object.
     * <p>
     * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as
     * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent}
     * message will be serialized to a String.
     * </p>
     * <p>
     * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message
     * will be serialized as a Java object.
     * </p>
     * <p>
     * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message
     * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage.
     * </p>
     *
     * @param object
     *            The LogEvent or String message to wrap.
     * @return A new JMS message containing the provided object.
     * @throws JMSException if the current Session cannot create the message.
     */
    public Message createMessage(final Serializable object) throws JMSException {
        if (object instanceof String) {
            return this.session.createTextMessage((String) object);
        } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
            return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
        }
        return this.session.createObjectMessage(object);
    }

    /**
     * Wraps the payload in a JMS message stamped with the event time and sends it through the current producer.
     */
    private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
        final Message message = createMessage(serializable);
        message.setJMSTimestamp(event.getTimeMillis());
        messageProducer.send(message);
    }

    /**
     * Creates a MessageConsumer on this Destination using the current Session.
     *
     * @return A MessageConsumer on this Destination.
     * @throws JMSException if the current Session cannot create the consumer.
     */
    public MessageConsumer createMessageConsumer() throws JMSException {
        return this.session.createConsumer(this.destination);
    }

    /**
     * Creates a MessageProducer for the given Destination using the given Session.
     *
     * @param session
     *            The JMS Session to use to create the MessageProducer
     * @param destination
     *            The JMS Destination for the MessageProducer
     * @return A MessageProducer on the given Destination.
     * @throws JMSException if the Session cannot create the producer.
     */
    public MessageProducer createMessageProducer(final Session session, final Destination destination)
            throws JMSException {
        return session.createProducer(destination);
    }

    /**
     * Creates (but does not start) a minimum-priority daemon Reconnector bound to this manager.
     */
    private Reconnector createReconnector() {
        final Reconnector recon = new Reconnector(this);
        recon.setDaemon(true);
        recon.setPriority(Thread.MIN_PRIORITY);
        return recon;
    }

    private Session createSession(final Connection connection) throws JMSException {
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public JmsManagerConfiguration getJmsManagerConfiguration() {
        return configuration;
    }

    JndiManager getJndiManager() {
        return configuration.getJndiManager();
    }

    <T> T lookup(final String destinationName) throws NamingException {
        return this.jndiManager.lookup(destinationName);
    }

    /**
     * Copies every entry of a Log4j MapMessage into the given JMS MapMessage.
     *
     * @throws IllegalArgumentException if the JMS message rejects an entry; the JMSException is kept as the cause.
     */
    private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
            final MapMessage jmsMapMessage) {
        // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
        log4jMapMessage.forEach(new BiConsumer<String, Object>() {
            @Override
            public void accept(final String key, final Object value) {
                try {
                    jmsMapMessage.setObject(key, value);
                } catch (final JMSException e) {
                    throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
                            e.getClass(), key, value, e.getLocalizedMessage()), e);
                }
            }
        });
        return jmsMapMessage;
    }

    /**
     * Stops any pending reconnector and releases the producer, session, connection and JNDI manager.
     *
     * @param timeout
     *            How long to wait for the JNDI manager to stop.
     * @param timeUnit
     *            The unit of {@code timeout}.
     * @return {@code true} only if every resource was released without error.
     */
    @Override
    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
        // Work on a snapshot: a successful reconnect may null the volatile field concurrently.
        final Reconnector recon = reconnector;
        if (recon != null) {
            recon.shutdown();
            recon.interrupt();
            reconnector = null;
        }
        // Detach the JNDI manager first so it is stopped exactly once, with the caller's timeout.
        final JndiManager jndi = jndiManager;
        jndiManager = null;
        // Non-short-circuit '&=' on purpose: every resource gets a close attempt even after a failure.
        boolean closed = closeMessageProducer();
        closed &= closeSession();
        closed &= closeConnection();
        if (jndi != null) {
            closed &= jndi.stop(timeout, timeUnit);
        }
        return closed;
    }

    /**
     * Sends the payload to JMS. If the send fails and retrying is configured, one inline reconnect and resend is
     * attempted; if that reconnect fails too, a background reconnector thread is started.
     *
     * @param event
     *            The event whose timestamp is put on the JMS message.
     * @param serializable
     *            The payload to wrap in a JMS message.
     * @throws AppenderLoggingException if no message producer is available, or if the inline reconnect or the
     *             resend after it fails.
     */
    void send(final LogEvent event, final Serializable serializable) {
        if (messageProducer == null) {
            final Reconnector pending = reconnector;
            if (pending != null && !configuration.isImmediateFail()) {
                // Give the background thread a chance to finish its current attempt.
                pending.latch();
            }
            // Checked in every case, so a missing producer is never reported as a NullPointerException below.
            if (messageProducer == null) {
                throw new AppenderLoggingException(
                        "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
            }
        }
        synchronized (this) {
            try {
                createMessageAndSend(event, serializable);
            } catch (final JMSException causeEx) {
                if (configuration.isRetry() && reconnector == null) {
                    // Keep a local reference: the field is nulled by a successful reconnect or by releaseSub().
                    final Reconnector recon = createReconnector();
                    reconnector = recon;
                    try {
                        closeJndiManager();
                        recon.reconnect();
                    } catch (NamingException | JMSException reconnEx) {
                        logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
                                configuration, reconnEx.getLocalizedMessage(), recon.getName(), reconnEx);
                        recon.start();
                        throw new AppenderLoggingException(
                                String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
                    }
                    try {
                        createMessageAndSend(event, serializable);
                    } catch (final JMSException e) {
                        throw new AppenderLoggingException(
                                String.format("Error sending to %s after reestablishing JMS connection for %s",
                                        getName(), configuration),
                                causeEx);
                    }
                } else {
                    // Still best-effort (nothing is thrown), but the dropped event is no longer invisible.
                    logger().debug("JMS exception sending to {} for {}: {}; event not sent", getName(),
                            configuration, causeEx.getLocalizedMessage(), causeEx);
                }
            }
        }
    }

}