1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.zip.GZIPOutputStream;
26
27 import org.apache.flume.event.SimpleEvent;
28 import org.apache.logging.log4j.Level;
29 import org.apache.logging.log4j.LoggingException;
30 import org.apache.logging.log4j.Marker;
31 import org.apache.logging.log4j.ThreadContext;
32 import org.apache.logging.log4j.core.LogEvent;
33 import org.apache.logging.log4j.core.impl.ThrowableProxy;
34 import org.apache.logging.log4j.core.util.Patterns;
35 import org.apache.logging.log4j.core.util.UuidUtil;
36 import org.apache.logging.log4j.message.MapMessage;
37 import org.apache.logging.log4j.message.Message;
38 import org.apache.logging.log4j.message.StructuredDataId;
39 import org.apache.logging.log4j.message.StructuredDataMessage;
40 import org.apache.logging.log4j.util.Strings;
41
42
43
44
45 public class FlumeEvent extends SimpleEvent implements LogEvent {
46
47 static final String GUID = "guId";
48
49
50
51 private static final long serialVersionUID = -8988674608627854140L;
52
53 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
54
55 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
56
57 private static final String EVENT_TYPE = "eventType";
58
59 private static final String EVENT_ID = "eventId";
60
61 private static final String TIMESTAMP = "timeStamp";
62
63 private final LogEvent event;
64
65 private final Map<String, String> contextMap = new HashMap<String, String>();
66
67 private final boolean compress;
68
69
70
71
72
73
74
75
76
77
78
79 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
80 String mdcPrefix, String eventPrefix, final boolean compress) {
81 this.event = event;
82 this.compress = compress;
83 final Map<String, String> headers = getHeaders();
84 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
85 if (mdcPrefix == null) {
86 mdcPrefix = DEFAULT_MDC_PREFIX;
87 }
88 if (eventPrefix == null) {
89 eventPrefix = DEFAULT_EVENT_PREFIX;
90 }
91 final Map<String, String> mdc = event.getContextMap();
92 if (includes != null) {
93 final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
94 if (array.length > 0) {
95 for (String str : array) {
96 str = str.trim();
97 if (mdc.containsKey(str)) {
98 contextMap.put(str, mdc.get(str));
99 }
100 }
101 }
102 } else if (excludes != null) {
103 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
104 if (array.length > 0) {
105 final List<String> list = new ArrayList<String>(array.length);
106 for (final String value : array) {
107 list.add(value.trim());
108 }
109 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
110 if (!list.contains(entry.getKey())) {
111 contextMap.put(entry.getKey(), entry.getValue());
112 }
113 }
114 }
115 } else {
116 contextMap.putAll(mdc);
117 }
118
119 if (required != null) {
120 final String[] array = required.split(Patterns.COMMA_SEPARATOR);
121 if (array.length > 0) {
122 for (String str : array) {
123 str = str.trim();
124 if (!mdc.containsKey(str)) {
125 throw new LoggingException("Required key " + str + " is missing from the MDC");
126 }
127 }
128 }
129 }
130 final String guid = UuidUtil.getTimeBasedUuid().toString();
131 final Message message = event.getMessage();
132 if (message instanceof MapMessage) {
133
134 ((MapMessage) message).put(GUID, guid);
135 if (message instanceof StructuredDataMessage) {
136 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
137 }
138 addMapData(eventPrefix, headers, (MapMessage) message);
139 } else {
140 headers.put(GUID, guid);
141 }
142
143 addContextData(mdcPrefix, headers, contextMap);
144 }
145
146 protected void addStructuredData(final String prefix, final Map<String, String> fields,
147 final StructuredDataMessage msg) {
148 fields.put(prefix + EVENT_TYPE, msg.getType());
149 final StructuredDataId id = msg.getId();
150 fields.put(prefix + EVENT_ID, id.getName());
151 }
152
153 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
154 final Map<String, String> data = msg.getData();
155 for (final Map.Entry<String, String> entry : data.entrySet()) {
156 fields.put(prefix + entry.getKey(), entry.getValue());
157 }
158 }
159
160 protected void addContextData(final String prefix, final Map<String, String> fields,
161 final Map<String, String> context) {
162 final Map<String, String> map = new HashMap<String, String>();
163 for (final Map.Entry<String, String> entry : context.entrySet()) {
164 if (entry.getKey() != null && entry.getValue() != null) {
165 fields.put(prefix + entry.getKey(), entry.getValue());
166 map.put(prefix + entry.getKey(), entry.getValue());
167 }
168 }
169 context.clear();
170 context.putAll(map);
171 }
172
173
174
175
176
177 @Override
178 public void setBody(final byte[] body) {
179 if (body == null || body.length == 0) {
180 super.setBody(new byte[0]);
181 return;
182 }
183 if (compress) {
184 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
185 try {
186 final GZIPOutputStream os = new GZIPOutputStream(baos);
187 os.write(body);
188 os.close();
189 } catch (final IOException ioe) {
190 throw new LoggingException("Unable to compress message", ioe);
191 }
192 super.setBody(baos.toByteArray());
193 } else {
194 super.setBody(body);
195 }
196 }
197
198
199
200
201
202 @Override
203 public String getLoggerFqcn() {
204 return event.getLoggerFqcn();
205 }
206
207
208
209
210
211 @Override
212 public Level getLevel() {
213 return event.getLevel();
214 }
215
216
217
218
219
220 @Override
221 public String getLoggerName() {
222 return event.getLoggerName();
223 }
224
225
226
227
228
229 @Override
230 public StackTraceElement getSource() {
231 return event.getSource();
232 }
233
234
235
236
237
238 @Override
239 public Message getMessage() {
240 return event.getMessage();
241 }
242
243
244
245
246
247 @Override
248 public Marker getMarker() {
249 return event.getMarker();
250 }
251
252
253
254
255
256 @Override
257 public String getThreadName() {
258 return event.getThreadName();
259 }
260
261
262
263
264
265 @Override
266 public long getTimeMillis() {
267 return event.getTimeMillis();
268 }
269
270
271
272
273
274 @Override
275 public Throwable getThrown() {
276 return event.getThrown();
277 }
278
279
280
281
282
283 @Override
284 public ThrowableProxy getThrownProxy() {
285 return event.getThrownProxy();
286 }
287
288
289
290
291
292 @Override
293 public Map<String, String> getContextMap() {
294 return contextMap;
295 }
296
297
298
299
300
301 @Override
302 public ThreadContext.ContextStack getContextStack() {
303 return event.getContextStack();
304 }
305
306 @Override
307 public boolean isIncludeLocation() {
308 return event.isIncludeLocation();
309 }
310
311 @Override
312 public void setIncludeLocation(final boolean includeLocation) {
313 event.setIncludeLocation(includeLocation);
314 }
315
316 @Override
317 public boolean isEndOfBatch() {
318 return event.isEndOfBatch();
319 }
320
321 @Override
322 public void setEndOfBatch(final boolean endOfBatch) {
323 event.setEndOfBatch(endOfBatch);
324 }
325 }