1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom;
19
20 import java.io.Serializable;
21 import java.util.Properties;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24
25 import javax.jms.Connection;
26 import javax.jms.ConnectionFactory;
27 import javax.jms.Destination;
28 import javax.jms.JMSException;
29 import javax.jms.MapMessage;
30 import javax.jms.Message;
31 import javax.jms.MessageConsumer;
32 import javax.jms.MessageProducer;
33 import javax.jms.Session;
34 import javax.naming.NamingException;
35
36 import org.apache.logging.log4j.core.LogEvent;
37 import org.apache.logging.log4j.core.appender.AbstractManager;
38 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
39 import org.apache.logging.log4j.core.appender.ManagerFactory;
40 import org.apache.logging.log4j.core.net.JndiManager;
41 import org.apache.logging.log4j.core.util.Log4jThread;
42 import org.apache.logging.log4j.status.StatusLogger;
43 import org.apache.logging.log4j.util.BiConsumer;
44
45
46
47
48
49
50
51
52
53 public class JmsManager extends AbstractManager {
54
55 public static class JmsManagerConfiguration {
56 private final Properties jndiProperties;
57 private final String connectionFactoryName;
58 private final String destinationName;
59 private final String userName;
60 private final char[] password;
61 private final boolean immediateFail;
62 private final boolean retry;
63 private final long reconnectIntervalMillis;
64
65 JmsManagerConfiguration(final Properties jndiProperties, final String connectionFactoryName,
66 final String destinationName, final String userName, final char[] password, final boolean immediateFail,
67 final long reconnectIntervalMillis) {
68 this.jndiProperties = jndiProperties;
69 this.connectionFactoryName = connectionFactoryName;
70 this.destinationName = destinationName;
71 this.userName = userName;
72 this.password = password;
73 this.immediateFail = immediateFail;
74 this.reconnectIntervalMillis = reconnectIntervalMillis;
75 this.retry = reconnectIntervalMillis > 0;
76 }
77
78 public String getConnectionFactoryName() {
79 return connectionFactoryName;
80 }
81
82 public String getDestinationName() {
83 return destinationName;
84 }
85
86 public JndiManager getJndiManager() {
87 return JndiManager.getJndiManager(getJndiProperties());
88 }
89
90 public Properties getJndiProperties() {
91 return jndiProperties;
92 }
93
94 public char[] getPassword() {
95 return password;
96 }
97
98 public long getReconnectIntervalMillis() {
99 return reconnectIntervalMillis;
100 }
101
102 public String getUserName() {
103 return userName;
104 }
105
106 public boolean isImmediateFail() {
107 return immediateFail;
108 }
109
110 public boolean isRetry() {
111 return retry;
112 }
113
114 @Override
115 public String toString() {
116 return "JmsManagerConfiguration [jndiProperties=" + jndiProperties + ", connectionFactoryName="
117 + connectionFactoryName + ", destinationName=" + destinationName + ", userName=" + userName
118 + ", immediateFail=" + immediateFail + ", retry=" + retry + ", reconnectIntervalMillis="
119 + reconnectIntervalMillis + "]";
120 }
121
122 }
123
124 private static class JmsManagerFactory implements ManagerFactory<JmsManager, JmsManagerConfiguration> {
125
126 @Override
127 public JmsManager createManager(final String name, final JmsManagerConfiguration data) {
128 if (JndiManager.isJndiJmsEnabled()) {
129 try {
130 return new JmsManager(name, data);
131 } catch (final Exception e) {
132 logger().error("Error creating JmsManager using JmsManagerConfiguration [{}]", data, e);
133 return null;
134 }
135 }
136 logger().error("JNDI must be enabled by setting log4j2.enableJndiJms=true");
137 return null;
138 }
139 }
140
141
142
143
144 private class Reconnector extends Log4jThread {
145
146 private final CountDownLatch latch = new CountDownLatch(1);
147
148 private volatile boolean shutdown = false;
149
150 private final Object owner;
151
152 private Reconnector(final Object owner) {
153 super("JmsManager-Reconnector");
154 this.owner = owner;
155 }
156
157 public void latch() {
158 try {
159 latch.await();
160 } catch (final InterruptedException ex) {
161
162 }
163 }
164
165 void reconnect() throws NamingException, JMSException {
166 final JndiManager jndiManager2 = getJndiManager();
167 final Connection connection2 = createConnection(jndiManager2);
168 final Session session2 = createSession(connection2);
169 final Destination destination2 = createDestination(jndiManager2);
170 final MessageProducer messageProducer2 = createMessageProducer(session2, destination2);
171 connection2.start();
172 synchronized (owner) {
173 jndiManager = jndiManager2;
174 connection = connection2;
175 session = session2;
176 destination = destination2;
177 messageProducer = messageProducer2;
178 reconnector = null;
179 shutdown = true;
180 }
181 logger().debug("Connection reestablished to {}", configuration);
182 }
183
184 @Override
185 public void run() {
186 while (!shutdown) {
187 try {
188 sleep(configuration.getReconnectIntervalMillis());
189 reconnect();
190 } catch (final InterruptedException | JMSException | NamingException e) {
191 logger().debug("Cannot reestablish JMS connection to {}: {}", configuration, e.getLocalizedMessage(),
192 e);
193 } finally {
194 latch.countDown();
195 }
196 }
197 }
198
199 public void shutdown() {
200 shutdown = true;
201 }
202
203 }
204
205 static final JmsManagerFactory FACTORY = new JmsManagerFactory();
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229 public static JmsManager getJmsManager(final String name, final Properties jndiProperties,
230 final String connectionFactoryName, final String destinationName, final String userName,
231 final char[] password, final boolean immediateFail, final long reconnectIntervalMillis) {
232 final JmsManagerConfiguration configuration = new JmsManagerConfiguration(jndiProperties, connectionFactoryName,
233 destinationName, userName, password, immediateFail, reconnectIntervalMillis);
234 return getManager(name, FACTORY, configuration);
235 }
236
237 private final JmsManagerConfiguration configuration;
238
239 private volatile Reconnector reconnector;
240 private volatile JndiManager jndiManager;
241 private volatile Connection connection;
242 private volatile Session session;
243 private volatile Destination destination;
244 private volatile MessageProducer messageProducer;
245
246 private JmsManager(final String name, final JmsManagerConfiguration configuration) {
247 super(null, name);
248 this.configuration = configuration;
249 this.jndiManager = configuration.getJndiManager();
250 try {
251 this.connection = createConnection(this.jndiManager);
252 this.session = createSession(this.connection);
253 this.destination = createDestination(this.jndiManager);
254 this.messageProducer = createMessageProducer(this.session, this.destination);
255 this.connection.start();
256 } catch (NamingException | JMSException e) {
257 this.reconnector = createReconnector();
258 this.reconnector.start();
259 }
260 }
261
262 private boolean closeConnection() {
263 if (connection == null) {
264 return true;
265 }
266 final Connection temp = connection;
267 connection = null;
268 try {
269 temp.close();
270 return true;
271 } catch (final JMSException e) {
272 StatusLogger.getLogger().debug(
273 "Caught exception closing JMS Connection: {} ({}); continuing JMS manager shutdown",
274 e.getLocalizedMessage(), temp, e);
275 return false;
276 }
277 }
278
279 private boolean closeJndiManager() {
280 if (jndiManager == null) {
281 return true;
282 }
283 final JndiManager tmp = jndiManager;
284 jndiManager = null;
285 tmp.close();
286 return true;
287 }
288
289 private boolean closeMessageProducer() {
290 if (messageProducer == null) {
291 return true;
292 }
293 final MessageProducer temp = messageProducer;
294 messageProducer = null;
295 try {
296 temp.close();
297 return true;
298 } catch (final JMSException e) {
299 StatusLogger.getLogger().debug(
300 "Caught exception closing JMS MessageProducer: {} ({}); continuing JMS manager shutdown",
301 e.getLocalizedMessage(), temp, e);
302 return false;
303 }
304 }
305
306 private boolean closeSession() {
307 if (session == null) {
308 return true;
309 }
310 final Session temp = session;
311 session = null;
312 try {
313 temp.close();
314 return true;
315 } catch (final JMSException e) {
316 StatusLogger.getLogger().debug(
317 "Caught exception closing JMS Session: {} ({}); continuing JMS manager shutdown",
318 e.getLocalizedMessage(), temp, e);
319 return false;
320 }
321 }
322
323 private Connection createConnection(final JndiManager jndiManager) throws NamingException, JMSException {
324 final ConnectionFactory connectionFactory = jndiManager.lookup(configuration.getConnectionFactoryName());
325 if (configuration.getUserName() != null && configuration.getPassword() != null) {
326 return connectionFactory.createConnection(configuration.getUserName(),
327 configuration.getPassword() == null ? null : String.valueOf(configuration.getPassword()));
328 }
329 return connectionFactory.createConnection();
330
331 }
332
333 private Destination createDestination(final JndiManager jndiManager) throws NamingException {
334 return jndiManager.lookup(configuration.getDestinationName());
335 }
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 public Message createMessage(final Serializable object) throws JMSException {
359 if (object instanceof String) {
360 return this.session.createTextMessage((String) object);
361 } else if (object instanceof org.apache.logging.log4j.message.MapMessage) {
362 return map((org.apache.logging.log4j.message.MapMessage<?, ?>) object, this.session.createMapMessage());
363 }
364 return this.session.createObjectMessage(object);
365 }
366
367 private void createMessageAndSend(final LogEvent event, final Serializable serializable) throws JMSException {
368 final Message message = createMessage(serializable);
369 message.setJMSTimestamp(event.getTimeMillis());
370 messageProducer.send(message);
371 }
372
373
374
375
376
377
378
379 public MessageConsumer createMessageConsumer() throws JMSException {
380 return this.session.createConsumer(this.destination);
381 }
382
383
384
385
386
387
388
389
390
391
392
393 public MessageProducer createMessageProducer(final Session session, final Destination destination)
394 throws JMSException {
395 return session.createProducer(destination);
396 }
397
398 private Reconnector createReconnector() {
399 final Reconnector recon = new Reconnector(this);
400 recon.setDaemon(true);
401 recon.setPriority(Thread.MIN_PRIORITY);
402 return recon;
403 }
404
405 private Session createSession(final Connection connection) throws JMSException {
406 return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
407 }
408
409 public JmsManagerConfiguration getJmsManagerConfiguration() {
410 return configuration;
411 }
412
413 JndiManager getJndiManager() {
414 return configuration.getJndiManager();
415 }
416
417 <T> T lookup(final String destinationName) throws NamingException {
418 return this.jndiManager.lookup(destinationName);
419 }
420
421 private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
422 final MapMessage jmsMapMessage) {
423
424 log4jMapMessage.forEach(new BiConsumer<String, Object>() {
425 @Override
426 public void accept(final String key, final Object value) {
427 try {
428 jmsMapMessage.setObject(key, value);
429 } catch (final JMSException e) {
430 throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
431 e.getClass(), key, value, e.getLocalizedMessage()), e);
432 }
433 }
434 });
435 return jmsMapMessage;
436 }
437
438 @Override
439 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
440 if (reconnector != null) {
441 reconnector.shutdown();
442 reconnector.interrupt();
443 reconnector = null;
444 }
445 boolean closed = false;
446 closed &= closeJndiManager();
447 closed &= closeMessageProducer();
448 closed &= closeSession();
449 closed &= closeConnection();
450 return closed && this.jndiManager.stop(timeout, timeUnit);
451 }
452
453 void send(final LogEvent event, final Serializable serializable) {
454 if (messageProducer == null) {
455 if (reconnector != null && !configuration.isImmediateFail()) {
456 reconnector.latch();
457 if (messageProducer == null) {
458 throw new AppenderLoggingException(
459 "Error sending to JMS Manager '" + getName() + "': JMS message producer not available");
460 }
461 }
462 }
463 synchronized (this) {
464 try {
465 createMessageAndSend(event, serializable);
466 } catch (final JMSException causeEx) {
467 if (configuration.isRetry() && reconnector == null) {
468 reconnector = createReconnector();
469 try {
470 closeJndiManager();
471 reconnector.reconnect();
472 } catch (NamingException | JMSException reconnEx) {
473 logger().debug("Cannot reestablish JMS connection to {}: {}; starting reconnector thread {}",
474 configuration, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
475 reconnector.start();
476 throw new AppenderLoggingException(
477 String.format("JMS exception sending to %s for %s", getName(), configuration), causeEx);
478 }
479 try {
480 createMessageAndSend(event, serializable);
481 } catch (final JMSException e) {
482 throw new AppenderLoggingException(
483 String.format("Error sending to %s after reestablishing JMS connection for %s",
484 getName(), configuration),
485 causeEx);
486 }
487 }
488 }
489 }
490 }
491
492 }