001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 018package org.apache.logging.log4j.core.appender.mom; 019 020import java.io.Serializable; 021import java.util.Properties; 022import java.util.concurrent.CountDownLatch; 023import java.util.concurrent.TimeUnit; 024 025import javax.jms.Connection; 026import javax.jms.ConnectionFactory; 027import javax.jms.Destination; 028import javax.jms.JMSException; 029import javax.jms.MapMessage; 030import javax.jms.Message; 031import javax.jms.MessageConsumer; 032import javax.jms.MessageProducer; 033import javax.jms.Session; 034import javax.naming.NamingException; 035 036import org.apache.logging.log4j.core.LogEvent; 037import org.apache.logging.log4j.core.appender.AbstractManager; 038import org.apache.logging.log4j.core.appender.AppenderLoggingException; 039import org.apache.logging.log4j.core.appender.ManagerFactory; 040import org.apache.logging.log4j.core.net.JndiManager; 041import org.apache.logging.log4j.core.util.Log4jThread; 042import org.apache.logging.log4j.status.StatusLogger; 043import org.apache.logging.log4j.util.BiConsumer; 044 045/** 046 * Consider this class <b>private</b>; it is only <b>public</b> for access by integration tests. 047 * 048 * <p> 049 * JMS connection and session manager. Can be used to access MessageProducer, MessageConsumer, and Message objects 050 * involving a configured ConnectionFactory and Destination. 051 * </p> 052 */ 053public class JmsManager extends AbstractManager { 054 055 public static class JmsManagerConfiguration { 056 private final Properties jndiProperties; 057 private final String connectionFactoryName; 058 private final String destinationName; 059 private final String userName; 060 private final char[] password; 061 private final boolean immediateFail; 062 private final boolean retry; 063 private final long reconnectIntervalMillis; 064 065 JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName, 066 final String destinationName, final String userName, final char[] password, final boolean immediateFail, 067 final long reconnectIntervalMillis) { 068 this.jndiProperties = jndiProperties; 069 this.connectionFactoryName = connectionFactoryName; 070 this.destinationName = destinationName; 071 this.userName = userName; 072 this.password = password; 073 this.immediateFail = immediateFail; 074 this.reconnectIntervalMillis = reconnectIntervalMillis; 075 this.retry = reconnectIntervalMillis > 0; 076 } 077 078 public String getConnectionFactoryName() { 079 return connectionFactoryName; 080 } 081 082 public String getDestinationName() { 083 return destinationName; 084 } 085 086 public JndiManager getJndiManager() { 087 return JndiManager.getJndiManager(getJndiProperties()); 088 } 089 090 public Properties getJndiProperties() { 091 return jndiProperties; 092 } 093 094 public char[] getPassword() { 095 return password; 096 } 097 098 public long getReconnectIntervalMillis() { 099 return reconnectIntervalMillis; 100 } 101 102 public String getUserName() { 103 return userName; 104 } 105 106 public boolean isImmediateFail() { 107 return immediateFail; 108 } 109 110 public boolean isRetry() { 111 return retry; 112 } 113 114 @Override 115 public String toString() { 116 return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName=" 117 + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName 118 + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis=" 119 + reconnectIntervalMillis + "]"; 120 } 121 122 } 123 124 private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> { 125 126 @Override 127 public JmsManager createManager(final String name, final JmsManagerConfiguration data) { 128 if (JndiManager.isJndiJmsEnabled()) { 129 try { 130 return new JmsManager(name, data); 131 } catch (final Exception e) { 132 logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e); 133 return null; 134 } 135 } 136 logger().error("JNDI must be enabled by setting log4j2.enableJndiJms=true"); 137 return null; 138 } 139 } 140 141 /** 142 * Handles reconnecting to JMS on a Thread. 143 */ 144 private class Reconnector extends Log4jThread { 145 146 private final CountDownLatch latch = new CountDownLatch(1); 147 148 private volatile boolean shutdown = false; 149 150 private final Object owner; 151 152 private Reconnector(final Object owner) { 153 super("JmsManager-Reconnector"); 154 this.owner = owner; 155 } 156 157 public void latch() { 158 try { 159 latch.await(); 160 } catch (final InterruptedException ex) { 161 // Ignore the exception. 162 } 163 } 164 165 void reconnect() throws NamingException, JMSException { 166 final JndiManager jndiManager2 = getJndiManager(); 167 final Connection connection2 = createConnection(jndiManager2); 168 final Session session2 = createSession(connection2); 169 final Destination destination2 = createDestination(jndiManager2); 170 final MessageProducer messageProducer2 = createMessageProducer(session2, destination2); 171 connection2.start(); 172 synchronized (owner) { 173 jndiManager = jndiManager2; 174 connection = connection2; 175 session = session2; 176 destination = destination2; 177 messageProducer = messageProducer2; 178 reconnector = null; 179 shutdown = true; 180 } 181 logger().debug("Connection reestablished to {}", configuration); 182 } 183 184 @Override 185 public void run() { 186 while (!shutdown) { 187 try { 188 sleep(configuration.getReconnectIntervalMillis()); 189 reconnect(); 190 } catch (final InterruptedException | JMSException | NamingException e) { 191 logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(), 192 e); 193 } finally { 194 latch.countDown(); 195 } 196 } 197 } 198 199 public void shutdown() { 200 shutdown = true; 201 } 202 203 } 204 205 static final JmsManagerFactory FACTORY = new JmsManagerFactory(); 206 207 /** 208 * Gets a JmsManager using the specified configuration parameters. 209 * 210 * @param name 211 * The name to use for this JmsManager. 212 * @param connectionFactoryName 213 * The binding name for the {@link javax.jms.ConnectionFactory}. 214 * @param destinationName 215 * The binding name for the {@link javax.jms.Destination}. 216 * @param userName 217 * The userName to connect with or {@code null} for no authentication. 218 * @param password 219 * The password to use with the given userName or {@code null} for no authentication. 220 * @param immediateFail 221 * Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting to JMS 222 * fails. 223 * @param reconnectIntervalMillis 224 * How to log sleep in milliseconds before trying to reconnect to JMS. 225 * @param jndiProperties 226 * JNDI properties. 227 * @return The JmsManager as configured. 228 */ 229 public static JmsManager getJmsManager(final String name, final Properties jndiProperties, 230 final String connectionFactoryName, final String destinationName, final String userName, 231 final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) { 232 final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName, 233 destinationName, userName, password, immediateFail, reconnectIntervalMillis); 234 return getManager(name, FACTORY, configuration); 235 } 236 237 private final JmsManagerConfiguration configuration; 238 239 private volatile Reconnector reconnector; 240 private volatile JndiManager jndiManager; 241 private volatile Connection connection; 242 private volatile Session session; 243 private volatile Destination destination; 244 private volatile MessageProducer messageProducer; 245 246 private JmsManager(final String name, final JmsManagerConfiguration configuration) { 247 super(null, name); 248 this.configuration = configuration; 249 this.jndiManager = configuration.getJndiManager(); 250 try { 251 this.connection = createConnection(this.jndiManager); 252 this.session = createSession(this.connection); 253 this.destination = createDestination(this.jndiManager); 254 this.messageProducer = createMessageProducer(this.session, this.destination); 255 this.connection.start(); 256 } catch (NamingException | JMSException e) { 257 this.reconnector = createReconnector(); 258 this.reconnector.start(); 259 } 260 } 261 262 private boolean closeConnection() { 263 if (connection == null) { 264 return true; 265 } 266 final Connection temp = connection; 267 connection = null; 268 try { 269 temp.close(); 270 return true; 271 } catch (final JMSException e) { 272 StatusLogger.getLogger().debug( 273 "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown", 274 e.getLocalizedMessage(), temp, e); 275 return false; 276 } 277 } 278 279 private boolean closeJndiManager() { 280 if (jndiManager == null) { 281 return true; 282 } 283 final JndiManager tmp = jndiManager; 284 jndiManager = null; 285 tmp.close(); 286 return true; 287 } 288 289 private boolean closeMessageProducer() { 290 if (messageProducer == null) { 291 return true; 292 } 293 final MessageProducer temp = messageProducer; 294 messageProducer = null; 295 try { 296 temp.close(); 297 return true; 298 } catch (final JMSException e) { 299 StatusLogger.getLogger().debug( 300 "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown", 301 e.getLocalizedMessage(), temp, e); 302 return false; 303 } 304 } 305 306 private boolean closeSession() { 307 if (session == null) { 308 return true; 309 } 310 final Session temp = session; 311 session = null; 312 try { 313 temp.close(); 314 return true; 315 } catch (final JMSException e) { 316 StatusLogger.getLogger().debug( 317 "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown", 318 e.getLocalizedMessage(), temp, e); 319 return false; 320 } 321 } 322 323 private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException { 324 final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName()); 325 if (configuration.getUserName() != null && configuration.getPassword() != null) { 326 return connectionFactory.createConnection(configuration.getUserName(), 327 configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword())); 328 } 329 return connectionFactory.createConnection(); 330 331 } 332 333 private Destination createDestination(final JndiManager jndiManager) throws NamingException { 334 return jndiManager.lookup(configuration.getDestinationName()); 335 } 336 337 /** 338 * Creates a TextMessage, MapMessage, or ObjectMessage from a Serializable object. 339 * <p> 340 * For instance, when using a text-based {@link org.apache.logging.log4j.core.Layout} such as 341 * {@link org.apache.logging.log4j.core.layout.PatternLayout}, the {@link org.apache.logging.log4j.core.LogEvent} 342 * message will be serialized to a String. 343 * </p> 344 * <p> 345 * When using a layout such as {@link org.apache.logging.log4j.core.layout.SerializedLayout}, the LogEvent message 346 * will be serialized as a Java object. 347 * </p> 348 * <p> 349 * When using a layout such as {@link org.apache.logging.log4j.core.layout.MessageLayout} and the LogEvent message 350 * is a Log4j MapMessage, the message will be serialized as a JMS MapMessage. 351 * </p> 352 * 353 * @param object 354 * The LogEvent or String message to wrap. 355 * @return A new JMS message containing the provided object. 356 * @throws JMSException if the JMS provider fails to create a message due to some internal error. 357 */ 358 public Message createMessage(final Serializable object) throws JMSException { 359 if (object instanceof String) { 360 return this.session.createTextMessage((String) object); 361 } else if (object instanceof org.apache.logging.log4j.message.MapMessage) { 362 return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage()); 363 } 364 return this.session.createObjectMessage(object); 365 } 366 367 private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException { 368 final Message message = createMessage(serializable); 369 message.setJMSTimestamp(event.getTimeMillis()); 370 messageProducer.send(message); 371 } 372 373 /** 374 * Creates a MessageConsumer on this Destination using the current Session. 375 * 376 * @return A MessageConsumer on this Destination. 377 * @throws JMSException if the session fails to create a consumer due to some internal error. 378 */ 379 public MessageConsumer createMessageConsumer() throws JMSException { 380 return this.session.createConsumer(this.destination); 381 } 382 383 /** 384 * Creates a MessageProducer on this Destination using the current Session. 385 * 386 * @param session 387 * The JMS Session to use to create the MessageProducer 388 * @param destination 389 * The JMS Destination for the MessageProducer 390 * @return A MessageProducer on this Destination. 391 * @throws JMSException if the session fails to create a MessageProducer due to some internal error. 392 */ 393 public MessageProducer createMessageProducer(final Session session, final Destination destination) 394 throws JMSException { 395 return session.createProducer(destination); 396 } 397 398 private Reconnector createReconnector() { 399 final Reconnector recon = new Reconnector(this); 400 recon.setDaemon(true); 401 recon.setPriority(Thread.MIN_PRIORITY); 402 return recon; 403 } 404 405 private Session createSession(final Connection connection) throws JMSException { 406 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 407 } 408 409 public JmsManagerConfiguration getJmsManagerConfiguration() { 410 return configuration; 411 } 412 413 JndiManager getJndiManager() { 414 return configuration.getJndiManager(); 415 } 416 417 <T> T lookup(final String destinationName) throws NamingException { 418 return this.jndiManager.lookup(destinationName); 419 } 420 421 private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage, 422 final MapMessage jmsMapMessage) { 423 // Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map. 424 log4jMapMessage.forEach(new BiConsumer<String, Object>() { 425 @Override 426 public void accept(final String key, final Object value) { 427 try { 428 jmsMapMessage.setObject(key, value); 429 } catch (final JMSException e) { 430 throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s", 431 e.getClass(), key, value, e.getLocalizedMessage()), e); 432 } 433 } 434 }); 435 return jmsMapMessage; 436 } 437 438 @Override 439 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 440 if (reconnector != null) { 441 reconnector.shutdown(); 442 reconnector.interrupt(); 443 reconnector = null; 444 } 445 boolean closed = false; 446 closed &= closeJndiManager(); 447 closed &= closeMessageProducer(); 448 closed &= closeSession(); 449 closed &= closeConnection(); 450 return closed && this.jndiManager.stop(timeout, timeUnit); 451 } 452 453 void send(final LogEvent event, final Serializable serializable) { 454 if (messageProducer == null) { 455 if (reconnector != null && !configuration.isImmediateFail()) { 456 reconnector.latch(); 457 if (messageProducer == null) { 458 throw new AppenderLoggingException( 459 "Error sending to JMS Manager '" + getName() + "': JMS message producer not available"); 460 } 461 } 462 } 463 synchronized (this) { 464 try { 465 createMessageAndSend(event, serializable); 466 } catch (final JMSException causeEx) { 467 if (configuration.isRetry() && reconnector == null) { 468 reconnector = createReconnector(); 469 try { 470 closeJndiManager(); 471 reconnector.reconnect(); 472 } catch (NamingException | JMSException reconnEx) { 473 logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}", 474 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx); 475 reconnector.start(); 476 throw new AppenderLoggingException( 477 String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx); 478 } 479 try { 480 createMessageAndSend(event, serializable); 481 } catch (final JMSException e) { 482 throw new AppenderLoggingException( 483 String.format("Error sending to %s after reestablishing JMS connection for %s", 484 getName(), configuration), 485 causeEx); 486 } 487 } 488 } 489 } 490 } 491 492}