View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.Properties;
20  import java.util.concurrent.TimeUnit;
21  
22  import org.apache.flume.Event;
23  import org.apache.flume.api.RpcClient;
24  import org.apache.flume.api.RpcClientFactory;
25  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
26  import org.apache.logging.log4j.core.appender.ManagerFactory;
27  
28  /**
29   * Manager for FlumeAvroAppenders.
30   */
31  public class FlumeAvroManager extends AbstractFlumeManager {
32  
33      private static final int MAX_RECONNECTS = 3;
34      private static final int MINIMUM_TIMEOUT = 1000;
35  
36      private static AvroManagerFactory factory = new AvroManagerFactory();
37  
38      private final Agent[] agents;
39  
40      private final int batchSize;
41  
42      private final long delayNanos;
43      private final int delayMillis;
44  
45      private final int retries;
46  
47      private final int connectTimeoutMillis;
48  
49      private final int requestTimeoutMillis;
50  
51      private final int current = 0;
52  
53      private volatile RpcClient rpcClient = null;
54  
55      private BatchEvent batchEvent = new BatchEvent();
56      private long nextSend = 0;
57  
58      /**
59       * Constructor
60       * @param name The unique name of this manager.
61       * @param agents An array of Agents.
62       * @param batchSize The number of events to include in a batch.
63       * @param retries The number of times to retry connecting before giving up.
64       * @param connectTimeout The connection timeout in ms.
65       * @param requestTimeout The request timeout in ms.
66       *
67       */
68      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
69                                 final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
70          super(name);
71          this.agents = agents;
72          this.batchSize = batchSize;
73          this.delayMillis = delayMillis;
74          this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
75          this.retries = retries;
76          this.connectTimeoutMillis = connectTimeout;
77          this.requestTimeoutMillis = requestTimeout;
78          this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
79      }
80  
81      /**
82       * Returns a FlumeAvroManager.
83       * @param name The name of the manager.
84       * @param agents The agents to use.
85       * @param batchSize The number of events to include in a batch.
86       * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
87       * @param retries The number of times to retry connecting before giving up.
88       * @param connectTimeoutMillis The connection timeout in ms.
89       * @param requestTimeoutMillis The request timeout in ms.
90       * @return A FlumeAvroManager.
91       */
92      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
93                                                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
94          if (agents == null || agents.length == 0) {
95              throw new IllegalArgumentException("At least one agent is required");
96          }
97  
98          if (batchSize <= 0) {
99              batchSize = 1;
100         };
101         final StringBuilder sb = new StringBuilder(name);
102         sb.append(" FlumeAvro[");
103         boolean first = true;
104         for (final Agent agent : agents) {
105             if (!first) {
106                 sb.append(',');
107             }
108             sb.append(agent.getHost()).append(':').append(agent.getPort());
109             first = false;
110         }
111         sb.append(']');
112         return getManager(sb.toString(), factory,
113                 new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
114     }
115 
116     /**
117      * Returns the agents.
118      * @return The agent array.
119      */
120     public Agent[] getAgents() {
121         return agents;
122     }
123 
124     /**
125      * Returns the index of the current agent.
126      * @return The index for the current agent.
127      */
128     public int getCurrent() {
129         return current;
130     }
131 
132     public int getRetries() {
133         return retries;
134     }
135 
136     public int getConnectTimeoutMillis() {
137         return connectTimeoutMillis;
138     }
139 
140     public int getRequestTimeoutMillis() {
141         return requestTimeoutMillis;
142     }
143 
144     public int getBatchSize() {
145         return batchSize;
146     }
147 
148     public int getDelayMillis() {
149         return delayMillis;
150     }
151 
152     public void send(final BatchEvent events) {
153         if (rpcClient == null) {
154             synchronized (this) {
155                 if (rpcClient == null) {
156                     rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
157                 }
158             }
159         }
160 
161         if (rpcClient != null) {
162             try {
163                 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
164                 rpcClient.appendBatch(events.getEvents());
165             } catch (final Exception ex) {
166                 rpcClient.close();
167                 rpcClient = null;
168                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
169                     agents[current].getPort();
170                 LOGGER.warn(msg, ex);
171                 throw new AppenderLoggingException("No Flume agents are available");
172             }
173         }  else {
174             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
175                 agents[current].getPort();
176             LOGGER.warn(msg);
177             throw new AppenderLoggingException("No Flume agents are available");
178         }
179     }
180 
181     @Override
182     public void send(final Event event)  {
183         if (batchSize == 1) {
184             if (rpcClient == null) {
185                 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
186             }
187 
188             if (rpcClient != null) {
189                 try {
190                     rpcClient.append(event);
191                 } catch (final Exception ex) {
192                     rpcClient.close();
193                     rpcClient = null;
194                     final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
195                             agents[current].getPort();
196                     LOGGER.warn(msg, ex);
197                     throw new AppenderLoggingException("No Flume agents are available");
198                 }
199             } else {
200                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
201                         agents[current].getPort();
202                 LOGGER.warn(msg);
203                 throw new AppenderLoggingException("No Flume agents are available");
204             }
205         } else {
206             int eventCount;
207             BatchEvent batch = null;
208             synchronized(batchEvent) {
209                 batchEvent.addEvent(event);
210                 eventCount = batchEvent.size();
211                 long now = System.nanoTime();
212                 if (eventCount == 1) {
213                     nextSend = now + delayNanos;
214                 }
215                 if (eventCount >= batchSize || now >= nextSend) {
216                     batch = batchEvent;
217                     batchEvent = new BatchEvent();
218                 }
219             }
220             if (batch != null) {
221                 send(batch);
222             }
223         }
224     }
225 
226     /**
227      * There is a very good chance that this will always return the first agent even if it isn't available.
228      * @param agents The list of agents to choose from
229      * @return The FlumeEventAvroServer.
230      */
231     private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
232         try {
233             final Properties props = new Properties();
234 
235             props.put("client.type", "default_failover");
236 
237             int agentCount = 1;
238             final StringBuilder sb = new StringBuilder();
239             for (final Agent agent : agents) {
240                 if (sb.length() > 0) {
241                     sb.append(' ');
242                 }
243                 final String hostName = "host" + agentCount++;
244                 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
245                 sb.append(hostName);
246             }
247             props.put("hosts", sb.toString());
248             if (batchSize > 0) {
249                 props.put("batch-size", Integer.toString(batchSize));
250             }
251             if (retries > 1) {
252                 if (retries > MAX_RECONNECTS) {
253                     retries = MAX_RECONNECTS;
254                 }
255                 props.put("max-attempts", Integer.toString(retries * agents.length));
256             }
257             if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
258                 props.put("request-timeout", Integer.toString(requestTimeoutMillis));
259             }
260             if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
261                 props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
262             }
263             return RpcClientFactory.getInstance(props);
264         } catch (final Exception ex) {
265             LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
266             return null;
267         }
268     }
269 
270     @Override
271     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
272     	boolean closed = true;
273         if (rpcClient != null) {
274             try {
275                 synchronized(this) {
276                     try {
277                         if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
278                             send(batchEvent);
279                         }
280                     } catch (final Exception ex) {
281                         LOGGER.error("Error sending final batch: {}", ex.getMessage());
282                         closed = false;
283                     }
284                 }
285                 rpcClient.close();
286             } catch (final Exception ex) {
287                 LOGGER.error("Attempt to close RPC client failed", ex);
288                 closed = false;
289             }
290         }
291         rpcClient = null;
292         return closed;
293     }
294 
295     /**
296      * Factory data.
297      */
298     private static class FactoryData {
299         private final String name;
300         private final Agent[] agents;
301         private final int batchSize;
302         private final int delayMillis;
303         private final int retries;
304         private final int conntectTimeoutMillis;
305         private final int requestTimeoutMillis;
306 
307         /**
308          * Constructor.
309          * @param name The name of the Appender.
310          * @param agents The agents.
311          * @param batchSize The number of events to include in a batch.
312          */
313         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
314                 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
315             this.name = name;
316             this.agents = agents;
317             this.batchSize = batchSize;
318             this.delayMillis = delayMillis;
319             this.retries = retries;
320             this.conntectTimeoutMillis = connectTimeoutMillis;
321             this.requestTimeoutMillis = requestTimeoutMillis;
322         }
323     }
324 
325     /**
326      * Avro Manager Factory.
327      */
328     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
329 
330         /**
331          * Create the FlumeAvroManager.
332          * @param name The name of the entity to manage.
333          * @param data The data required to create the entity.
334          * @return The FlumeAvroManager.
335          */
336         @Override
337         public FlumeAvroManager createManager(final String name, final FactoryData data) {
338             try {
339 
340                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
341                         data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
342             } catch (final Exception ex) {
343                 LOGGER.error("Could not create FlumeAvroManager", ex);
344             }
345             return null;
346         }
347     }
348 
349 }