1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TransferQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.AbstractLogEvent;
28 import org.apache.logging.log4j.core.Appender;
29 import org.apache.logging.log4j.core.Core;
30 import org.apache.logging.log4j.core.Filter;
31 import org.apache.logging.log4j.core.LogEvent;
32 import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
33 import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
34 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
35 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
36 import org.apache.logging.log4j.core.async.BlockingQueueFactory;
37 import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
38 import org.apache.logging.log4j.core.async.EventRoute;
39 import org.apache.logging.log4j.core.async.InternalAsyncUtil;
40 import org.apache.logging.log4j.core.config.AppenderControl;
41 import org.apache.logging.log4j.core.config.AppenderRef;
42 import org.apache.logging.log4j.core.config.Configuration;
43 import org.apache.logging.log4j.core.config.ConfigurationException;
44 import org.apache.logging.log4j.core.config.Property;
45 import org.apache.logging.log4j.core.config.plugins.Plugin;
46 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
47 import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
48 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
49 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
50 import org.apache.logging.log4j.core.config.plugins.PluginElement;
51 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
52 import org.apache.logging.log4j.core.filter.AbstractFilterable;
53 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
54 import org.apache.logging.log4j.core.util.Log4jThread;
55 import org.apache.logging.log4j.spi.AbstractLogger;
56
57
58
59
60
61
62 @Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
63 public final class AsyncAppender extends AbstractAppender {
64
65 private static final int DEFAULT_QUEUE_SIZE = 1024;
66 private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
67 private static final long serialVersionUID = -1761035149477086330L;
68 };
69
70 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
71
72 private final BlockingQueue<LogEvent> queue;
73 private final int queueSize;
74 private final boolean blocking;
75 private final long shutdownTimeout;
76 private final Configuration config;
77 private final AppenderRef[] appenderRefs;
78 private final String errorRef;
79 private final boolean includeLocation;
80 private AppenderControl errorAppender;
81 private AsyncThread thread;
82 private AsyncQueueFullPolicy asyncQueueFullPolicy;
83
/**
 * Creates the appender and allocates its event queue. No target appender is resolved and no
 * thread is created here; that happens in {@link #start()}.
 *
 * @param name the appender name
 * @param filter the filter handed to the superclass
 * @param appenderRefs references to the appenders that will receive the events
 * @param errorRef name of the fallback appender, or {@code null}
 * @param queueSize capacity requested from {@code blockingQueueFactory}
 * @param blocking whether a full queue is handled by the queue-full policy
 * @param ignoreExceptions forwarded to the superclass
 * @param shutdownTimeout how long {@code stop} waits for the background thread, in milliseconds
 * @param config configuration used later to look up appenders by name
 * @param includeLocation forwarded to {@code Log4jLogEvent.createMemento} for every event
 * @param blockingQueueFactory factory that creates the event queue
 * @param properties forwarded to the superclass
 */
private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
        final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
        final long shutdownTimeout, final Configuration config, final boolean includeLocation,
        final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) {
    // The third superclass argument is null: this appender is constructed without one.
    super(name, filter, null, ignoreExceptions, properties);

    // What to forward to, resolved lazily in start().
    this.config = config;
    this.appenderRefs = appenderRefs;
    this.errorRef = errorRef;

    // Behavioural switches.
    this.blocking = blocking;
    this.includeLocation = includeLocation;
    this.shutdownTimeout = shutdownTimeout;

    // The hand-off queue itself.
    this.queueSize = queueSize;
    this.queue = blockingQueueFactory.create(queueSize);
}
98
99 @Override
100 public void start() {
101 final Map<String, Appender> map = config.getAppenders();
102 final List<AppenderControl> appenders = new ArrayList<>();
103 for (final AppenderRef appenderRef : appenderRefs) {
104 final Appender appender = map.get(appenderRef.getRef());
105 if (appender != null) {
106 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
107 } else {
108 LOGGER.error("No appender named {} was configured", appenderRef);
109 }
110 }
111 if (errorRef != null) {
112 final Appender appender = map.get(errorRef);
113 if (appender != null) {
114 errorAppender = new AppenderControl(appender, null, null);
115 } else {
116 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
117 }
118 }
119 if (appenders.size() > 0) {
120 thread = new AsyncThread(appenders, queue);
121 thread.setName("AsyncAppender-" + getName());
122 } else if (errorRef == null) {
123 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
124 }
125 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
126
127 thread.start();
128 super.start();
129 }
130
131 @Override
132 public boolean stop(final long timeout, final TimeUnit timeUnit) {
133 setStopping();
134 super.stop(timeout, timeUnit, false);
135 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
136 thread.shutdown();
137 try {
138 thread.join(shutdownTimeout);
139 } catch (final InterruptedException ex) {
140 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
141 }
142 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
143
144 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
145 LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
146 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
147 }
148 setStopped();
149 return true;
150 }
151
152
153
154
155
156
157 @Override
158 public void append(final LogEvent logEvent) {
159 if (!isStarted()) {
160 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
161 }
162 final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
163 InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
164 if (!transfer(memento)) {
165 if (blocking) {
166 if (AbstractLogger.getRecursionDepth() > 1) {
167
168 AsyncQueueFullMessageUtil.logWarningToStatusLogger();
169 logMessageInCurrentThread(logEvent);
170 } else {
171
172 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
173 route.logMessage(this, memento);
174 }
175 } else {
176 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
177 logToErrorAppenderIfNecessary(false, memento);
178 }
179 }
180 }
181
182 private boolean transfer(final LogEvent memento) {
183 return queue instanceof TransferQueue
184 ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
185 : queue.offer(memento);
186 }
187
188
189
190
191
192
193 public void logMessageInCurrentThread(final LogEvent logEvent) {
194 logEvent.setEndOfBatch(queue.isEmpty());
195 final boolean appendSuccessful = thread.callAppenders(logEvent);
196 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
197 }
198
199
200
201
202
203
204 public void logMessageInBackgroundThread(final LogEvent logEvent) {
205 try {
206
207 queue.put(logEvent);
208 } catch (final InterruptedException e) {
209 final boolean appendSuccessful = handleInterruptedException(logEvent);
210 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
211 }
212 }
213
214
215
216
217
218
219
220
221
222
223
224
225 private boolean handleInterruptedException(final LogEvent memento) {
226 final boolean appendSuccessful = queue.offer(memento);
227 if (!appendSuccessful) {
228 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
229 getName());
230 }
231
232 Thread.currentThread().interrupt();
233 return appendSuccessful;
234 }
235
236 private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
237 if (!appendSuccessful && errorAppender != null) {
238 errorAppender.callAppender(logEvent);
239 }
240 }
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262 @Deprecated
263 public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
264 final boolean blocking, final long shutdownTimeout, final int size,
265 final String name, final boolean includeLocation, final Filter filter,
266 final Configuration config, final boolean ignoreExceptions) {
267 if (name == null) {
268 LOGGER.error("No name provided for AsyncAppender");
269 return null;
270 }
271 if (appenderRefs == null) {
272 LOGGER.error("No appender references provided to AsyncAppender {}", name);
273 }
274
275 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
276 shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null);
277 }
278
279 @PluginBuilderFactory
280 public static Builder newBuilder() {
281 return new Builder();
282 }
283
284 public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B>
285 implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
286
287 @PluginElement("AppenderRef")
288 @Required(message = "No appender references provided to AsyncAppender")
289 private AppenderRef[] appenderRefs;
290
291 @PluginBuilderAttribute
292 @PluginAliases("error-ref")
293 private String errorRef;
294
295 @PluginBuilderAttribute
296 private boolean blocking = true;
297
298 @PluginBuilderAttribute
299 private long shutdownTimeout = 0L;
300
301 @PluginBuilderAttribute
302 private int bufferSize = DEFAULT_QUEUE_SIZE;
303
304 @PluginBuilderAttribute
305 @Required(message = "No name provided for AsyncAppender")
306 private String name;
307
308 @PluginBuilderAttribute
309 private boolean includeLocation = false;
310
311 @PluginConfiguration
312 private Configuration configuration;
313
314 @PluginBuilderAttribute
315 private boolean ignoreExceptions = true;
316
317 @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
318 private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
319
320 public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
321 this.appenderRefs = appenderRefs;
322 return this;
323 }
324
325 public Builder setErrorRef(final String errorRef) {
326 this.errorRef = errorRef;
327 return this;
328 }
329
330 public Builder setBlocking(final boolean blocking) {
331 this.blocking = blocking;
332 return this;
333 }
334
335 public Builder setShutdownTimeout(final long shutdownTimeout) {
336 this.shutdownTimeout = shutdownTimeout;
337 return this;
338 }
339
340 public Builder setBufferSize(final int bufferSize) {
341 this.bufferSize = bufferSize;
342 return this;
343 }
344
345 public Builder setName(final String name) {
346 this.name = name;
347 return this;
348 }
349
350 public Builder setIncludeLocation(final boolean includeLocation) {
351 this.includeLocation = includeLocation;
352 return this;
353 }
354
355 public Builder setConfiguration(final Configuration configuration) {
356 this.configuration = configuration;
357 return this;
358 }
359
360 public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
361 this.ignoreExceptions = ignoreExceptions;
362 return this;
363 }
364
365 public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
366 this.blockingQueueFactory = blockingQueueFactory;
367 return this;
368 }
369
370 @Override
371 public AsyncAppender build() {
372 return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
373 shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray());
374 }
375 }
376
377
378
379
380 private class AsyncThread extends Log4jThread {
381
382 private volatile boolean shutdown = false;
383 private final List<AppenderControl> appenders;
384 private final BlockingQueue<LogEvent> queue;
385
386 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
387 super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
388 this.appenders = appenders;
389 this.queue = queue;
390 setDaemon(true);
391 }
392
393 @Override
394 public void run() {
395 while (!shutdown) {
396 LogEvent event;
397 try {
398 event = queue.take();
399 if (event == SHUTDOWN_LOG_EVENT) {
400 shutdown = true;
401 continue;
402 }
403 } catch (final InterruptedException ex) {
404 break;
405 }
406 event.setEndOfBatch(queue.isEmpty());
407 final boolean success = callAppenders(event);
408 if (!success && errorAppender != null) {
409 try {
410 errorAppender.callAppender(event);
411 } catch (final Exception ex) {
412
413 }
414 }
415 }
416
417 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
418 queue.size());
419 int count = 0;
420 int ignored = 0;
421 while (!queue.isEmpty()) {
422 try {
423 final LogEvent event = queue.take();
424 if (event instanceof Log4jLogEvent) {
425 final Log4jLogEvent logEvent = (Log4jLogEvent) event;
426 logEvent.setEndOfBatch(queue.isEmpty());
427 callAppenders(logEvent);
428 count++;
429 } else {
430 ignored++;
431 LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
432 }
433 } catch (final InterruptedException ex) {
434
435
436 }
437 }
438 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
439 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
440 }
441
442
443
444
445
446
447
448
449
450 boolean callAppenders(final LogEvent event) {
451 boolean success = false;
452 for (final AppenderControl control : appenders) {
453 try {
454 control.callAppender(event);
455 success = true;
456 } catch (final Exception ex) {
457
458 }
459 }
460 return success;
461 }
462
463 public void shutdown() {
464 shutdown = true;
465 if (queue.isEmpty()) {
466 queue.offer(SHUTDOWN_LOG_EVENT);
467 }
468 if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
469 this.interrupt();
470 }
471 }
472 }
473
474
475
476
477
478
479 public String[] getAppenderRefStrings() {
480 final String[] result = new String[appenderRefs.length];
481 for (int i = 0; i < result.length; i++) {
482 result[i] = appenderRefs[i].getRef();
483 }
484 return result;
485 }
486
487
488
489
490
491
492
493 public boolean isIncludeLocation() {
494 return includeLocation;
495 }
496
497
498
499
500
501
502
503 public boolean isBlocking() {
504 return blocking;
505 }
506
507
508
509
510
511
512 public String getErrorRef() {
513 return errorRef;
514 }
515
516 public int getQueueCapacity() {
517 return queueSize;
518 }
519
520 public int getQueueRemainingCapacity() {
521 return queue.remainingCapacity();
522 }
523
524
525
526
527
528
529
530 public int getQueueSize() {
531 return queue.size();
532 }
533 }