View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.Properties;
20  
21  import org.apache.flume.Event;
22  import org.apache.flume.api.RpcClient;
23  import org.apache.flume.api.RpcClientFactory;
24  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
25  import org.apache.logging.log4j.core.appender.ManagerFactory;
26  
27  /**
28   * Manager for FlumeAvroAppenders.
29   */
30  public class FlumeAvroManager extends AbstractFlumeManager {
31  
32      private static final int MAX_RECONNECTS = 3;
33      private static final int MINIMUM_TIMEOUT = 1000;
34  
35      private static AvroManagerFactory factory = new AvroManagerFactory();
36  
37      private final Agent[] agents;
38  
39      private final int batchSize;
40  
41      private final int retries;
42  
43      private final int connectTimeoutMillis;
44  
45      private final int requestTimeoutMillis;
46  
47      private final int current = 0;
48  
49      private RpcClient rpcClient = null;
50  
51      /**
52       * Constructor
53       * @param name The unique name of this manager.
54       * @param agents An array of Agents.
55       * @param batchSize The number of events to include in a batch.
56       * @param retries The number of times to retry connecting before giving up.
57       * @param connectTimeout The connection timeout in ms.
58       * @param requestTimeout The request timeout in ms.
59       *
60       */
61      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
62                                 final int retries, final int connectTimeout, final int requestTimeout) {
63          super(name);
64          this.agents = agents;
65          this.batchSize = batchSize;
66          this.retries = retries;
67          this.connectTimeoutMillis = connectTimeout;
68          this.requestTimeoutMillis = requestTimeout;
69          this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
70      }
71  
72      /**
73       * Returns a FlumeAvroManager.
74       * @param name The name of the manager.
75       * @param agents The agents to use.
76       * @param batchSize The number of events to include in a batch.
77       * @param retries The number of times to retry connecting before giving up.
78       * @param connectTimeoutMillis The connection timeout in ms.
79       * @param requestTimeoutMillis The request timeout in ms.
80       * @return A FlumeAvroManager.
81       */
82      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
83                                                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
84          if (agents == null || agents.length == 0) {
85              throw new IllegalArgumentException("At least one agent is required");
86          }
87  
88          if (batchSize <= 0) {
89              batchSize = 1;
90          }
91  
92          final StringBuilder sb = new StringBuilder("FlumeAvro[");
93          boolean first = true;
94          for (final Agent agent : agents) {
95              if (!first) {
96                  sb.append(',');
97              }
98              sb.append(agent.getHost()).append(':').append(agent.getPort());
99              first = false;
100         }
101         sb.append(']');
102         return getManager(sb.toString(), factory,
103                 new FactoryData(name, agents, batchSize, retries, connectTimeoutMillis, requestTimeoutMillis));
104     }
105 
106     /**
107      * Returns the agents.
108      * @return The agent array.
109      */
110     public Agent[] getAgents() {
111         return agents;
112     }
113 
114     /**
115      * Returns the index of the current agent.
116      * @return The index for the current agent.
117      */
118     public int getCurrent() {
119         return current;
120     }
121 
122     public int getRetries() {
123         return retries;
124     }
125 
126     public int getConnectTimeoutMillis() {
127         return connectTimeoutMillis;
128     }
129 
130     public int getRequestTimeoutMillis() {
131         return requestTimeoutMillis;
132     }
133 
134     public int getBatchSize() {
135         return batchSize;
136     }
137 
138     public synchronized void send(final BatchEvent events) {
139         if (rpcClient == null) {
140             rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
141         }
142 
143         if (rpcClient != null) {
144             try {
145                 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
146                 rpcClient.appendBatch(events.getEvents());
147             } catch (final Exception ex) {
148                 rpcClient.close();
149                 rpcClient = null;
150                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
151                     agents[current].getPort();
152                 LOGGER.warn(msg, ex);
153                 throw new AppenderLoggingException("No Flume agents are available");
154             }
155         }  else {
156             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
157                 agents[current].getPort();
158             LOGGER.warn(msg);
159             throw new AppenderLoggingException("No Flume agents are available");
160         }
161     }
162 
163     @Override
164     public synchronized void send(final Event event)  {
165         if (rpcClient == null) {
166             rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
167         }
168 
169         if (rpcClient != null) {
170             try {
171                 rpcClient.append(event);
172             } catch (final Exception ex) {
173                 rpcClient.close();
174                 rpcClient = null;
175                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
176                     agents[current].getPort();
177                 LOGGER.warn(msg, ex);
178                 throw new AppenderLoggingException("No Flume agents are available");
179             }
180         } else {
181             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
182                 agents[current].getPort();
183             LOGGER.warn(msg);
184             throw new AppenderLoggingException("No Flume agents are available");
185         }
186     }
187 
188     /**
189      * There is a very good chance that this will always return the first agent even if it isn't available.
190      * @param agents The list of agents to choose from
191      * @return The FlumeEventAvroServer.
192      */
193     private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
194         try {
195             final Properties props = new Properties();
196 
197             props.put("client.type", "default_failover");
198 
199             int count = 1;
200             final StringBuilder sb = new StringBuilder();
201             for (final Agent agent : agents) {
202                 if (sb.length() > 0) {
203                     sb.append(' ');
204                 }
205                 final String hostName = "host" + count++;
206                 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
207                 sb.append(hostName);
208             }
209             props.put("hosts", sb.toString());
210             if (batchSize > 0) {
211                 props.put("batch-size", Integer.toString(batchSize));
212             }
213             if (retries > 1) {
214                 if (retries > MAX_RECONNECTS) {
215                     retries = MAX_RECONNECTS;
216                 }
217                 props.put("max-attempts", Integer.toString(retries * agents.length));
218             }
219             if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
220                 props.put("request-timeout", Integer.toString(requestTimeoutMillis));
221             }
222             if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
223                 props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
224             }
225             return RpcClientFactory.getInstance(props);
226         } catch (final Exception ex) {
227             LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
228             return null;
229         }
230     }
231 
232     @Override
233     protected void releaseSub() {
234         if (rpcClient != null) {
235             try {
236                 rpcClient.close();
237             } catch (final Exception ex) {
238                 LOGGER.error("Attempt to close RPC client failed", ex);
239             }
240         }
241         rpcClient = null;
242     }
243 
244     /**
245      * Factory data.
246      */
247     private static class FactoryData {
248         private final String name;
249         private final Agent[] agents;
250         private final int batchSize;
251         private final int retries;
252         private final int conntectTimeoutMillis;
253         private final int requestTimeoutMillis;
254 
255         /**
256          * Constructor.
257          * @param name The name of the Appender.
258          * @param agents The agents.
259          * @param batchSize The number of events to include in a batch.
260          */
261         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
262                            final int connectTimeoutMillis, final int requestTimeoutMillis) {
263             this.name = name;
264             this.agents = agents;
265             this.batchSize = batchSize;
266             this.retries = retries;
267             this.conntectTimeoutMillis = connectTimeoutMillis;
268             this.requestTimeoutMillis = requestTimeoutMillis;
269         }
270     }
271 
272     /**
273      * Avro Manager Factory.
274      */
275     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
276 
277         /**
278          * Create the FlumeAvroManager.
279          * @param name The name of the entity to manage.
280          * @param data The data required to create the entity.
281          * @return The FlumeAvroManager.
282          */
283         @Override
284         public FlumeAvroManager createManager(final String name, final FactoryData data) {
285             try {
286 
287                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
288                     data.conntectTimeoutMillis, data.requestTimeoutMillis);
289             } catch (final Exception ex) {
290                 LOGGER.error("Could not create FlumeAvroManager", ex);
291             }
292             return null;
293         }
294     }
295 
296 }