1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.util.HashMap;
20 import java.util.Locale;
21 import java.util.Map;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.flume.Event;
25 import org.apache.flume.EventDeliveryException;
26 import org.apache.flume.agent.embedded.EmbeddedAgent;
27 import org.apache.logging.log4j.LoggingException;
28 import org.apache.logging.log4j.core.appender.ManagerFactory;
29 import org.apache.logging.log4j.core.config.ConfigurationException;
30 import org.apache.logging.log4j.core.config.Property;
31 import org.apache.logging.log4j.core.util.NameUtil;
32 import org.apache.logging.log4j.util.PropertiesUtil;
33 import org.apache.logging.log4j.util.Strings;
34
35
36
37
38 public class FlumeEmbeddedManager extends AbstractFlumeManager {
39
40 private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
41
42 private static final String IN_MEMORY = "InMemory";
43
44 private static FlumeManagerFactory factory = new FlumeManagerFactory();
45
46 private final EmbeddedAgent agent;
47
48 private final String shortName;
49
50
51
52
53
54
55
56
/**
 * Creates a manager around an embedded Flume agent.
 *
 * @param name the manager name, passed to the superclass constructor.
 * @param shortName the name reported in error messages when an event cannot be delivered.
 * @param agent the embedded agent that events are put to.
 */
protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
    super(name);
    // The two field assignments are independent of each other.
    this.shortName = shortName;
    this.agent = agent;
}
62
63
64
65
66
67
68
69
70
71
72 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
73 int batchSize, final String dataDir) {
74
75 if (batchSize <= 0) {
76 batchSize = 1;
77 }
78
79 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
80 throw new IllegalArgumentException("Either an Agent or properties are required");
81 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
82 throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
83 }
84
85 final StringBuilder sb = new StringBuilder();
86 boolean first = true;
87
88 if (agents != null && agents.length > 0) {
89 sb.append(name).append('[');
90 for (final Agent agent : agents) {
91 if (!first) {
92 sb.append('_');
93 }
94 sb.append(agent.getHost()).append('-').append(agent.getPort());
95 first = false;
96 }
97 sb.append(']');
98 } else {
99 String sep = Strings.EMPTY;
100 sb.append(name).append('-');
101 final StringBuilder props = new StringBuilder();
102 for (final Property prop : properties) {
103 props.append(sep);
104 props.append(prop.getName()).append('=').append(prop.getValue());
105 sep = "_";
106 }
107 sb.append(NameUtil.md5(props.toString()));
108 }
109 return getManager(sb.toString(), factory,
110 new FactoryData(name, agents, properties, batchSize, dataDir));
111 }
112
113 @Override
114 public void send(final Event event) {
115 try {
116 agent.put(event);
117 } catch (final EventDeliveryException ex) {
118 throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
119 }
120 }
121
122 @Override
123 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
124 agent.stop();
125 return true;
126 }
127
128
129
130
131 private static class FactoryData {
132 private final Agent[] agents;
133 private final Property[] properties;
134 private final int batchSize;
135 private final String dataDir;
136 private final String name;
137
138
139
140
141
142
143
144
145
146 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
147 final String dataDir) {
148 this.name = name;
149 this.agents = agents;
150 this.batchSize = batchSize;
151 this.properties = properties;
152 this.dataDir = dataDir;
153 }
154 }
155
156
157
158
159 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
160
161
162
163
164
165
166
167 @Override
168 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
169 try {
170 final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
171 data.batchSize, data.dataDir);
172 final EmbeddedAgent agent = new EmbeddedAgent(name);
173 agent.configure(props);
174 agent.start();
175 LOGGER.debug("Created Agent " + name);
176 return new FlumeEmbeddedManager(name, data.name, agent);
177 } catch (final Exception ex) {
178 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
179 }
180 return null;
181 }
182
183 private Map<String, String> createProperties(final String name, final Agent[] agents,
184 final Property[] properties, final int batchSize, String dataDir) {
185 final Map<String, String> props = new HashMap<>();
186
187 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
188 LOGGER.error("No Flume configuration provided");
189 throw new ConfigurationException("No Flume configuration provided");
190 }
191
192 if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
193 LOGGER.error("Agents and Flume configuration cannot both be specified");
194 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
195 }
196
197 if (agents != null && agents.length > 0) {
198
199 if (Strings.isNotEmpty(dataDir)) {
200 if (dataDir.equals(IN_MEMORY)) {
201 props.put("channel.type", "memory");
202 } else {
203 props.put("channel.type", "file");
204
205 if (!dataDir.endsWith(FILE_SEP)) {
206 dataDir = dataDir + FILE_SEP;
207 }
208
209 props.put("channel.checkpointDir", dataDir + "checkpoint");
210 props.put("channel.dataDirs", dataDir + "data");
211 }
212
213 } else {
214 props.put("channel.type", "file");
215 }
216
217 final StringBuilder sb = new StringBuilder();
218 String leading = Strings.EMPTY;
219 final int priority = agents.length;
220 for (int i = 0; i < priority; ++i) {
221 sb.append(leading).append("agent").append(i);
222 leading = " ";
223 final String prefix = "agent" + i;
224 props.put(prefix + ".type", "avro");
225 props.put(prefix + ".hostname", agents[i].getHost());
226 props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
227 props.put(prefix + ".batch-size", Integer.toString(batchSize));
228 props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
229 }
230 props.put("sinks", sb.toString());
231 props.put("processor.type", "failover");
232 } else {
233 String[] sinks = null;
234
235 for (final Property property : properties) {
236 final String key = property.getName();
237
238 if (Strings.isEmpty(key)) {
239 final String msg = "A property name must be provided";
240 LOGGER.error(msg);
241 throw new ConfigurationException(msg);
242 }
243
244 final String upperKey = key.toUpperCase(Locale.ENGLISH);
245
246 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
247 final String msg =
248 "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
249 LOGGER.error(msg);
250 throw new ConfigurationException(msg);
251 }
252
253 final String value = property.getValue();
254 if (Strings.isEmpty(value)) {
255 final String msg = "A value for property " + key + " must be provided";
256 LOGGER.error(msg);
257 throw new ConfigurationException(msg);
258 }
259
260 if (upperKey.equals("SINKS")) {
261 sinks = value.trim().split(" ");
262 }
263
264 props.put(key, value);
265 }
266
267 if (sinks == null || sinks.length == 0) {
268 final String msg = "At least one Sink must be specified";
269 LOGGER.error(msg);
270 throw new ConfigurationException(msg);
271 }
272 }
273 return props;
274 }
275
276 }
277
278 }