1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import javax.crypto.Cipher;
37 import javax.crypto.SecretKey;
38
39 import org.apache.flume.Event;
40 import org.apache.flume.event.SimpleEvent;
41 import org.apache.logging.log4j.LoggingException;
42 import org.apache.logging.log4j.core.appender.ManagerFactory;
43 import org.apache.logging.log4j.core.config.Property;
44 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
45 import org.apache.logging.log4j.core.config.plugins.util.PluginType;
46 import org.apache.logging.log4j.core.util.FileUtils;
47 import org.apache.logging.log4j.core.util.SecretKeyProvider;
48 import org.apache.logging.log4j.util.Strings;
49
50 import com.sleepycat.je.Cursor;
51 import com.sleepycat.je.CursorConfig;
52 import com.sleepycat.je.Database;
53 import com.sleepycat.je.DatabaseConfig;
54 import com.sleepycat.je.DatabaseEntry;
55 import com.sleepycat.je.Environment;
56 import com.sleepycat.je.EnvironmentConfig;
57 import com.sleepycat.je.LockConflictException;
58 import com.sleepycat.je.LockMode;
59 import com.sleepycat.je.OperationStatus;
60 import com.sleepycat.je.StatsConfig;
61 import com.sleepycat.je.Transaction;
62
63
64
65
66 public class FlumePersistentManager extends FlumeAvroManager {
67
68
69 public static final String KEY_PROVIDER = "keyProvider";
70
71 private static final Charset UTF8 = Charset.forName("UTF-8");
72
73 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74
75 private static final int SHUTDOWN_WAIT = 60;
76
77 private static final int MILLIS_PER_SECOND = 1000;
78
79 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
80
81 private static BDBManagerFactory factory = new BDBManagerFactory();
82
83 private final Database database;
84
85 private final Environment environment;
86
87 private final WriterThread worker;
88
89 private final Gate gate = new Gate();
90
91 private final SecretKey secretKey;
92
93 private final int delayMillis;
94
95 private final int lockTimeoutRetryCount;
96
97 private final ExecutorService threadPool;
98
99 private final AtomicLong dbCount = new AtomicLong();
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * Creates the manager, seeds the pending-event counter from the database and starts the
 * background writer thread.
 *
 * @param name the manager name, forwarded to the parent manager.
 * @param shortName the short name, forwarded to the parent manager.
 * @param agents the Flume agents, forwarded to the parent manager.
 * @param batchSize the number of events per batch; also handed to the writer thread.
 * @param retries the retry count, forwarded to the parent manager.
 * @param connectionTimeout the connection timeout, forwarded to the parent manager.
 * @param requestTimeout the request timeout, forwarded to the parent manager.
 * @param delay the value stored as {@code delayMillis}; the writer thread uses it to schedule batches.
 * @param database the database events are persisted to.
 * @param environment the database environment used to begin transactions.
 * @param secretKey the key used to encrypt persisted events, or {@code null} for no encryption.
 * @param lockTimeoutRetryCount the number of attempts made when a database lock conflict occurs.
 */
protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
                                 final int batchSize, final int retries, final int connectionTimeout,
                                 final int requestTimeout, final int delay, final Database database,
                                 final Environment environment, final SecretKey secretKey,
                                 final int lockTimeoutRetryCount) {
    super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
    this.delayMillis = delay;
    this.database = database;
    this.environment = environment;
    this.secretKey = secretKey;
    this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    dbCount.set(database.count());
    this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
    this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
        lockTimeoutRetryCount);
    // Start the worker only after every field has been assigned: the thread receives a reference
    // to this manager, so starting it earlier would expose a partially constructed object.
    this.worker.start();
}
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150 final Property[] properties, int batchSize, final int retries,
151 final int connectionTimeout, final int requestTimeout,
152 final int delayMillis, final int lockTimeoutRetryCount,
153 final String dataDir) {
154 if (agents == null || agents.length == 0) {
155 throw new IllegalArgumentException("At least one agent is required");
156 }
157
158 if (batchSize <= 0) {
159 batchSize = 1;
160 }
161 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162
163 final StringBuilder sb = new StringBuilder("FlumePersistent[");
164 boolean first = true;
165 for (final Agent agent : agents) {
166 if (!first) {
167 sb.append(',');
168 }
169 sb.append(agent.getHost()).append(':').append(agent.getPort());
170 first = false;
171 }
172 sb.append(']');
173 sb.append(' ').append(dataDirectory);
174 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175 connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
176 }
177
178 @Override
179 public void send(final Event event) {
180 if (worker.isShutdown()) {
181 throw new LoggingException("Unable to record event");
182 }
183
184 final Map<String, String> headers = event.getHeaders();
185 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186 try {
187 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188 final DataOutputStream daos = new DataOutputStream(baos);
189 daos.writeInt(event.getBody().length);
190 daos.write(event.getBody(), 0, event.getBody().length);
191 daos.writeInt(event.getHeaders().size());
192 for (final Map.Entry<String, String> entry : headers.entrySet()) {
193 daos.writeUTF(entry.getKey());
194 daos.writeUTF(entry.getValue());
195 }
196 byte[] eventData = baos.toByteArray();
197 if (secretKey != null) {
198 final Cipher cipher = Cipher.getInstance("AES");
199 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200 eventData = cipher.doFinal(eventData);
201 }
202 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204 boolean interrupted = false;
205 int count = 0;
206 do {
207 try {
208 future.get();
209 } catch (final InterruptedException ie) {
210 interrupted = true;
211 ++count;
212 }
213 } while (interrupted && count <= 1);
214
215 } catch (final Exception ex) {
216 throw new LoggingException("Exception occurred writing log event", ex);
217 }
218 }
219
220 @Override
221 protected void releaseSub() {
222 LOGGER.debug("Shutting down FlumePersistentManager");
223 worker.shutdown();
224 try {
225 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226 } catch (final InterruptedException ie) {
227
228 }
229 threadPool.shutdown();
230 try {
231 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232 } catch (final InterruptedException ie) {
233 LOGGER.warn("PersistentManager Thread pool failed to shut down");
234 }
235 try {
236 worker.join();
237 } catch (final InterruptedException ex) {
238 LOGGER.debug("Interrupted while waiting for worker to complete");
239 }
240 try {
241 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
242 database.close();
243 } catch (final Exception ex) {
244 LOGGER.warn("Failed to close database", ex);
245 }
246 try {
247 environment.cleanLog();
248 environment.close();
249 } catch (final Exception ex) {
250 LOGGER.warn("Failed to close environment", ex);
251 }
252 super.releaseSub();
253 }
254
255 private void doSend(final SimpleEvent event) {
256 LOGGER.debug("Sending event to Flume");
257 super.send(event);
258 }
259
260
261
262
263 private static class BDBWriter implements Callable<Integer> {
264 private final byte[] eventData;
265 private final byte[] keyData;
266 private final Environment environment;
267 private final Database database;
268 private final Gate gate;
269 private final AtomicLong dbCount;
270 private final long batchSize;
271 private final int lockTimeoutRetryCount;
272
273 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275 final int lockTimeoutRetryCount) {
276 this.keyData = keyData;
277 this.eventData = eventData;
278 this.environment = environment;
279 this.database = database;
280 this.gate = gate;
281 this.dbCount = dbCount;
282 this.batchSize = batchSize;
283 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284 }
285
286 @Override
287 public Integer call() throws Exception {
288 final DatabaseEntry key = new DatabaseEntry(keyData);
289 final DatabaseEntry data = new DatabaseEntry(eventData);
290 Exception exception = null;
291 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292 Transaction txn = null;
293 try {
294 txn = environment.beginTransaction(null, null);
295 try {
296 database.put(txn, key, data);
297 txn.commit();
298 txn = null;
299 if (dbCount.incrementAndGet() >= batchSize) {
300 gate.open();
301 }
302 exception = null;
303 break;
304 } catch (final LockConflictException lce) {
305 exception = lce;
306
307 } catch (final Exception ex) {
308 if (txn != null) {
309 txn.abort();
310 }
311 throw ex;
312 } finally {
313 if (txn != null) {
314 txn.abort();
315 txn = null;
316 }
317 }
318 } catch (final LockConflictException lce) {
319 exception = lce;
320 if (txn != null) {
321 try {
322 txn.abort();
323 txn = null;
324 } catch (final Exception ex) {
325 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
326 }
327 }
328
329 }
330 try {
331 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332 } catch (final InterruptedException ie) {
333
334 }
335 }
336 if (exception != null) {
337 throw exception;
338 }
339 return eventData.length;
340 }
341 }
342
343
344
345
346 private static class FactoryData {
347 private final String name;
348 private final Agent[] agents;
349 private final int batchSize;
350 private final String dataDir;
351 private final int retries;
352 private final int connectionTimeout;
353 private final int requestTimeout;
354 private final int delayMillis;
355 private final int lockTimeoutRetryCount;
356 private final Property[] properties;
357
358
359
360
361
362
363
364
365 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366 final int connectionTimeout, final int requestTimeout, final int delayMillis,
367 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368 this.name = name;
369 this.agents = agents;
370 this.batchSize = batchSize;
371 this.dataDir = dataDir;
372 this.retries = retries;
373 this.connectionTimeout = connectionTimeout;
374 this.requestTimeout = requestTimeout;
375 this.delayMillis = delayMillis;
376 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377 this.properties = properties;
378 }
379 }
380
381
382
383
384 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385
386
387
388
389
390
391
392 @Override
393 public FlumePersistentManager createManager(final String name, final FactoryData data) {
394 SecretKey secretKey = null;
395 Database database = null;
396 Environment environment = null;
397
398 final Map<String, String> properties = new HashMap<String, String>();
399 if (data.properties != null) {
400 for (final Property property : data.properties) {
401 properties.put(property.getName(), property.getValue());
402 }
403 }
404
405 try {
406 final File dir = new File(data.dataDir);
407 FileUtils.mkdir(dir, true);
408 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
409 dbEnvConfig.setTransactional(true);
410 dbEnvConfig.setAllowCreate(true);
411 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
412 environment = new Environment(dir, dbEnvConfig);
413 final DatabaseConfig dbConfig = new DatabaseConfig();
414 dbConfig.setTransactional(true);
415 dbConfig.setAllowCreate(true);
416 database = environment.openDatabase(null, name, dbConfig);
417 } catch (final Exception ex) {
418 LOGGER.error("Could not create FlumePersistentManager", ex);
419
420
421
422 if (database != null) {
423 database.close();
424 database = null;
425 }
426 if (environment != null) {
427 environment.close();
428 environment = null;
429 }
430 return null;
431 }
432
433 try {
434 String key = null;
435 for (final Map.Entry<String, String> entry : properties.entrySet()) {
436 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
437 key = entry.getValue();
438 break;
439 }
440 }
441 if (key != null) {
442 final PluginManager manager = new PluginManager("KeyProvider");
443 manager.collectPlugins();
444 final Map<String, PluginType<?>> plugins = manager.getPlugins();
445 if (plugins != null) {
446 boolean found = false;
447 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
448 if (entry.getKey().equalsIgnoreCase(key)) {
449 found = true;
450 final Class<?> cl = entry.getValue().getPluginClass();
451 try {
452 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
453 secretKey = provider.getSecretKey();
454 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
455 } catch (final Exception ex) {
456 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
457 cl.getName());
458 }
459 break;
460 }
461 }
462 if (!found) {
463 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
464 }
465 } else {
466 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
467 }
468 }
469 } catch (final Exception ex) {
470 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
471 }
472 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
473 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
474 data.lockTimeoutRetryCount);
475 }
476 }
477
478
479
480
481 private static class WriterThread extends Thread {
482 private volatile boolean shutdown = false;
483 private final Database database;
484 private final Environment environment;
485 private final FlumePersistentManager manager;
486 private final Gate gate;
487 private final SecretKey secretKey;
488 private final int batchSize;
489 private final AtomicLong dbCounter;
490 private final int lockTimeoutRetryCount;
491
/**
 * Creates the (daemon) writer thread. The thread is not started here.
 *
 * @param database the database events are read from and deleted from.
 * @param environment the environment used to begin transactions.
 * @param manager the owning manager, used to send events and to read the batch delay.
 * @param gate the gate this thread waits on between batches.
 * @param batchsize the number of events per batch.
 * @param secretKey the key used to decrypt stored events, or {@code null}.
 * @param dbCount the shared pending-event counter.
 * @param lockTimeoutRetryCount the number of attempts made on lock conflicts.
 */
public WriterThread(final Database database, final Environment environment,
                    final FlumePersistentManager manager, final Gate gate, final int batchsize,
                    final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
    setDaemon(true);
    this.manager = manager;
    this.environment = environment;
    this.database = database;
    this.dbCounter = dbCount;
    this.gate = gate;
    this.secretKey = secretKey;
    this.batchSize = batchsize;
    this.lockTimeoutRetryCount = lockTimeoutRetryCount;
}
505
506 public void shutdown() {
507 LOGGER.debug("Writer thread shutting down");
508 this.shutdown = true;
509 gate.open();
510 }
511
512 public boolean isShutdown() {
513 return shutdown;
514 }
515
516 @Override
517 public void run() {
518 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.delayMillis);
519 long nextBatchMillis = System.currentTimeMillis() + manager.delayMillis;
520 while (!shutdown) {
521 final long nowMillis = System.currentTimeMillis();
522 final long dbCount = database.count();
523 dbCounter.set(dbCount);
524 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
525 nextBatchMillis = nowMillis + manager.delayMillis;
526 try {
527 boolean errors = false;
528 final DatabaseEntry key = new DatabaseEntry();
529 final DatabaseEntry data = new DatabaseEntry();
530
531 gate.close();
532 OperationStatus status;
533 if (batchSize > 1) {
534 try {
535 errors = sendBatch(key, data);
536 } catch (final Exception ex) {
537 break;
538 }
539 } else {
540 Exception exception = null;
541 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
542 exception = null;
543 Transaction txn = null;
544 Cursor cursor = null;
545 try {
546 txn = environment.beginTransaction(null, null);
547 cursor = database.openCursor(txn, null);
548 try {
549 status = cursor.getFirst(key, data, LockMode.RMW);
550 while (status == OperationStatus.SUCCESS) {
551 final SimpleEvent event = createEvent(data);
552 if (event != null) {
553 try {
554 manager.doSend(event);
555 } catch (final Exception ioe) {
556 errors = true;
557 LOGGER.error("Error sending event", ioe);
558 break;
559 }
560 try {
561 cursor.delete();
562 } catch (final Exception ex) {
563 LOGGER.error("Unable to delete event", ex);
564 }
565 }
566 status = cursor.getNext(key, data, LockMode.RMW);
567 }
568 if (cursor != null) {
569 cursor.close();
570 cursor = null;
571 }
572 txn.commit();
573 txn = null;
574 dbCounter.decrementAndGet();
575 exception = null;
576 break;
577 } catch (final LockConflictException lce) {
578 exception = lce;
579
580 } catch (final Exception ex) {
581 LOGGER.error("Error reading or writing to database", ex);
582 shutdown = true;
583 break;
584 } finally {
585 if (cursor != null) {
586 cursor.close();
587 cursor = null;
588 }
589 if (txn != null) {
590 txn.abort();
591 txn = null;
592 }
593 }
594 } catch (final LockConflictException lce) {
595 exception = lce;
596 if (cursor != null) {
597 try {
598 cursor.close();
599 cursor = null;
600 } catch (final Exception ex) {
601 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
602 }
603 }
604 if (txn != null) {
605 try {
606 txn.abort();
607 txn = null;
608 } catch (final Exception ex) {
609 LOGGER.trace("Ignored exception aborting tx during lock conflict.");
610 }
611 }
612 }
613 try {
614 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
615 } catch (final InterruptedException ie) {
616
617 }
618 }
619 if (exception != null) {
620 LOGGER.error("Unable to read or update data base", exception);
621 }
622 }
623 if (errors) {
624 Thread.sleep(manager.delayMillis);
625 continue;
626 }
627 } catch (final Exception ex) {
628 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
629 }
630 } else {
631 if (nextBatchMillis <= nowMillis) {
632 nextBatchMillis = nowMillis + manager.delayMillis;
633 }
634 try {
635 final long interval = nextBatchMillis - nowMillis;
636 gate.waitForOpen(interval);
637 } catch (final InterruptedException ie) {
638 LOGGER.warn("WriterThread interrupted, continuing");
639 } catch (final Exception ex) {
640 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
641 break;
642 }
643 }
644 }
645
646 if (batchSize > 1 && database.count() > 0) {
647 final DatabaseEntry key = new DatabaseEntry();
648 final DatabaseEntry data = new DatabaseEntry();
649 try {
650 sendBatch(key, data);
651 } catch (final Exception ex) {
652 LOGGER.warn("Unable to write final batch");
653 }
654 }
655 LOGGER.trace("WriterThread exiting");
656 }
657
658 private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
659 boolean errors = false;
660 OperationStatus status;
661 Cursor cursor = null;
662 try {
663 final BatchEvent batch = new BatchEvent();
664 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
665 try {
666 cursor = database.openCursor(null, CursorConfig.DEFAULT);
667 status = cursor.getFirst(key, data, null);
668
669 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
670 final SimpleEvent event = createEvent(data);
671 if (event != null) {
672 batch.addEvent(event);
673 }
674 status = cursor.getNext(key, data, null);
675 }
676 break;
677 } catch (final LockConflictException lce) {
678 if (cursor != null) {
679 try {
680 cursor.close();
681 cursor = null;
682 } catch (final Exception ex) {
683 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
684 }
685 }
686 }
687 }
688
689 try {
690 manager.send(batch);
691 } catch (final Exception ioe) {
692 LOGGER.error("Error sending events", ioe);
693 errors = true;
694 }
695 if (!errors) {
696 if (cursor != null) {
697 cursor.close();
698 cursor = null;
699 }
700 Transaction txn = null;
701 Exception exception = null;
702 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
703 try {
704 txn = environment.beginTransaction(null, null);
705 try {
706 for (final Event event : batch.getEvents()) {
707 try {
708 final Map<String, String> headers = event.getHeaders();
709 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
710 database.delete(txn, key);
711 } catch (final Exception ex) {
712 LOGGER.error("Error deleting key from database", ex);
713 }
714 }
715 txn.commit();
716 long count = dbCounter.get();
717 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
718 count = dbCounter.get();
719 }
720 exception = null;
721 break;
722 } catch (final LockConflictException lce) {
723 exception = lce;
724 if (cursor != null) {
725 try {
726 cursor.close();
727 cursor = null;
728 } catch (final Exception ex) {
729 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
730 }
731 }
732 if (txn != null) {
733 try {
734 txn.abort();
735 txn = null;
736 } catch (final Exception ex) {
737 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
738 }
739 }
740 } catch (final Exception ex) {
741 LOGGER.error("Unable to commit transaction", ex);
742 if (txn != null) {
743 txn.abort();
744 }
745 }
746 } catch (final LockConflictException lce) {
747 exception = lce;
748 if (cursor != null) {
749 try {
750 cursor.close();
751 cursor = null;
752 } catch (final Exception ex) {
753 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
754 }
755 }
756 if (txn != null) {
757 try {
758 txn.abort();
759 txn = null;
760 } catch (final Exception ex) {
761 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
762 }
763 }
764 } finally {
765 if (cursor != null) {
766 cursor.close();
767 cursor = null;
768 }
769 if (txn != null) {
770 txn.abort();
771 txn = null;
772 }
773 }
774 try {
775 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
776 } catch (final InterruptedException ie) {
777
778 }
779 }
780 if (exception != null) {
781 LOGGER.error("Unable to delete events from data base", exception);
782 }
783 }
784 } catch (final Exception ex) {
785 LOGGER.error("Error reading database", ex);
786 shutdown = true;
787 throw ex;
788 } finally {
789 if (cursor != null) {
790 cursor.close();
791 }
792 }
793
794 return errors;
795 }
796
797 private SimpleEvent createEvent(final DatabaseEntry data) {
798 final SimpleEvent event = new SimpleEvent();
799 try {
800 byte[] eventData = data.getData();
801 if (secretKey != null) {
802 final Cipher cipher = Cipher.getInstance("AES");
803 cipher.init(Cipher.DECRYPT_MODE, secretKey);
804 eventData = cipher.doFinal(eventData);
805 }
806 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
807 final DataInputStream dais = new DataInputStream(bais);
808 int length = dais.readInt();
809 final byte[] bytes = new byte[length];
810 dais.read(bytes, 0, length);
811 event.setBody(bytes);
812 length = dais.readInt();
813 final Map<String, String> map = new HashMap<String, String>(length);
814 for (int i = 0; i < length; ++i) {
815 final String headerKey = dais.readUTF();
816 final String value = dais.readUTF();
817 map.put(headerKey, value);
818 }
819 event.setHeaders(map);
820 return event;
821 } catch (final Exception ex) {
822 LOGGER.error("Error retrieving event", ex);
823 return null;
824 }
825 }
826
827 }
828
829
830
831
832 private static class DaemonThreadFactory implements ThreadFactory {
833 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
834 private final ThreadGroup group;
835 private final AtomicInteger threadNumber = new AtomicInteger(1);
836 private final String namePrefix;
837
838 public DaemonThreadFactory() {
839 final SecurityManager securityManager = System.getSecurityManager();
840 group = securityManager != null ? securityManager.getThreadGroup() :
841 Thread.currentThread().getThreadGroup();
842 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
843 }
844
845 @Override
846 public Thread newThread(final Runnable r) {
847 final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
848 thread.setDaemon(true);
849 if (thread.getPriority() != Thread.NORM_PRIORITY) {
850 thread.setPriority(Thread.NORM_PRIORITY);
851 }
852 return thread;
853 }
854 }
855
856
857
858
859 private static class Gate {
860
861 private boolean isOpen = false;
862
863 public boolean isOpen() {
864 return isOpen;
865 }
866
867 public synchronized void open() {
868 isOpen = true;
869 notifyAll();
870 }
871
872 public synchronized void close() {
873 isOpen = false;
874 }
875
876 public synchronized void waitForOpen(final long timeout) throws InterruptedException {
877 wait(timeout);
878 }
879 }
880 }