View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.HashMap;
20  import java.util.Locale;
21  import java.util.Map;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.flume.Event;
25  import org.apache.flume.EventDeliveryException;
26  import org.apache.flume.agent.embedded.EmbeddedAgent;
27  import org.apache.logging.log4j.LoggingException;
28  import org.apache.logging.log4j.core.appender.ManagerFactory;
29  import org.apache.logging.log4j.core.config.ConfigurationException;
30  import org.apache.logging.log4j.core.config.Property;
31  import org.apache.logging.log4j.core.util.NameUtil;
32  import org.apache.logging.log4j.util.PropertiesUtil;
33  import org.apache.logging.log4j.util.Strings;
34  
35  /**
36   *
37   */
38  public class FlumeEmbeddedManager extends AbstractFlumeManager {
39  
40      private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
41  
42      private static final String IN_MEMORY = "InMemory";
43  
44      private static FlumeManagerFactory factory = new FlumeManagerFactory();
45  
46      private final EmbeddedAgent agent;
47  
48      private final String shortName;
49  
50  
51      /**
52       * Constructor
53       * @param name The unique name of this manager.
54       * @param shortName The short version of the agent name.
55       * @param agent The embedded agent.
56       */
57      protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
58          super(name);
59          this.agent = agent;
60          this.shortName = shortName;
61      }
62  
63      /**
64       * Returns a FlumeEmbeddedManager.
65       * @param name The name of the manager.
66       * @param agents The agents to use.
67       * @param properties Properties for the embedded manager.
68       * @param batchSize The number of events to include in a batch.
69       * @param dataDir The directory where the Flume FileChannel should write to.
70       * @return A FlumeAvroManager.
71       */
72      public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
73                                                    int batchSize, final String dataDir) {
74  
75          if (batchSize <= 0) {
76              batchSize = 1;
77          }
78  
79          if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
80              throw new IllegalArgumentException("Either an Agent or properties are required");
81          } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
82              throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
83          }
84  
85          final StringBuilder sb = new StringBuilder();
86          boolean first = true;
87  
88          if (agents != null && agents.length > 0) {
89              sb.append(name).append('[');
90              for (final Agent agent : agents) {
91                  if (!first) {
92                      sb.append('_');
93                  }
94                  sb.append(agent.getHost()).append('-').append(agent.getPort());
95                  first = false;
96              }
97              sb.append(']');
98          } else {
99              String sep = Strings.EMPTY;
100             sb.append(name).append('-');
101             final StringBuilder props = new StringBuilder();
102             for (final Property prop : properties) {
103                 props.append(sep);
104                 props.append(prop.getName()).append('=').append(prop.getValue());
105                 sep = "_";
106             }
107             sb.append(NameUtil.md5(props.toString()));
108         }
109         return getManager(sb.toString(), factory,
110                 new FactoryData(name, agents, properties, batchSize, dataDir));
111     }
112 
113     @Override
114     public void send(final Event event) {
115         try {
116             agent.put(event);
117         } catch (final EventDeliveryException ex) {
118             throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
119         }
120     }
121 
122     @Override
123     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
124         agent.stop();
125         return true;
126     }
127 
128     /**
129      * Factory data.
130      */
131     private static class FactoryData {
132         private final Agent[] agents;
133         private final Property[] properties;
134         private final int batchSize;
135         private final String dataDir;
136         private final String name;
137 
138         /**
139          * Constructor.
140          * @param name The name of the Appender.
141          * @param agents The agents.
142          * @param properties The Flume configuration properties.
143          * @param batchSize The number of events to include in a batch.
144          * @param dataDir The directory where Flume should write to.
145          */
146         public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
147                            final String dataDir) {
148             this.name = name;
149             this.agents = agents;
150             this.batchSize = batchSize;
151             this.properties = properties;
152             this.dataDir = dataDir;
153         }
154     }
155 
156     /**
157      * Avro Manager Factory.
158      */
159     private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
160 
161         /**
162          * Create the FlumeAvroManager.
163          * @param name The name of the entity to manage.
164          * @param data The data required to create the entity.
165          * @return The FlumeAvroManager.
166          */
167         @Override
168         public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
169             try {
170                 final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
171                     data.batchSize, data.dataDir);
172                 final EmbeddedAgent agent = new EmbeddedAgent(name);
173                 agent.configure(props);
174                 agent.start();
175                 LOGGER.debug("Created Agent " + name);
176                 return new FlumeEmbeddedManager(name, data.name, agent);
177             } catch (final Exception ex) {
178                 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
179             }
180             return null;
181         }
182 
183         private Map<String, String> createProperties(final String name, final Agent[] agents,
184                                                      final Property[] properties, final int batchSize, String dataDir) {
185             final Map<String, String> props = new HashMap<>();
186 
187             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
188                 LOGGER.error("No Flume configuration provided");
189                 throw new ConfigurationException("No Flume configuration provided");
190             }
191 
192             if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
193                 LOGGER.error("Agents and Flume configuration cannot both be specified");
194                 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
195             }
196 
197             if (agents != null && agents.length > 0) {
198 
199                 if (Strings.isNotEmpty(dataDir)) {
200                     if (dataDir.equals(IN_MEMORY)) {
201                         props.put("channel.type", "memory");
202                     } else {
203                         props.put("channel.type", "file");
204 
205                         if (!dataDir.endsWith(FILE_SEP)) {
206                             dataDir = dataDir + FILE_SEP;
207                         }
208 
209                         props.put("channel.checkpointDir", dataDir + "checkpoint");
210                         props.put("channel.dataDirs", dataDir + "data");
211                     }
212 
213                 } else {
214                     props.put("channel.type", "file");
215                 }
216 
217                 final StringBuilder sb = new StringBuilder();
218                 String leading = Strings.EMPTY;
219                 final int priority = agents.length;
220                 for (int i = 0; i < priority; ++i) {
221                     sb.append(leading).append("agent").append(i);
222                     leading = " ";
223                     final String prefix = "agent" + i;
224                     props.put(prefix + ".type", "avro");
225                     props.put(prefix + ".hostname", agents[i].getHost());
226                     props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
227                     props.put(prefix + ".batch-size", Integer.toString(batchSize));
228                     props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
229                 }
230                 props.put("sinks", sb.toString());
231                 props.put("processor.type", "failover");
232             } else {
233                 String[] sinks = null;
234 
235                 for (final Property property : properties) {
236                     final String key = property.getName();
237 
238                     if (Strings.isEmpty(key)) {
239                         final String msg = "A property name must be provided";
240                         LOGGER.error(msg);
241                         throw new ConfigurationException(msg);
242                     }
243 
244                     final String upperKey = key.toUpperCase(Locale.ENGLISH);
245 
246                     if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
247                         final String msg =
248                             "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
249                         LOGGER.error(msg);
250                         throw new ConfigurationException(msg);
251                     }
252 
253                     final String value = property.getValue();
254                     if (Strings.isEmpty(value)) {
255                         final String msg = "A value for property " + key + " must be provided";
256                         LOGGER.error(msg);
257                         throw new ConfigurationException(msg);
258                     }
259 
260                     if (upperKey.equals("SINKS")) {
261                         sinks = value.trim().split(" ");
262                     }
263 
264                     props.put(key, value);
265                 }
266 
267                 if (sinks == null || sinks.length == 0) {
268                     final String msg = "At least one Sink must be specified";
269                     LOGGER.error(msg);
270                     throw new ConfigurationException(msg);
271                 }
272             }
273             return props;
274         }
275 
276     }
277 
278 }