View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.zip.GZIPOutputStream;
26  
27  import org.apache.flume.event.SimpleEvent;
28  import org.apache.logging.log4j.Level;
29  import org.apache.logging.log4j.LoggingException;
30  import org.apache.logging.log4j.Marker;
31  import org.apache.logging.log4j.ThreadContext;
32  import org.apache.logging.log4j.util.ReadOnlyStringMap;
33  import org.apache.logging.log4j.core.LogEvent;
34  import org.apache.logging.log4j.core.impl.ThrowableProxy;
35  import org.apache.logging.log4j.core.util.Patterns;
36  import org.apache.logging.log4j.core.util.UuidUtil;
37  import org.apache.logging.log4j.message.MapMessage;
38  import org.apache.logging.log4j.message.Message;
39  import org.apache.logging.log4j.message.StructuredDataId;
40  import org.apache.logging.log4j.message.StructuredDataMessage;
41  import org.apache.logging.log4j.util.Strings;
42  
43  /**
44   * Class that is both a Flume and Log4j Event.
45   */
46  public class FlumeEvent extends SimpleEvent implements LogEvent {
47  
48      static final String GUID = "guId";
49      /**
50       * Generated serial version ID.
51       */
52      private static final long serialVersionUID = -8988674608627854140L;
53  
54      private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
55  
56      private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
57  
58      private static final String EVENT_TYPE = "eventType";
59  
60      private static final String EVENT_ID = "eventId";
61  
62      private static final String TIMESTAMP = "timeStamp";
63  
64      private final LogEvent event;
65  
66      private final Map<String, String> contextMap = new HashMap<>();
67  
68      private final boolean compress;
69  
70      /**
71       * Construct the FlumeEvent.
72       * @param event The Log4j LogEvent.
73       * @param includes A comma separated list of MDC elements to include.
74       * @param excludes A comma separated list of MDC elements to exclude.
75       * @param required A comma separated list of MDC elements that are required to be defined.
76       * @param mdcPrefix The value to prefix to MDC keys.
77       * @param eventPrefix The value to prefix to event keys.
78       * @param compress If true the event body should be compressed.
79       */
80      public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
81                        String mdcPrefix, String eventPrefix, final boolean compress) {
82          this.event = event;
83          this.compress = compress;
84          final Map<String, String> headers = getHeaders();
85          headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
86          if (mdcPrefix == null) {
87              mdcPrefix = DEFAULT_MDC_PREFIX;
88          }
89          if (eventPrefix == null) {
90              eventPrefix = DEFAULT_EVENT_PREFIX;
91          }
92          final Map<String, String> mdc = event.getContextMap();
93          if (includes != null) {
94              final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
95              if (array.length > 0) {
96                  for (String str : array) {
97                      str = str.trim();
98                      if (mdc.containsKey(str)) {
99                          contextMap.put(str, mdc.get(str));
100                     }
101                 }
102             }
103         } else if (excludes != null) {
104             final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
105             if (array.length > 0) {
106                 final List<String> list = new ArrayList<>(array.length);
107                 for (final String value : array) {
108                     list.add(value.trim());
109                 }
110                 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
111                     if (!list.contains(entry.getKey())) {
112                         contextMap.put(entry.getKey(), entry.getValue());
113                     }
114                 }
115             }
116         } else {
117             contextMap.putAll(mdc);
118         }
119 
120         if (required != null) {
121             final String[] array = required.split(Patterns.COMMA_SEPARATOR);
122             if (array.length > 0) {
123                 for (String str : array) {
124                     str = str.trim();
125                     if (!mdc.containsKey(str)) {
126                         throw new LoggingException("Required key " + str + " is missing from the MDC");
127                     }
128                 }
129             }
130         }
131         final String guid =  UuidUtil.getTimeBasedUuid().toString();
132         final Message message = event.getMessage();
133         if (message instanceof MapMessage) {
134             // Add the guid to the Map so that it can be included in the Layout.
135             ((MapMessage) message).put(GUID, guid);
136             if (message instanceof StructuredDataMessage) {
137                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
138             }
139             addMapData(eventPrefix, headers, (MapMessage) message);
140         } else {
141             headers.put(GUID, guid);
142         }
143 
144         addContextData(mdcPrefix, headers, contextMap);
145     }
146 
147     protected void addStructuredData(final String prefix, final Map<String, String> fields,
148                                      final StructuredDataMessage msg) {
149         fields.put(prefix + EVENT_TYPE, msg.getType());
150         final StructuredDataId id = msg.getId();
151         fields.put(prefix + EVENT_ID, id.getName());
152     }
153 
154     protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
155         final Map<String, String> data = msg.getData();
156         for (final Map.Entry<String, String> entry : data.entrySet()) {
157             fields.put(prefix + entry.getKey(), entry.getValue());
158         }
159     }
160 
161     protected void addContextData(final String prefix, final Map<String, String> fields,
162                                   final Map<String, String> context) {
163         final Map<String, String> map = new HashMap<>();
164         for (final Map.Entry<String, String> entry : context.entrySet()) {
165             if (entry.getKey() != null && entry.getValue() != null) {
166                 fields.put(prefix + entry.getKey(), entry.getValue());
167                 map.put(prefix + entry.getKey(), entry.getValue());
168             }
169         }
170         context.clear();
171         context.putAll(map);
172     }
173 
174     /**
175      * Set the body in the event.
176      * @param body The body to add to the event.
177      */
178     @Override
179     public void setBody(final byte[] body) {
180         if (body == null || body.length == 0) {
181             super.setBody(new byte[0]);
182             return;
183         }
184         if (compress) {
185             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
186             try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
187                 os.write(body);
188             } catch (final IOException ioe) {
189                 throw new LoggingException("Unable to compress message", ioe);
190             }
191             super.setBody(baos.toByteArray());
192         } else {
193             super.setBody(body);
194         }
195     }
196 
197     /**
198      * Get the Frequently Qualified Class Name.
199      * @return the FQCN String.
200      */
201     @Override
202     public String getLoggerFqcn() {
203         return event.getLoggerFqcn();
204     }
205 
206     /**
207      * Returns the logging Level.
208      * @return the Level.
209      */
210     @Override
211     public Level getLevel() {
212         return event.getLevel();
213     }
214 
215     /**
216      * Returns the logger name.
217      * @return the logger name.
218      */
219     @Override
220     public String getLoggerName() {
221         return event.getLoggerName();
222     }
223 
224     /**
225      * Returns the StackTraceElement for the caller of the logging API.
226      * @return the StackTraceElement of the caller.
227      */
228     @Override
229     public StackTraceElement getSource() {
230         return event.getSource();
231     }
232 
233     /**
234      * Returns the Message.
235      * @return the Message.
236      */
237     @Override
238     public Message getMessage() {
239         return event.getMessage();
240     }
241 
242     /**
243      * Returns the Marker.
244      * @return the Marker.
245      */
246     @Override
247     public Marker getMarker() {
248         return event.getMarker();
249     }
250 
251     /**
252      * Returns the ID of the Thread.
253      * @return the ID of the Thread.
254      */
255     @Override
256     public long getThreadId() {
257         return event.getThreadId();
258     }
259 
260     /**
261      * Returns the priority of the Thread.
262      * @return the priority of the Thread.
263      */
264     @Override
265     public int getThreadPriority() {
266         return event.getThreadPriority();
267     }
268 
269     /**
270      * Returns the name of the Thread.
271      * @return the name of the Thread.
272      */
273     @Override
274     public String getThreadName() {
275         return event.getThreadName();
276     }
277 
278     /**
279      * Returns the event timestamp.
280      * @return the event timestamp.
281      */
282     @Override
283     public long getTimeMillis() {
284         return event.getTimeMillis();
285     }
286 
287     /**
288      * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created,
289      * or a dummy value if it is known that this value will not be used downstream.
290      * @return the event nanosecond timestamp.
291      */
292     @Override
293     public long getNanoTime() {
294         return event.getNanoTime();
295     }
296 
297     /**
298      * Returns the Throwable associated with the event, if any.
299      * @return the Throwable.
300      */
301     @Override
302     public Throwable getThrown() {
303         return event.getThrown();
304     }
305 
306     /**
307      * Returns the Throwable associated with the event, if any.
308      * @return the Throwable.
309      */
310     @Override
311     public ThrowableProxy getThrownProxy() {
312         return event.getThrownProxy();
313     }
314 
315     /**
316      * Returns a copy of the context Map.
317      * @return a copy of the context Map.
318      */
319     @Override
320     public Map<String, String> getContextMap() {
321         return contextMap;
322     }
323 
324     /**
325      * Returns the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
326      * @return the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
327      */
328     @Override
329     public ReadOnlyStringMap getContextData() {
330         return event.getContextData();
331     }
332 
333     /**
334      * Returns a copy of the context stack.
335      * @return a copy of the context stack.
336      */
337     @Override
338     public ThreadContext.ContextStack getContextStack() {
339         return event.getContextStack();
340     }
341 
342     @Override
343     public boolean isIncludeLocation() {
344         return event.isIncludeLocation();
345     }
346 
347     @Override
348     public void setIncludeLocation(final boolean includeLocation) {
349         event.setIncludeLocation(includeLocation);
350     }
351 
352     @Override
353     public boolean isEndOfBatch() {
354         return event.isEndOfBatch();
355     }
356 
357     @Override
358     public void setEndOfBatch(final boolean endOfBatch) {
359         event.setEndOfBatch(endOfBatch);
360     }
361 }