View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.zip.GZIPOutputStream;
26  
27  import org.apache.flume.event.SimpleEvent;
28  import org.apache.logging.log4j.Level;
29  import org.apache.logging.log4j.LoggingException;
30  import org.apache.logging.log4j.Marker;
31  import org.apache.logging.log4j.ThreadContext;
32  import org.apache.logging.log4j.core.LogEvent;
33  import org.apache.logging.log4j.core.helpers.UUIDUtil;
34  import org.apache.logging.log4j.message.MapMessage;
35  import org.apache.logging.log4j.message.Message;
36  import org.apache.logging.log4j.message.StructuredDataId;
37  import org.apache.logging.log4j.message.StructuredDataMessage;
38  
39  /**
40   * Class that is both a Flume and Log4j Event.
41   */
42  public class FlumeEvent extends SimpleEvent implements LogEvent {
43  
44      static final String GUID = "guId";
45      /**
46       * Generated serial version ID.
47       */
48      private static final long serialVersionUID = -8988674608627854140L;
49  
50      private static final String DEFAULT_MDC_PREFIX = "";
51  
52      private static final String DEFAULT_EVENT_PREFIX = "";
53  
54      private static final String EVENT_TYPE = "eventType";
55  
56      private static final String EVENT_ID = "eventId";
57  
58      private static final String TIMESTAMP = "timeStamp";
59  
60      private final LogEvent event;
61  
62      private final Map<String, String> ctx = new HashMap<String, String>();
63  
64      private final boolean compress;
65  
66      /**
67       * Construct the FlumeEvent.
68       * @param event The Log4j LogEvent.
69       * @param includes A comma separated list of MDC elements to include.
70       * @param excludes A comma separated list of MDC elements to exclude.
71       * @param required A comma separated list of MDC elements that are required to be defined.
72       * @param mdcPrefix The value to prefix to MDC keys.
73       * @param eventPrefix The value to prefix to event keys.
74       * @param compress If true the event body should be compressed.
75       */
76      public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
77                        String mdcPrefix, String eventPrefix, final boolean compress) {
78          this.event = event;
79          this.compress = compress;
80          final Map<String, String> headers = getHeaders();
81          headers.put(TIMESTAMP, Long.toString(event.getMillis()));
82          if (mdcPrefix == null) {
83              mdcPrefix = DEFAULT_MDC_PREFIX;
84          }
85          if (eventPrefix == null) {
86              eventPrefix = DEFAULT_EVENT_PREFIX;
87          }
88          final Map<String, String> mdc = event.getContextMap();
89          if (includes != null) {
90              final String[] array = includes.split(",");
91              if (array.length > 0) {
92                  for (String str : array) {
93                      str = str.trim();
94                      if (mdc.containsKey(str)) {
95                          ctx.put(str, mdc.get(str));
96                      }
97                  }
98              }
99          } else if (excludes != null) {
100             final String[] array = excludes.split(",");
101             if (array.length > 0) {
102                 final List<String> list = new ArrayList<String>(array.length);
103                 for (final String value : array) {
104                     list.add(value.trim());
105                 }
106                 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
107                     if (!list.contains(entry.getKey())) {
108                         ctx.put(entry.getKey(), entry.getValue());
109                     }
110                 }
111             }
112         } else {
113             ctx.putAll(mdc);
114         }
115 
116         if (required != null) {
117             final String[] array = required.split(",");
118             if (array.length > 0) {
119                 for (String str : array) {
120                     str = str.trim();
121                     if (!mdc.containsKey(str)) {
122                         throw new LoggingException("Required key " + str + " is missing from the MDC");
123                     }
124                 }
125             }
126         }
127         final String guid =  UUIDUtil.getTimeBasedUUID().toString();
128         final Message message = event.getMessage();
129         if (message instanceof MapMessage) {
130             // Add the guid to the Map so that it can be included in the Layout.
131             ((MapMessage) message).put(GUID, guid);
132             if (message instanceof StructuredDataMessage) {
133                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
134             }
135             addMapData(eventPrefix, headers, (MapMessage) message);
136         } else {
137             headers.put(GUID, guid);
138         }
139 
140         addContextData(mdcPrefix, headers, ctx);
141     }
142 
143     protected void addStructuredData(final String prefix, final Map<String, String> fields,
144                                      final StructuredDataMessage msg) {
145         fields.put(prefix + EVENT_TYPE, msg.getType());
146         final StructuredDataId id = msg.getId();
147         fields.put(prefix + EVENT_ID, id.getName());
148     }
149 
150     protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
151         final Map<String, String> data = msg.getData();
152         for (final Map.Entry<String, String> entry : data.entrySet()) {
153             fields.put(prefix + entry.getKey(), entry.getValue());
154         }
155     }
156 
157     protected void addContextData(final String prefix, final Map<String, String> fields,
158                                   final Map<String, String> context) {
159         Map<String, String> map = new HashMap<String, String>();
160         for (final Map.Entry<String, String> entry : context.entrySet()) {
161             if (entry.getKey() != null && entry.getValue() != null) {
162                 fields.put(prefix + entry.getKey(), entry.getValue());
163                 map.put(prefix + entry.getKey(), entry.getValue());
164             }
165         }
166         context.clear();
167         context.putAll(map);
168     }
169 
170     /**
171      * Set the body in the event.
172      * @param body The body to add to the event.
173      */
174     @Override
175     public void setBody(final byte[] body) {
176         if (body == null || body.length == 0) {
177             super.setBody(new byte[0]);
178             return;
179         }
180         if (compress) {
181             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
182             try {
183                 final GZIPOutputStream os = new GZIPOutputStream(baos);
184                 os.write(body);
185                 os.close();
186             } catch (final IOException ioe) {
187                 throw new LoggingException("Unable to compress message", ioe);
188             }
189             super.setBody(baos.toByteArray());
190         } else {
191             super.setBody(body);
192         }
193     }
194 
195     /**
196      * Get the Frequently Qualified Class Name.
197      * @return the FQCN String.
198      */
199     @Override
200     public String getFQCN() {
201         return event.getFQCN();
202     }
203 
204     /**
205      * Returns the logging Level.
206      * @return the Level.
207      */
208     @Override
209     public Level getLevel() {
210         return event.getLevel();
211     }
212 
213     /**
214      * Returns the logger name.
215      * @return the logger name.
216      */
217     @Override
218     public String getLoggerName() {
219         return event.getLoggerName();
220     }
221 
222     /**
223      * Returns the StackTraceElement for the caller of the logging API.
224      * @return the StackTraceElement of the caller.
225      */
226     @Override
227     public StackTraceElement getSource() {
228         return event.getSource();
229     }
230 
231     /**
232      * Returns the Message.
233      * @return the Message.
234      */
235     @Override
236     public Message getMessage() {
237         return event.getMessage();
238     }
239 
240     /**
241      * Returns the Marker.
242      * @return the Marker.
243      */
244     @Override
245     public Marker getMarker() {
246         return event.getMarker();
247     }
248 
249     /**
250      * Returns the name of the Thread.
251      * @return the name of the Thread.
252      */
253     @Override
254     public String getThreadName() {
255         return event.getThreadName();
256     }
257 
258     /**
259      * Returns the event timestamp.
260      * @return the event timestamp.
261      */
262     @Override
263     public long getMillis() {
264         return event.getMillis();
265     }
266 
267     /**
268      * Returns the Throwable associated with the event, if any.
269      * @return the Throwable.
270      */
271     @Override
272     public Throwable getThrown() {
273         return event.getThrown();
274     }
275 
276     /**
277      * Returns a copy of the context Map.
278      * @return a copy of the context Map.
279      */
280     @Override
281     public Map<String, String> getContextMap() {
282         return ctx;
283     }
284 
285     /**
286      * Returns a copy of the context stack.
287      * @return a copy of the context stack.
288      */
289     @Override
290     public ThreadContext.ContextStack getContextStack() {
291         return event.getContextStack();
292     }
293 
294     @Override
295     public boolean isIncludeLocation() {
296         return event.isIncludeLocation();
297     }
298 
299     @Override
300     public void setIncludeLocation(final boolean includeLocation) {
301         event.setIncludeLocation(includeLocation);
302     }
303 
304     @Override
305     public boolean isEndOfBatch() {
306         return event.isEndOfBatch();
307     }
308 
309     @Override
310     public void setEndOfBatch(final boolean endOfBatch) {
311         event.setEndOfBatch(endOfBatch);
312     }
313 }