1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.nio.charset.StandardCharsets;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  import javax.crypto.Cipher;
35  import javax.crypto.SecretKey;
36  
37  import com.sleepycat.je.Cursor;
38  import com.sleepycat.je.CursorConfig;
39  import com.sleepycat.je.Database;
40  import com.sleepycat.je.DatabaseConfig;
41  import com.sleepycat.je.DatabaseEntry;
42  import com.sleepycat.je.Environment;
43  import com.sleepycat.je.EnvironmentConfig;
44  import com.sleepycat.je.LockConflictException;
45  import com.sleepycat.je.LockMode;
46  import com.sleepycat.je.OperationStatus;
47  import com.sleepycat.je.StatsConfig;
48  import com.sleepycat.je.Transaction;
49  import org.apache.flume.Event;
50  import org.apache.flume.event.SimpleEvent;
51  import org.apache.logging.log4j.LoggingException;
52  import org.apache.logging.log4j.core.appender.ManagerFactory;
53  import org.apache.logging.log4j.core.config.Property;
54  import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
55  import org.apache.logging.log4j.core.config.plugins.util.PluginType;
56  import org.apache.logging.log4j.core.util.ExecutorServices;
57  import org.apache.logging.log4j.core.util.FileUtils;
58  import org.apache.logging.log4j.core.util.Log4jThread;
59  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
60  import org.apache.logging.log4j.core.util.SecretKeyProvider;
61  import org.apache.logging.log4j.util.Strings;
62  
63  /**
64   * Manager that persists data to Berkeley DB before passing it on to Flume.
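     * <p>
     * {@code send(Event)} serializes each event and hands the Berkeley DB write to a dedicated thread pool so
     * that an interrupt of the logging thread cannot close the database. A background writer thread drains the
     * database and forwards the stored events to the configured Flume agents, singly or in batches. Event data
     * may optionally be encrypted with AES using a key obtained from a configured {@code SecretKeyProvider}
     * plugin.
     * </p>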
65   */
66  public class FlumePersistentManager extends FlumeAvroManager {
67  
68      /** Attribute name for the key provider. */
69      public static final String KEY_PROVIDER = "keyProvider";
70  
71      private static final Charset UTF8 = StandardCharsets.UTF_8;
72  
73      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74  
75      private static final long SHUTDOWN_WAIT_MILLIS = 60000;
76  
77      private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
78  
79      private static BDBManagerFactory factory = new BDBManagerFactory();
80  
81      private final Database database;
82  
83      private final Environment environment;
84  
85      private final WriterThread worker;
86  
87      private final Gate gate = new Gate();
88  
89      private final SecretKey secretKey;
90  
91      private final int lockTimeoutRetryCount;
92  
93      private final ExecutorService threadPool;
94  
95      private final AtomicLong dbCount = new AtomicLong();
96  
97      /**
98       * Constructor
99       * @param name The unique name of this manager.
100      * @param shortName Original name for the Manager.
101      * @param agents An array of Agents.
102      * @param batchSize The number of events to include in a batch.
103      * @param retries The number of times to retry connecting before giving up.
104      * @param connectionTimeout The amount of time to wait for a connection to be established.
105      * @param requestTimeout The amount of time to wait for a response to a request.
106      * @param delay The amount of time to wait between retries.
107      * @param database The database to write to.
108      * @param environment The database environment.
109      * @param secretKey The SecretKey to use for encryption.
110      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
111      */
112     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
113                                      final int batchSize, final int retries, final int connectionTimeout,
114                                      final int requestTimeout, final int delay, final Database database,
115                                      final Environment environment, final SecretKey secretKey,
116                                      final int lockTimeoutRetryCount) {
117         super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
118         this.database = database;
119         this.environment = environment;
120         dbCount.set(database.count());
121         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
122             lockTimeoutRetryCount);
123         this.worker.start();
124         this.secretKey = secretKey;
125         this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
126         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
127     }
128 
129 
130     /**
131      * Returns a FlumePersistentManager.
132      * @param name The name of the manager.
133      * @param agents The agents to use.
134      * @param properties Properties to pass to the Manager.
135      * @param batchSize The number of events to include in a batch.
136      * @param retries The number of times to retry connecting before giving up.
137      * @param connectionTimeout The amount of time to wait to establish a connection.
138      * @param requestTimeout The amount of time to wait for a response to a request.
139      * @param delayMillis Amount of time to delay before delivering a batch.
140      * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
141      * @param dataDir The location of the Berkeley database.
142      * @return A FlumePersistentManager.
143      */
144     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
145                                                     final Property[] properties, int batchSize, final int retries,
146                                                     final int connectionTimeout, final int requestTimeout,
147                                                     final int delayMillis, final int lockTimeoutRetryCount,
148                                                     final String dataDir) {
149         if (agents == null || agents.length == 0) {
150             throw new IllegalArgumentException("At least one agent is required");
151         }
152 
153         if (batchSize <= 0) {
154             batchSize = 1;
155         }
156         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
157 
158         final StringBuilder sb = new StringBuilder("FlumePersistent[");
159         boolean first = true;
160         for (final Agent agent : agents) {
161             if (!first) {
162                 sb.append(',');
163             }
164             sb.append(agent.getHost()).append(':').append(agent.getPort());
165             first = false;
166         }
167         sb.append(']');
168         sb.append(' ').append(dataDirectory);
169         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
170             connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
171     }
172 
173     @Override
174     public void send(final Event event)  {
175         if (worker.isShutdown()) {
176             throw new LoggingException("Unable to record event");
177         }
178 
179         final Map<String, String> headers = event.getHeaders();
180         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
181         try {
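                // Record layout, mirrored by WriterThread.createEvent(): body length (int), body bytes,
                // header count (int), then each header key and value in modified UTF-8.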
182             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
183             final DataOutputStream daos = new DataOutputStream(baos);
184             daos.writeInt(event.getBody().length);
185             daos.write(event.getBody(), 0, event.getBody().length);
186             daos.writeInt(event.getHeaders().size());
187             for (final Map.Entry<String, String> entry : headers.entrySet()) {
188                 daos.writeUTF(entry.getKey());
189                 daos.writeUTF(entry.getValue());
190             }
191             byte[] eventData = baos.toByteArray();
192             if (secretKey != null) {
193                 final Cipher cipher = Cipher.getInstance("AES");
194                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
195                 eventData = cipher.doFinal(eventData);
196             }
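                // Hand the Berkeley DB write to the dedicated thread pool and block until it completes, so that
                // an interrupt of the logging thread cannot close the database mid-write.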
197             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
198                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
199             try {
200                 future.get();
201             } catch (final InterruptedException ie) {
202                 // preserve interruption status
203                 Thread.currentThread().interrupt();
204             }
205         } catch (final Exception ex) {
206             throw new LoggingException("Exception occurred writing log event", ex);
207         }
208     }
209 
210     @Override
211     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
212         boolean closed = true;
213         LOGGER.debug("Shutting down FlumePersistentManager");
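            // Signal the writer thread and give it up to the requested timeout (or the default) to finish,
            // shut down the thread pool, then close the database and the environment.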
214         worker.shutdown();
215         final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
216         final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS;
217         try {
218             worker.join(shutdownWaitMillis);
219         } catch (final InterruptedException ie) {
220             // Ignore the exception and shutdown.
221         }
222         ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
223         try {
224             worker.join();
225         } catch (final InterruptedException ex) {
226             logDebug("interrupted while waiting for worker to complete", ex);
227         }
228         try {
229             LOGGER.debug("FlumePersistentManager database status: {}", database.getStats(new StatsConfig()));
230             database.close();
231         } catch (final Exception ex) {
232             logWarn("Failed to close database", ex);
233             closed = false;
234         }
235         try {
236             environment.cleanLog();
237             environment.close();
238         } catch (final Exception ex) {
239             logWarn("Failed to close environment", ex);
240             closed = false;
241         }
242         return closed && super.releaseSub(timeout, timeUnit);
243     }
244 
245     private void doSend(final SimpleEvent event) {
246         LOGGER.debug("Sending event to Flume");
247         super.send(event);
248     }
249 
250     /**
251      * Task, run on a separate thread pool, that writes to Berkeley DB so that interrupts cannot close the database.
252      */
253     private static class BDBWriter implements Callable<Integer> {
254         private final byte[] eventData;
255         private final byte[] keyData;
256         private final Environment environment;
257         private final Database database;
258         private final Gate gate;
259         private final AtomicLong dbCount;
260         private final long batchSize;
261         private final int lockTimeoutRetryCount;
262 
263         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
264                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
265                          final int lockTimeoutRetryCount) {
266             this.keyData = keyData;
267             this.eventData = eventData;
268             this.environment = environment;
269             this.database = database;
270             this.gate = gate;
271             this.dbCount = dbCount;
272             this.batchSize = batchSize;
273             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
274         }
275 
276         @Override
277         public Integer call() throws Exception {
278             final DatabaseEntry key = new DatabaseEntry(keyData);
279             final DatabaseEntry data = new DatabaseEntry(eventData);
280             Exception exception = null;
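                // Write the event in its own transaction, retrying up to lockTimeoutRetryCount times on lock
                // conflicts; any other failure aborts the transaction and propagates to the caller.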
281             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
282                 Transaction txn = null;
283                 try {
284                     txn = environment.beginTransaction(null, null);
285                     try {
286                         database.put(txn, key, data);
287                         txn.commit();
288                         txn = null;
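                            // A full batch has accumulated; open the gate so the writer thread wakes up and
                            // forwards the pending events to Flume.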
289                         if (dbCount.incrementAndGet() >= batchSize) {
290                             gate.open();
291                         }
292                         exception = null;
293                         break;
294                     } catch (final LockConflictException lce) {
295                         exception = lce;
296                         // Fall through and retry.
297                     } catch (final Exception ex) {
298                         if (txn != null) {
299                             txn.abort();
300                         }
301                         throw ex;
302                     } finally {
303                         if (txn != null) {
304                             txn.abort();
305                             txn = null;
306                         }
307                     }
308                 } catch (final LockConflictException lce) {
309                     exception = lce;
310                     if (txn != null) {
311                         try {
312                             txn.abort();
313                             txn = null;
314                         } catch (final Exception ex) {
315                             LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
316                         }
317                     }
318 
319                 }
320                 try {
321                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
322                 } catch (final InterruptedException ie) {
323                     // Ignore the error
324                 }
325             }
326             if (exception != null) {
327                 throw exception;
328             }
329             return eventData.length;
330         }
331     }
332 
333     /**
334      * Factory data.
335      */
336     private static class FactoryData {
337         private final String name;
338         private final Agent[] agents;
339         private final int batchSize;
340         private final String dataDir;
341         private final int retries;
342         private final int connectionTimeout;
343         private final int requestTimeout;
344         private final int delayMillis;
345         private final int lockTimeoutRetryCount;
346         private final Property[] properties;
347 
348         /**
349          * Constructor.
350          * @param name The name of the Appender.
351          * @param agents The agents.
352          * @param batchSize The number of events to include in a batch.
353          * @param dataDir The directory for data.
354          */
355         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
356                            final int connectionTimeout, final int requestTimeout, final int delayMillis,
357                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
358             this.name = name;
359             this.agents = agents;
360             this.batchSize = batchSize;
361             this.dataDir = dataDir;
362             this.retries = retries;
363             this.connectionTimeout = connectionTimeout;
364             this.requestTimeout = requestTimeout;
365             this.delayMillis = delayMillis;
366             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
367             this.properties = properties;
368         }
369     }
370 
371     /**
372      * Factory that creates FlumePersistentManagers backed by Berkeley DB.
373      */
374     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
375 
376         /**
377          * Create the FlumePersistentManager.
378          * @param name The name of the entity to manage.
379          * @param data The data required to create the entity.
380          * @return The FlumePersistentManager.
381          */
382         @Override
383         public FlumePersistentManager createManager(final String name, final FactoryData data) {
384             SecretKey secretKey = null;
385             Database database = null;
386             Environment environment = null;
387 
388             final Map<String, String> properties = new HashMap<>();
389             if (data.properties != null) {
390                 for (final Property property : data.properties) {
391                     properties.put(property.getName(), property.getValue());
392                 }
393             }
394 
395             try {
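                    // Create the data directory if necessary and open a transactional Berkeley DB JE
                    // environment and database there, using a five second lock timeout.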
396                 final File dir = new File(data.dataDir);
397                 FileUtils.mkdir(dir, true);
398                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
399                 dbEnvConfig.setTransactional(true);
400                 dbEnvConfig.setAllowCreate(true);
401                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
402                 environment = new Environment(dir, dbEnvConfig);
403                 final DatabaseConfig dbConfig = new DatabaseConfig();
404                 dbConfig.setTransactional(true);
405                 dbConfig.setAllowCreate(true);
406                 database = environment.openDatabase(null, name, dbConfig);
407             } catch (final Exception ex) {
408                 LOGGER.error("Could not create FlumePersistentManager", ex);
409                 // For consistency, close database as well as environment even though it should never happen since the
410                 // database is that last thing in the block above, but this does guard against a future line being
411                 // inserted at the end that would bomb (like some debug logging).
412                 if (database != null) {
413                     database.close();
414                     database = null;
415                 }
416                 if (environment != null) {
417                     environment.close();
418                     environment = null;
419                 }
420                 return null;
421             }
422 
423             try {
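                    // If a keyProvider property is configured, look up the matching SecretKeyProvider plugin in
                    // the "KeyProvider" category and use it to obtain the AES key; on any failure the manager
                    // simply runs without encryption.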
424                 String key = null;
425                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
426                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
427                         key = entry.getValue();
428                         break;
429                     }
430                 }
431                 if (key != null) {
432                     final PluginManager manager = new PluginManager("KeyProvider");
433                     manager.collectPlugins();
434                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
435                     if (plugins != null) {
436                         boolean found = false;
437                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
438                             if (entry.getKey().equalsIgnoreCase(key)) {
439                                 found = true;
440                                 final Class<?> cl = entry.getValue().getPluginClass();
441                                 try {
442                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
443                                     secretKey = provider.getSecretKey();
444                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
445                                 } catch (final Exception ex) {
446                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
447                                         cl.getName(), ex);
448                                 }
449                                 break;
450                             }
451                         }
452                         if (!found) {
453                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
454                         }
455                     } else {
456                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
457                     }
458                 }
459             } catch (final Exception ex) {
460                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
461             }
462             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
463                 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
464                 data.lockTimeoutRetryCount);
465         }
466     }
467 
468     /**
469      * Thread that pulls events from Berkeley DB and sends them to Flume.
470      */
471     private static class WriterThread extends Log4jThread  {
472         private volatile boolean shutdown = false;
473         private final Database database;
474         private final Environment environment;
475         private final FlumePersistentManager manager;
476         private final Gate gate;
477         private final SecretKey secretKey;
478         private final int batchSize;
479         private final AtomicLong dbCounter;
480         private final int lockTimeoutRetryCount;
481 
482         public WriterThread(final Database database, final Environment environment,
483                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
484                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
485             super("FlumePersistentManager-Writer");
486             this.database = database;
487             this.environment = environment;
488             this.manager = manager;
489             this.gate = gate;
490             this.batchSize = batchsize;
491             this.secretKey = secretKey;
492             this.setDaemon(true);
493             this.dbCounter = dbCount;
494             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
495         }
496 
497         public void shutdown() {
498             LOGGER.debug("Writer thread shutting down");
499             this.shutdown = true;
500             gate.open();
501         }
502 
503         public boolean isShutdown() {
504             return shutdown;
505         }
506 
507         @Override
508         public void run() {
509             LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
510             long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
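                // Main loop: forward events whenever a full batch has accumulated, or when the delay has elapsed
                // and events are pending; otherwise wait on the gate. With a batch size of one, each event is
                // read, sent and deleted under its own transaction using RMW cursor locking.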
511             while (!shutdown) {
512                 final long nowMillis = System.currentTimeMillis();
513                 final long dbCount = database.count();
514                 dbCounter.set(dbCount);
515                 if (dbCount >= batchSize || (dbCount > 0 && nextBatchMillis <= nowMillis)) {
516                     nextBatchMillis = nowMillis + manager.getDelayMillis();
517                     try {
518                         boolean errors = false;
519                         final DatabaseEntry key = new DatabaseEntry();
520                         final DatabaseEntry data = new DatabaseEntry();
521 
522                         gate.close();
523                         OperationStatus status;
524                         if (batchSize > 1) {
525                             try {
526                                 errors = sendBatch(key, data);
527                             } catch (final Exception ex) {
528                                 break;
529                             }
530                         } else {
531                             Exception exception = null;
532                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
533                                 exception = null;
534                                 Transaction txn = null;
535                                 Cursor cursor = null;
536                                 try {
537                                     txn = environment.beginTransaction(null, null);
538                                     cursor = database.openCursor(txn, null);
539                                     try {
540                                         status = cursor.getFirst(key, data, LockMode.RMW);
541                                         while (status == OperationStatus.SUCCESS) {
542                                             final SimpleEvent event = createEvent(data);
543                                             if (event != null) {
544                                                 try {
545                                                     manager.doSend(event);
546                                                 } catch (final Exception ioe) {
547                                                     errors = true;
548                                                     LOGGER.error("Error sending event", ioe);
549                                                     break;
550                                                 }
551                                                 try {
552                                                     cursor.delete();
553                                                 } catch (final Exception ex) {
554                                                     LOGGER.error("Unable to delete event", ex);
555                                                 }
556                                             }
557                                             status = cursor.getNext(key, data, LockMode.RMW);
558                                         }
559                                         if (cursor != null) {
560                                             cursor.close();
561                                             cursor = null;
562                                         }
563                                         txn.commit();
564                                         txn = null;
565                                         dbCounter.decrementAndGet();
566                                         exception = null;
567                                         break;
568                                     } catch (final LockConflictException lce) {
569                                         exception = lce;
570                                         // Fall through and retry.
571                                     } catch (final Exception ex) {
572                                         LOGGER.error("Error reading or writing to database", ex);
573                                         shutdown = true;
574                                         break;
575                                     } finally {
576                                         if (cursor != null) {
577                                             cursor.close();
578                                             cursor = null;
579                                         }
580                                         if (txn != null) {
581                                             txn.abort();
582                                             txn = null;
583                                         }
584                                     }
585                                 } catch (final LockConflictException lce) {
586                                     exception = lce;
587                                     if (cursor != null) {
588                                         try {
589                                             cursor.close();
590                                             cursor = null;
591                                         } catch (final Exception ex) {
592                                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
593                                         }
594                                     }
595                                     if (txn != null) {
596                                         try {
597                                             txn.abort();
598                                             txn = null;
599                                         } catch (final Exception ex) {
600                                             LOGGER.trace("Ignored exception aborting tx during lock conflict.");
601                                         }
602                                     }
603                                 }
604                                 try {
605                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
606                                 } catch (final InterruptedException ie) {
607                                     // Ignore the error
608                                 }
609                             }
610                             if (exception != null) {
611                                 LOGGER.error("Unable to read or update database", exception);
612                             }
613                         }
614                         if (errors) {
615                             Thread.sleep(manager.getDelayMillis());
616                             continue;
617                         }
618                     } catch (final Exception ex) {
619                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
620                     }
621                 } else {
622                     if (nextBatchMillis <= nowMillis) {
623                         nextBatchMillis = nowMillis + manager.getDelayMillis();
624                     }
625                     try {
626                         final long interval = nextBatchMillis - nowMillis;
627                         gate.waitForOpen(interval);
628                     } catch (final InterruptedException ie) {
629                         LOGGER.warn("WriterThread interrupted, continuing");
630                     } catch (final Exception ex) {
631                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
632                         break;
633                     }
634                 }
635             }
636 
637             if (batchSize > 1 && database.count() > 0) {
638                 final DatabaseEntry key = new DatabaseEntry();
639                 final DatabaseEntry data = new DatabaseEntry();
640                 try {
641                     sendBatch(key, data);
642                 } catch (final Exception ex) {
643                     LOGGER.warn("Unable to write final batch", ex);
644                 }
645             }
646             LOGGER.trace("WriterThread exiting");
647         }
648 
649         private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
650             boolean errors = false;
651             OperationStatus status;
652             Cursor cursor = null;
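                // Phase one: read up to batchSize events with a plain cursor into a BatchEvent and send them to
                // Flume. Phase two, only if the send succeeded: delete the sent events by their GUID keys inside
                // a transaction, retrying on lock conflicts.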
653             try {
654             final BatchEvent batch = new BatchEvent();
655             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
656                 try {
657                     cursor = database.openCursor(null, CursorConfig.DEFAULT);
658                     status = cursor.getFirst(key, data, null);
659 
660                     for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
661                         final SimpleEvent event = createEvent(data);
662                         if (event != null) {
663                             batch.addEvent(event);
664                         }
665                         status = cursor.getNext(key, data, null);
666                     }
667                     break;
668                 } catch (final LockConflictException lce) {
669                     if (cursor != null) {
670                         try {
671                             cursor.close();
672                             cursor = null;
673                         } catch (final Exception ex) {
674                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
675                         }
676                     }
677                 }
678             }
679 
680                 try {
681                     manager.send(batch);
682                 } catch (final Exception ioe) {
683                     LOGGER.error("Error sending events", ioe);
684                     errors = true;
685                 }
686                 if (!errors) {
687                     if (cursor != null) {
688                         cursor.close();
689                         cursor = null;
690                     }
691                     Transaction txn = null;
692                     Exception exception = null;
693                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
694                         try {
695                             txn = environment.beginTransaction(null, null);
696                             try {
697                                 for (final Event event : batch.getEvents()) {
698                                     try {
699                                         final Map<String, String> headers = event.getHeaders();
700                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
701                                         database.delete(txn, key);
702                                     } catch (final Exception ex) {
703                                         LOGGER.error("Error deleting key from database", ex);
704                                     }
705                                 }
706                                 txn.commit();
707                                 long count = dbCounter.get();
708                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
709                                     count = dbCounter.get();
710                                 }
711                                 exception = null;
712                                 break;
713                             } catch (final LockConflictException lce) {
714                                 exception = lce;
715                                 if (cursor != null) {
716                                     try {
717                                         cursor.close();
718                                         cursor = null;
719                                     } catch (final Exception ex) {
720                                         LOGGER.trace("Ignored exception closing cursor during lock conflict.");
721                                     }
722                                 }
723                                 if (txn != null) {
724                                     try {
725                                         txn.abort();
726                                         txn = null;
727                                     } catch (final Exception ex) {
728                                         LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
729                                     }
730                                 }
731                             } catch (final Exception ex) {
732                                 LOGGER.error("Unable to commit transaction", ex);
733                                 if (txn != null) {
734                                     txn.abort();
735                                 }
736                             }
737                         } catch (final LockConflictException lce) {
738                             exception = lce;
739                             if (cursor != null) {
740                                 try {
741                                     cursor.close();
742                                     cursor = null;
743                                 } catch (final Exception ex) {
744                                     LOGGER.trace("Ignored exception closing cursor during lock conflict.");
745                                 }
746                             }
747                             if (txn != null) {
748                                 try {
749                                     txn.abort();
750                                     txn = null;
751                                 } catch (final Exception ex) {
752                                     LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
753                                 }
754                             }
755                         } finally {
756                             if (cursor != null) {
757                                 cursor.close();
758                                 cursor = null;
759                             }
760                             if (txn != null) {
761                                 txn.abort();
762                                 txn = null;
763                             }
764                         }
765                         try {
766                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
767                         } catch (final InterruptedException ie) {
768                             // Ignore the error
769                         }
770                     }
771                     if (exception != null) {
772                         LOGGER.error("Unable to delete events from database", exception);
773                     }
774                 }
775             } catch (final Exception ex) {
776                 LOGGER.error("Error reading database", ex);
777                 shutdown = true;
778                 throw ex;
779             } finally {
780                 if (cursor != null) {
781                     cursor.close();
782                 }
783             }
784 
785             return errors;
786         }
787 
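            /**
             * Rebuilds a Flume event from a stored record, reversing the format written by
             * {@code FlumePersistentManager.send}: optional AES decryption, then the body length and bytes
             * followed by the header count and the header key/value pairs.
             */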
788         private SimpleEvent createEvent(final DatabaseEntry data) {
789             final SimpleEvent event = new SimpleEvent();
790             try {
791                 byte[] eventData = data.getData();
792                 if (secretKey != null) {
793                     final Cipher cipher = Cipher.getInstance("AES");
794                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
795                     eventData = cipher.doFinal(eventData);
796                 }
797                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
798                 final DataInputStream dais = new DataInputStream(bais);
799                 int length = dais.readInt();
800                 final byte[] bytes = new byte[length];
801                 dais.readFully(bytes, 0, length);
802                 event.setBody(bytes);
803                 length = dais.readInt();
804                 final Map<String, String> map = new HashMap<>(length);
805                 for (int i = 0; i < length; ++i) {
806                     final String headerKey = dais.readUTF();
807                     final String value = dais.readUTF();
808                     map.put(headerKey, value);
809                 }
810                 event.setHeaders(map);
811                 return event;
812             } catch (final Exception ex) {
813                 LOGGER.error("Error retrieving event", ex);
814                 return null;
815             }
816         }
817 
818     }
819 
820     /**
821      * Simple gate the writer thread waits on; opened when a full batch is available or shutdown is requested.
822      */
823     private static class Gate {
824 
825         private boolean isOpen = false;
826 
827         public synchronized boolean isOpen() {
828             return isOpen;
829         }
830 
831         public synchronized void open() {
832             isOpen = true;
833             notifyAll();
834         }
835 
836         public synchronized void close() {
837             isOpen = false;
838         }
839 
840         public synchronized void waitForOpen(final long timeout) throws InterruptedException {
841             wait(timeout);
842         }
843     }
844 }