View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.db;
19  
20  import org.apache.log4j.Level;
21  import org.apache.log4j.Logger;
22  import org.apache.log4j.plugins.Pauseable;
23  import org.apache.log4j.plugins.Receiver;
24  import org.apache.log4j.scheduler.Job;
25  import org.apache.log4j.scheduler.Scheduler;
26  import org.apache.log4j.spi.LocationInfo;
27  import org.apache.log4j.spi.LoggerRepositoryEx;
28  import org.apache.log4j.spi.LoggingEvent;
29  import org.apache.log4j.spi.ThrowableInformation;
30  import org.apache.log4j.xml.DOMConfigurator;
31  import org.apache.log4j.xml.UnrecognizedElementHandler;
32  import org.w3c.dom.Element;
33  
34  import java.sql.Connection;
35  import java.sql.ResultSet;
36  import java.sql.SQLException;
37  import java.sql.Statement;
38  import java.util.Hashtable;
39  import java.util.Properties;
40  import java.util.StringTokenizer;
41  
42  /**
43   * Converts log data stored in a database into LoggingEvents.
44   * <p>
45   * <b>NOTE:</b> This receiver cannot yet be created through Chainsaw's receiver panel.
46   * It must be created through an XML configuration file.
47   * <p>
48   * This receiver supports database configuration via ConnectionSource, in the
49   * org.apache.log4j.db package: DriverManagerConnectionSource,
50   * DataSourceConnectionSource, JNDIConnectionSource
51   * <p>
52   * This database receiver differs from DBReceiver in that this receiver relies
53   * on custom SQL to retrieve logging event data, where DBReceiver requires the
54   * use of a log4j-defined schema.
55   * <p>
56   * A 'refreshMillis' int parameter controls SQL execution. If 'refreshMillis' is
57   * zero (the default), the receiver will run only one time. If it is set to any
58   * other numeric value, the SQL will be executed on a recurring basis every
59   * 'refreshMillis' milliseconds.
60   * <p>
61   * The receiver closes the connection and acquires a new connection on each
62   * execution of the SQL (use pooled connections if possible).
63   * <p>
64   * If the SQL will be executing on a recurring basis, specify the IDField param -
65   * the column name holding the unique identifier (int) representing the logging
66   * event.
67   * <p>
68   * As events are retrieved, the column represented by IDField is examined and
69   * the largest value is held and used by the next execution of the SQL statement
70   * to avoid retrieving previously processed events.
71   * <p>
72   * As an example, the IDField references a 'COUNTER' (int, auto-increment,
73   * unique) column. The first execution of the SQL statement returns 500 rows,
74   * with a final value in the COUNTER field of 500.
75   * <p>
76   * The SQL statement is manipulated prior to the next execution, adding ' WHERE
77   * COUNTER &gt; 500' to the statement to avoid retrieval of previously processed
78   * events.
79   * <p>
80   * The select statement must provide ALL fields which define a LoggingEvent.
81   * <p>
82   * The SQL statement MUST include the columns: LOGGER, TIMESTAMP, LEVEL, THREAD,
83   * MESSAGE, NDC, MDC, CLASS, METHOD, FILE, LINE, PROPERTIES, THROWABLE
84   * <p>
85   * Use ' AS ' in the SQL statement to alias the SQL's column names to match your
86   * database schema. (see example below).
87   * <p>
88   * Include all fields in the SQL statement, even if you don't have data for the
89   * field (specify an empty string as the value for columns which you don't have
90   * data).
91   * <p>
92   * The TIMESTAMP column must be a datetime.
93   * <p>
94   * Both a PROPERTIES column and an MDC column are supported. These fields
95   * represent Maps on the logging event, but require the use of string
96   * concatenation database functions to hold the (possibly multiple) name/value
97   * pairs in the column.
98   * <p>
99   * For example, to include both 'userid' and 'lastname' properties in the
100  * logging event (from either the PROPERTIES or MDC columns), the name/value
101  * pairs must be concatenated together by your database.
102  * <p>
103  * The resulting PROPERTIES or MDC column must have data in this format: {{name,
104  * value, name2, value2}}
105  * <p>
106  * The resulting PROPERTIES column would contain this text: {{userid, someone,
107  * lastname, mylastname}}
108  * <p>
109  * Here is an example of concatenating a PROPERTIES or MDC column using MySQL's
110  * concat function, where the 'application' and 'hostname' parameters were fixed
111  * text, but the 'log4jid' key's value is the value of the COUNTER column:
112  * <p>
113  * concat("{{application,databaselogs,hostname,mymachine,log4jid,", COUNTER,
114  * "}}") as PROPERTIES
115  * <p>
116  * log4jid is a special property that is used by Chainsaw to represent an 'ID'
117  * field. Specify this property to ensure you can map events in Chainsaw to
118  * events in the database if you need to go back and view events at a later time
119  * or save the events to XML for later analysis.
120  * <p>
121  * Here is a complete MySQL SQL statement which can be used to provide events to
122  * Chainsaw (note how in the example below, there is no column in logtable representing the throwable, so an
123  * empty string is passed in and an ALIAS is still defined):
124  * <p>
125  * select myloggercolumn as LOGGER, mytimestampcolumn as TIMESTAMP, mylevelcolumn as LEVEL, mythreadcolumn as
126  * THREAD, mymessagecolumn as MESSAGE, myndccolumn as NDC, mymdccolumn as MDC, myclasscolumn as CLASS, mymethodcolumn as
127  * METHOD, myfilecolumn as FILE, mylinecolumn as LINE,
128  * concat("{{application,databaselogs,hostname,mymachine, log4jid,",
129  * COUNTER,"}}") as PROPERTIES, "" as THROWABLE from logtable
130  * <p>
131  *
132  * @author Scott Deboy &lt;sdeboy@apache.org&gt;
133  * <p>
134  */
135 public class CustomSQLDBReceiver extends Receiver implements Pauseable, UnrecognizedElementHandler {
136 
137     protected volatile Connection connection = null;
138 
139     protected String sqlStatement = "";
140 
141     /**
142      * By default we refresh data every 1000 milliseconds.
143      *
144      * @see #setRefreshMillis
145      */
146     static int DEFAULT_REFRESH_MILLIS = 1000;
147 
148     int refreshMillis = DEFAULT_REFRESH_MILLIS;
149 
150     protected String idField = null;
151 
152     int lastID = -1;
153 
154     private static final String WHERE_CLAUSE = " WHERE ";
155 
156     private static final String AND_CLAUSE = " AND ";
157 
158     private boolean whereExists = false;
159 
160     private boolean paused = false;
161 
162     private ConnectionSource connectionSource;
163 
164     public static final String LOG4J_ID_KEY = "log4jid";
165 
166     private Job customReceiverJob;
167 
168     public void activateOptions() {
169 
170         if (connectionSource == null) {
171             throw new IllegalStateException(
172                 "CustomSQLDBReceiver cannot function without a connection source");
173         }
174         whereExists = (sqlStatement.toUpperCase().contains(WHERE_CLAUSE));
175 
176         customReceiverJob = new CustomReceiverJob();
177 
178         if (this.repository == null) {
179             throw new IllegalStateException(
180                 "CustomSQLDBReceiver cannot function without a reference to its owning repository");
181         }
182 
183 
184         if (repository instanceof LoggerRepositoryEx) {
185             Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
186 
187             scheduler.schedule(
188                 customReceiverJob, System.currentTimeMillis() + 500, refreshMillis);
189         }
190 
191     }
192 
193     void closeConnection() {
194         if (connection != null) {
195             try {
196                 // LogLog.warn("closing the connection. ", new Exception("x"));
197                 connection.close();
198             } catch (SQLException sqle) {
199                 // nothing we can do here
200             }
201         }
202     }
203 
204     public void setRefreshMillis(int refreshMillis) {
205         this.refreshMillis = refreshMillis;
206     }
207 
208     public int getRefreshMillis() {
209         return refreshMillis;
210     }
211 
212     /**
213      * @return Returns the connectionSource.
214      */
215     public ConnectionSource getConnectionSource() {
216         return connectionSource;
217     }
218 
219     /**
220      * @param connectionSource The connectionSource to set.
221      */
222     public void setConnectionSource(ConnectionSource connectionSource) {
223         this.connectionSource = connectionSource;
224     }
225 
226     public void close() {
227         try {
228             if ((connection != null) && !connection.isClosed()) {
229                 connection.close();
230             }
231         } catch (SQLException e) {
232             e.printStackTrace();
233         } finally {
234             connection = null;
235         }
236     }
237 
238     public void finalize() throws Throwable {
239         super.finalize();
240         close();
241     }
242 
243     /*
244      * (non-Javadoc)
245      *
246      * @see org.apache.log4j.plugins.Plugin#shutdown()
247      */
248     public void shutdown() {
249         getLogger().info("removing receiverJob from the Scheduler.");
250 
251         if (this.repository instanceof LoggerRepositoryEx) {
252             Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
253             scheduler.delete(customReceiverJob);
254         }
255 
256         lastID = -1;
257     }
258 
259     public void setSql(String s) {
260         sqlStatement = s;
261     }
262 
263     public String getSql() {
264         return sqlStatement;
265     }
266 
267     public void setIDField(String id) {
268         idField = id;
269     }
270 
271     public String getIDField() {
272         return idField;
273     }
274 
275     public synchronized void setPaused(boolean p) {
276         paused = p;
277     }
278 
279     public synchronized boolean isPaused() {
280         return paused;
281     }
282 
283     class CustomReceiverJob implements Job {
284         public void execute() {
285             int oldLastID = lastID;
286             try {
287                 connection = connectionSource.getConnection();
288                 Statement statement = connection.createStatement();
289 
290                 Logger eventLogger;
291                 long timeStamp;
292                 String level;
293                 String threadName;
294                 Object message;
295                 String ndc;
296                 Hashtable<String, String> mdc;
297                 String[] throwable;
298                 String className;
299                 String methodName;
300                 String fileName;
301                 String lineNumber;
302                 Hashtable<String, String> properties;
303 
304                 String currentSQLStatement;
305                 if (whereExists) {
306                     currentSQLStatement = sqlStatement + AND_CLAUSE + idField
307                         + " > " + lastID;
308                 } else {
309                     currentSQLStatement = sqlStatement + WHERE_CLAUSE + idField
310                         + " > " + lastID;
311                 }
312 
313                 ResultSet rs = statement.executeQuery(currentSQLStatement);
314 
315                 int i = 0;
316                 while (rs.next()) {
317                     // add a small break every 1000 received events
318                     if (++i == 1000) {
319                         synchronized (this) {
320                             try {
321                                 // add a delay
322                                 wait(300);
323                             } catch (InterruptedException ie) {
324                             }
325                             i = 0;
326                         }
327                     }
328                     eventLogger = Logger.getLogger(rs.getString("LOGGER"));
329                     timeStamp = rs.getTimestamp("TIMESTAMP").getTime();
330 
331                     level = rs.getString("LEVEL");
332                     threadName = rs.getString("THREAD");
333                     message = rs.getString("MESSAGE");
334                     ndc = rs.getString("NDC");
335 
336                     String mdcString = rs.getString("MDC");
337                     mdc = new Hashtable<>();
338 
339                     if (mdcString != null) {
340                         // support MDC being wrapped in {{name, value}}
341                         // or
342                         // just name, value
343                         if ((mdcString.contains("{{"))
344                             && (mdcString.contains("}}"))) {
345                             mdcString = mdcString
346                                 .substring(mdcString.indexOf("{{") + 2,
347                                     mdcString.indexOf("}}"));
348                         }
349 
350                         StringTokenizer tok = new StringTokenizer(mdcString,
351                             ",");
352 
353                         while (tok.countTokens() > 1) {
354                             mdc.put(tok.nextToken(), tok.nextToken());
355                         }
356                     }
357 
358                     throwable = new String[]{rs.getString("THROWABLE")};
359                     className = rs.getString("CLASS");
360                     methodName = rs.getString("METHOD");
361                     fileName = rs.getString("FILE");
362                     lineNumber = rs.getString("LINE");
363 
364                     // if properties are provided in the
365                     // SQL they can be used here (for example, to route
366                     // events to a unique tab in
367                     // Chainsaw if the machinename and/or appname
368                     // property
369                     // are set)
370                     String propertiesString = rs.getString("PROPERTIES");
371                     properties = new Hashtable<>();
372 
373                     if (propertiesString != null) {
374                         // support properties being wrapped in {{name,
375                         // value}} or just name, value
376                         if ((propertiesString.contains("{{"))
377                             && (propertiesString.contains("}}"))) {
378                             propertiesString = propertiesString.substring(
379                                 propertiesString.indexOf("{{") + 2,
380                                 propertiesString.indexOf("}}"));
381                         }
382 
383                         StringTokenizer tok2 = new StringTokenizer(
384                             propertiesString, ",");
385                         while (tok2.countTokens() > 1) {
386                             String tokenName = tok2.nextToken();
387                             String value = tok2.nextToken();
388                             if (tokenName.equals(LOG4J_ID_KEY)) {
389                                 try {
390                                     int thisInt = Integer.parseInt(value);
391                                     value = String.valueOf(thisInt);
392                                     if (thisInt > lastID) {
393                                         lastID = thisInt;
394                                     }
395                                 } catch (Exception e) {
396                                 }
397                             }
398                             properties.put(tokenName, value);
399                         }
400                     }
401 
402                     Level levelImpl = Level.toLevel(level);
403 
404 
405                     LocationInfo locationInfo = new LocationInfo(fileName,
406                         className, methodName, lineNumber);
407 
408                     ThrowableInformation throwableInfo = new ThrowableInformation(
409                         throwable);
410 
411                     properties.putAll(mdc);
412 
413                     LoggingEvent event = new LoggingEvent(eventLogger.getName(),
414                         eventLogger, timeStamp, levelImpl, message,
415                         threadName,
416                         throwableInfo,
417                         ndc,
418                         locationInfo,
419                         properties);
420 
421                     doPost(event);
422                 }
423                 //log when rows are retrieved
424                 if (lastID != oldLastID) {
425                     getLogger().debug("lastID: " + lastID);
426                 }
427 
428                 statement.close();
429             } catch (SQLException sqle) {
430                 getLogger()
431                     .error("*************Problem receiving events", sqle);
432             } finally {
433                 closeConnection();
434             }
435 
436             // if paused, loop prior to executing sql query
437             synchronized (this) {
438                 while (isPaused()) {
439                     try {
440                         wait(1000);
441                     } catch (InterruptedException ie) {
442                     }
443                 }
444             }
445         }
446     }
447 
448     /**
449      * {@inheritDoc}
450      */
451     public boolean parseUnrecognizedElement(Element element, Properties props) throws Exception {
452         if ("connectionSource".equals(element.getNodeName())) {
453             Object instance =
454                 DOMConfigurator.parseElement(element, props, ConnectionSource.class);
455             if (instance instanceof ConnectionSource) {
456                 ConnectionSource source = (ConnectionSource) instance;
457                 source.activateOptions();
458                 setConnectionSource(source);
459             }
460             return true;
461         }
462         return false;
463     }
464 
465 }