001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom; 019 020import java.io.Serializable; 021import java.util.Properties; 022import java.util.concurrent.CountDownLatch; 023import java.util.concurrent.TimeUnit; 024 025import javax.jms.Connection; 026import javax.jms.ConnectionFactory; 027import javax.jms.Destination; 028import javax.jms.JMSException; 029import javax.jms.MapMessage; 030import javax.jms.Message; 031import javax.jms.MessageConsumer; 032import javax.jms.MessageProducer; 033import javax.jms.Session; 034import javax.naming.NamingException; 035 036import org.apache.logging.log4j.core.LogEvent; 037import org.apache.logging.log4j.core.appender.AbstractManager; 038import org.apache.logging.log4j.core.appender.AppenderLoggingException; 039import org.apache.logging.log4j.core.appender.ManagerFactory; 040import org.apache.logging.log4j.core.net.JndiManager; 041import org.apache.logging.log4j.core.util.Log4jThread; 042import org.apache.logging.log4j.status.StatusLogger; 043 044/** 045 * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests. 046 * 047 * <p> 048 * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects 049 * involving a configured ConnectionFactory and Destination. 050 * </p> 051 */ 052public class JmsManager extends AbstractManager { 053 054 public static class JmsManagerConfiguration { 055 private final Properties jndiProperties; 056 private final String connectionFactoryName; 057 private final String destinationName; 058 private final String userName; 059 private final char[] password; 060 private final boolean immediateFail; 061 private final boolean retry; 062 private final long reconnectIntervalMillis; 063 064 JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName, 065 final String destinationName, final String userName, final char[] password, final boolean immediateFail, 066 final long reconnectIntervalMillis) { 067 this.jndiProperties = jndiProperties; 068 this.connectionFactoryName = connectionFactoryName; 069 this.destinationName = destinationName; 070 this.userName = userName; 071 this.password = password; 072 this.immediateFail = immediateFail; 073 this.reconnectIntervalMillis = reconnectIntervalMillis; 074 this.retry = reconnectIntervalMillis > 0; 075 } 076 077 public String getConnectionFactoryName() { 078 return connectionFactoryName; 079 } 080 081 public String getDestinationName() { 082 return destinationName; 083 } 084 085 public JndiManager getJndiManager() { 086 return JndiManager.getJndiManager(getJndiProperties()); 087 } 088 089 public Properties getJndiProperties() { 090 return jndiProperties; 091 } 092 093 public char[] getPassword() { 094 return password; 095 } 096 097 public long getReconnectIntervalMillis() { 098 return reconnectIntervalMillis; 099 } 100 101 public String getUserName() { 102 return userName; 103 } 104 105 public boolean isImmediateFail() { 106 return immediateFail; 107 } 108 109 public boolean isRetry() { 110 return retry; 111 } 112 113 @Override 114 public String toString() { 115 return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName=" 116 + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName 117 + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis=" 118 + reconnectIntervalMillis + "]"; 119 } 120 121 } 122 123 private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> { 124 125 @Override 126 public JmsManager createManager(final String name, final JmsManagerConfiguration data) { 127 if (JndiManager.isJndiEnabled()) { 128 try { 129 return new JmsManager(name, data); 130 } catch (final Exception e) { 131 logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e); 132 return null; 133 } 134 } else { 135 logger().error("JNDI has not been enabled. The log4j2.enableJndi property must be set to true"); 136 return null; 137 } 138 } 139 } 140 141 /** 142 * Handles reconnecting to JMS on a Thread. 143 */ 144 private class Reconnector extends Log4jThread { 145 146 private final CountDownLatch latch = new CountDownLatch(1); 147 148 private volatile boolean shutdown; 149 150 private final Object owner; 151 152 private Reconnector(final Object owner) { 153 super("JmsManager-Reconnector"); 154 this.owner = owner; 155 } 156 157 public void latch() { 158 try { 159 latch.await(); 160 } catch (final InterruptedException ex) { 161 // Ignore the exception. 162 } 163 } 164 165 void reconnect() throws NamingException, JMSException { 166 final JndiManager jndiManager2 = getJndiManager(); 167 final Connection connection2 = createConnection(jndiManager2); 168 final Session session2 = createSession(connection2); 169 final Destination destination2 = createDestination(jndiManager2); 170 final MessageProducer messageProducer2 = createMessageProducer(session2, destination2); 171 connection2.start(); 172 synchronized (owner) { 173 jndiManager = jndiManager2; 174 connection = connection2; 175 session = session2; 176 destination = destination2; 177 messageProducer = messageProducer2; 178 reconnector = null; 179 shutdown = true; 180 } 181 logger().debug("Connection reestablished to {}", configuration); 182 } 183 184 @Override 185 public void run() { 186 while (!shutdown) { 187 try { 188 sleep(configuration.getReconnectIntervalMillis()); 189 reconnect(); 190 } catch (final InterruptedException | JMSException | NamingException e) { 191 logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(), 192 e); 193 } finally { 194 latch.countDown(); 195 } 196 } 197 } 198 199 public void shutdown() { 200 shutdown = true; 201 } 202 203 } 204 205 static final JmsManagerFactory FACTORY = new JmsManagerFactory(); 206 207 /** 208 * Gets a JmsManager using the specified configuration parameters. 209 * 210 * @param name 211 * The name to use for this JmsManager. 212 * @param connectionFactoryName 213 * The binding name for the {@link javax.jms.ConnectionFactory}. 214 * @param destinationName 215 * The binding name for the {@link javax.jms.Destination}. 216 * @param userName 217 * The userName to connect with or {@code null} for no authentication. 218 * @param password 219 * The password to use with the given userName or {@code null} for no authentication. 220 * @param immediateFail 221 * Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS 222 * fails. 223 * @param reconnectIntervalMillis 224 * How to log sleep in milliseconds before trying to reconnect to JMS. 225 * @param jndiProperties 226 * JNDI properties. 227 * @return The JmsManager as configured. 228 */ 229 public static JmsManager getJmsManager(final String name, final Properties jndiProperties, 230 final String connectionFactoryName, final String destinationName, final String userName, 231 final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) { 232 final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName, 233 destinationName, userName, password, immediateFail, reconnectIntervalMillis); 234 return getManager(name, FACTORY, configuration); 235 } 236 237 private final JmsManagerConfiguration configuration; 238 239 private volatile Reconnector reconnector; 240 private volatile JndiManager jndiManager; 241 private volatile Connection connection; 242 private volatile Session session; 243 private volatile Destination destination; 244 private volatile MessageProducer messageProducer; 245 246 private JmsManager(final String name, final JmsManagerConfiguration configuration) { 247 super(null, name); 248 this.configuration = configuration; 249 this.jndiManager = configuration.getJndiManager(); 250 try { 251 this.connection = createConnection(this.jndiManager); 252 this.session = createSession(this.connection); 253 this.destination = createDestination(this.jndiManager); 254 this.messageProducer = createMessageProducer(this.session, this.destination); 255 this.connection.start(); 256 } catch (NamingException | JMSException e) { 257 this.reconnector = createReconnector(); 258 this.reconnector.start(); 259 } 260 } 261 262 private boolean closeConnection() { 263 if (connection == null) { 264 return true; 265 } 266 final Connection temp = connection; 267 connection = null; 268 try { 269 temp.close(); 270 return true; 271 } catch (final JMSException e) { 272 StatusLogger.getLogger().debug( 273 "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown", 274 e.getLocalizedMessage(), temp, e); 275 return false; 276 } 277 } 278 279 private boolean closeJndiManager() { 280 if (jndiManager == null) { 281 return true; 282 } 283 final JndiManager tmp = jndiManager; 284 jndiManager = null; 285 tmp.close(); 286 return true; 287 } 288 289 private boolean closeMessageProducer() { 290 if (messageProducer == null) { 291 return true; 292 } 293 final MessageProducer temp = messageProducer; 294 messageProducer = null; 295 try { 296 temp.close(); 297 return true; 298 } catch (final JMSException e) { 299 StatusLogger.getLogger().debug( 300 "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown", 301 e.getLocalizedMessage(), temp, e); 302 return false; 303 } 304 } 305 306 private boolean closeSession() { 307 if (session == null) { 308 return true; 309 } 310 final Session temp = session; 311 session = null; 312 try { 313 temp.close(); 314 return true; 315 } catch (final JMSException e) { 316 StatusLogger.getLogger().debug( 317 "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown", 318 e.getLocalizedMessage(), temp, e); 319 return false; 320 } 321 } 322 323 private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException { 324 final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName()); 325 if (configuration.getUserName() != null && configuration.getPassword() != null) { 326 return connectionFactory.createConnection(configuration.getUserName(), 327 configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword())); 328 } 329 return connectionFactory.createConnection(); 330 331 } 332 333 private Destination createDestination(final JndiManager jndiManager) throws NamingException { 334 return jndiManager.lookup(configuration.getDestinationName()); 335 } 336 337 /** 338 * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object. 339 * <p> 340 * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as 341 * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent} 342 * message will be serialized to a String. 343 * </p> 344 * <p> 345 * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message 346 * will be serialized as a Java object. 347 * </p> 348 * <p> 349 * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message 350 * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage. 351 * </p> 352 * 353 * @param object 354 * The LogEvent or String message to wrap. 355 * @return A new JMS message containing the provided object. 356 * @throws JMSException 357 */ 358 public Message createMessage(final Serializable object) throws JMSException { 359 if (object instanceof String) { 360 return this.session.createTextMessage((String) object); 361 } else if (object instanceof org.apache.logging.log4j.message.MapMessage) { 362 return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage()); 363 } 364 return this.session.createObjectMessage(object); 365 } 366 367 private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException { 368 final Message message = createMessage(serializable); 369 message.setJMSTimestamp(event.getTimeMillis()); 370 messageProducer.send(message); 371 } 372 373 /** 374 * Creates a MessageConsumer on this Destination using the current Session. 375 * 376 * @return A MessageConsumer on this Destination. 377 * @throws JMSException 378 */ 379 public MessageConsumer createMessageConsumer() throws JMSException { 380 return this.session.createConsumer(this.destination); 381 } 382 383 /** 384 * Creates a MessageProducer on this Destination using the current Session. 385 * 386 * @param session 387 * The JMS Session to use to create the MessageProducer 388 * @param destination 389 * The JMS Destination for the MessageProducer 390 * @return A MessageProducer on this Destination. 391 * @throws JMSException 392 */ 393 public MessageProducer createMessageProducer(final Session session, final Destination destination) 394 throws JMSException { 395 return session.createProducer(destination); 396 } 397 398 private Reconnector createReconnector() { 399 final Reconnector recon = new Reconnector(this); 400 recon.setDaemon(true); 401 recon.setPriority(Thread.MIN_PRIORITY); 402 return recon; 403 } 404 405 private Session createSession(final Connection connection) throws JMSException { 406 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 407 } 408 409 public JmsManagerConfiguration getJmsManagerConfiguration() { 410 return configuration; 411 } 412 413 JndiManager getJndiManager() { 414 return configuration.getJndiManager(); 415 } 416 417 <T> T lookup(final String destinationName) throws NamingException { 418 return this.jndiManager.lookup(destinationName); 419 } 420 421 private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage, 422 final MapMessage jmsMapMessage) { 423 // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map. 424 log4jMapMessage.forEach((key, value) -> { 425 try { 426 jmsMapMessage.setObject(key, value); 427 } catch (final JMSException e) { 428 throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s", 429 e.getClass(), key, value, e.getLocalizedMessage()), e); 430 } 431 }); 432 return jmsMapMessage; 433 } 434 435 @Override 436 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 437 if (reconnector != null) { 438 reconnector.shutdown(); 439 reconnector.interrupt(); 440 reconnector = null; 441 } 442 boolean closed = false; 443 closed &= closeJndiManager(); 444 closed &= closeMessageProducer(); 445 closed &= closeSession(); 446 closed &= closeConnection(); 447 return closed && this.jndiManager.stop(timeout, timeUnit); 448 } 449 450 void send(final LogEvent event, final Serializable serializable) { 451 if (messageProducer == null) { 452 if (reconnector != null && !configuration.isImmediateFail()) { 453 reconnector.latch(); 454 if (messageProducer == null) { 455 throw new AppenderLoggingException( 456 "Error sending to JMS Manager '" + getName() + "': JMS message producer not available"); 457 } 458 } 459 } 460 synchronized (this) { 461 try { 462 createMessageAndSend(event, serializable); 463 } catch (final JMSException causeEx) { 464 if (configuration.isRetry() && reconnector == null) { 465 reconnector = createReconnector(); 466 try { 467 closeJndiManager(); 468 reconnector.reconnect(); 469 } catch (NamingException | JMSException reconnEx) { 470 logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}", 471 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx); 472 reconnector.start(); 473 throw new AppenderLoggingException( 474 String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx); 475 } 476 try { 477 createMessageAndSend(event, serializable); 478 } catch (final JMSException e) { 479 throw new AppenderLoggingException( 480 String.format("Error sending to %s after reestablishing JMS connection for %s", 481 getName(), configuration), 482 causeEx); 483 } 484 } 485 } 486 } 487 } 488 489}