1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.util.Properties;
20 import java.util.concurrent.TimeUnit;
21
22 import org.apache.flume.Event;
23 import org.apache.flume.api.RpcClient;
24 import org.apache.flume.api.RpcClientFactory;
25 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
26 import org.apache.logging.log4j.core.appender.ManagerFactory;
27
28
29
30
31 public class FlumeAvroManager extends AbstractFlumeManager {
32
33 private static final int MAX_RECONNECTS = 3;
34 private static final int MINIMUM_TIMEOUT = 1000;
35
36 private static AvroManagerFactory factory = new AvroManagerFactory();
37
38 private final Agent[] agents;
39
40 private final int batchSize;
41
42 private final long delayNanos;
43 private final int delayMillis;
44
45 private final int retries;
46
47 private final int connectTimeoutMillis;
48
49 private final int requestTimeoutMillis;
50
51 private final int current = 0;
52
53 private volatile RpcClient rpcClient = null;
54
55 private BatchEvent batchEvent = new BatchEvent();
56 private long nextSend = 0;
57
58
59
60
61
62
63
64
65
66
67
68 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
69 final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
70 super(name);
71 this.agents = agents;
72 this.batchSize = batchSize;
73 this.delayMillis = delayMillis;
74 this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
75 this.retries = retries;
76 this.connectTimeoutMillis = connectTimeout;
77 this.requestTimeoutMillis = requestTimeout;
78 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
93 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
94 if (agents == null || agents.length == 0) {
95 throw new IllegalArgumentException("At least one agent is required");
96 }
97
98 if (batchSize <= 0) {
99 batchSize = 1;
100 };
101 final StringBuilder sb = new StringBuilder(name);
102 sb.append(" FlumeAvro[");
103 boolean first = true;
104 for (final Agent agent : agents) {
105 if (!first) {
106 sb.append(',');
107 }
108 sb.append(agent.getHost()).append(':').append(agent.getPort());
109 first = false;
110 }
111 sb.append(']');
112 return getManager(sb.toString(), factory,
113 new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
114 }
115
116
117
118
119
120 public Agent[] getAgents() {
121 return agents;
122 }
123
124
125
126
127
128 public int getCurrent() {
129 return current;
130 }
131
132 public int getRetries() {
133 return retries;
134 }
135
136 public int getConnectTimeoutMillis() {
137 return connectTimeoutMillis;
138 }
139
140 public int getRequestTimeoutMillis() {
141 return requestTimeoutMillis;
142 }
143
144 public int getBatchSize() {
145 return batchSize;
146 }
147
148 public int getDelayMillis() {
149 return delayMillis;
150 }
151
152 public void send(final BatchEvent events) {
153 if (rpcClient == null) {
154 synchronized (this) {
155 if (rpcClient == null) {
156 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
157 }
158 }
159 }
160
161 if (rpcClient != null) {
162 try {
163 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
164 rpcClient.appendBatch(events.getEvents());
165 } catch (final Exception ex) {
166 rpcClient.close();
167 rpcClient = null;
168 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
169 agents[current].getPort();
170 LOGGER.warn(msg, ex);
171 throw new AppenderLoggingException("No Flume agents are available");
172 }
173 } else {
174 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
175 agents[current].getPort();
176 LOGGER.warn(msg);
177 throw new AppenderLoggingException("No Flume agents are available");
178 }
179 }
180
181 @Override
182 public void send(final Event event) {
183 if (batchSize == 1) {
184 if (rpcClient == null) {
185 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
186 }
187
188 if (rpcClient != null) {
189 try {
190 rpcClient.append(event);
191 } catch (final Exception ex) {
192 rpcClient.close();
193 rpcClient = null;
194 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
195 agents[current].getPort();
196 LOGGER.warn(msg, ex);
197 throw new AppenderLoggingException("No Flume agents are available");
198 }
199 } else {
200 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
201 agents[current].getPort();
202 LOGGER.warn(msg);
203 throw new AppenderLoggingException("No Flume agents are available");
204 }
205 } else {
206 int eventCount;
207 BatchEvent batch = null;
208 synchronized(batchEvent) {
209 batchEvent.addEvent(event);
210 eventCount = batchEvent.size();
211 long now = System.nanoTime();
212 if (eventCount == 1) {
213 nextSend = now + delayNanos;
214 }
215 if (eventCount >= batchSize || now >= nextSend) {
216 batch = batchEvent;
217 batchEvent = new BatchEvent();
218 }
219 }
220 if (batch != null) {
221 send(batch);
222 }
223 }
224 }
225
226
227
228
229
230
231 private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
232 try {
233 final Properties props = new Properties();
234
235 props.put("client.type", "default_failover");
236
237 int agentCount = 1;
238 final StringBuilder sb = new StringBuilder();
239 for (final Agent agent : agents) {
240 if (sb.length() > 0) {
241 sb.append(' ');
242 }
243 final String hostName = "host" + agentCount++;
244 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
245 sb.append(hostName);
246 }
247 props.put("hosts", sb.toString());
248 if (batchSize > 0) {
249 props.put("batch-size", Integer.toString(batchSize));
250 }
251 if (retries > 1) {
252 if (retries > MAX_RECONNECTS) {
253 retries = MAX_RECONNECTS;
254 }
255 props.put("max-attempts", Integer.toString(retries * agents.length));
256 }
257 if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
258 props.put("request-timeout", Integer.toString(requestTimeoutMillis));
259 }
260 if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
261 props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
262 }
263 return RpcClientFactory.getInstance(props);
264 } catch (final Exception ex) {
265 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
266 return null;
267 }
268 }
269
270 @Override
271 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
272 boolean closed = true;
273 if (rpcClient != null) {
274 try {
275 synchronized(this) {
276 try {
277 if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
278 send(batchEvent);
279 }
280 } catch (final Exception ex) {
281 LOGGER.error("Error sending final batch: {}", ex.getMessage());
282 closed = false;
283 }
284 }
285 rpcClient.close();
286 } catch (final Exception ex) {
287 LOGGER.error("Attempt to close RPC client failed", ex);
288 closed = false;
289 }
290 }
291 rpcClient = null;
292 return closed;
293 }
294
295
296
297
298 private static class FactoryData {
299 private final String name;
300 private final Agent[] agents;
301 private final int batchSize;
302 private final int delayMillis;
303 private final int retries;
304 private final int conntectTimeoutMillis;
305 private final int requestTimeoutMillis;
306
307
308
309
310
311
312
313 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
314 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
315 this.name = name;
316 this.agents = agents;
317 this.batchSize = batchSize;
318 this.delayMillis = delayMillis;
319 this.retries = retries;
320 this.conntectTimeoutMillis = connectTimeoutMillis;
321 this.requestTimeoutMillis = requestTimeoutMillis;
322 }
323 }
324
325
326
327
328 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
329
330
331
332
333
334
335
336 @Override
337 public FlumeAvroManager createManager(final String name, final FactoryData data) {
338 try {
339
340 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
341 data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
342 } catch (final Exception ex) {
343 LOGGER.error("Could not create FlumeAvroManager", ex);
344 }
345 return null;
346 }
347 }
348
349 }