1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.db;
19
20 import java.sql.Connection;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23 import java.sql.Statement;
24 import java.util.Hashtable;
25 import java.util.Properties;
26 import java.util.StringTokenizer;
27
28 import org.apache.log4j.Level;
29 import org.apache.log4j.Logger;
30 import org.apache.log4j.plugins.Pauseable;
31 import org.apache.log4j.plugins.Receiver;
32 import org.apache.log4j.scheduler.Job;
33 import org.apache.log4j.scheduler.Scheduler;
34 import org.apache.log4j.spi.LocationInfo;
35 import org.apache.log4j.spi.LoggerRepositoryEx;
36 import org.apache.log4j.spi.LoggingEvent;
37 import org.apache.log4j.spi.ThrowableInformation;
38 import org.apache.log4j.xml.DOMConfigurator;
39 import org.apache.log4j.xml.UnrecognizedElementHandler;
40 import org.w3c.dom.Element;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public class CustomSQLDBReceiver extends Receiver implements Pauseable, UnrecognizedElementHandler {
135
136 protected volatile Connection connection = null;
137
138 protected String sqlStatement = "";
139
140
141
142
143
144
145 static int DEFAULT_REFRESH_MILLIS = 1000;
146
147 int refreshMillis = DEFAULT_REFRESH_MILLIS;
148
149 protected String idField = null;
150
151 int lastID = -1;
152
153 private static final String WHERE_CLAUSE = " WHERE ";
154
155 private static final String AND_CLAUSE = " AND ";
156
157 private boolean whereExists = false;
158
159 private boolean paused = false;
160
161 private ConnectionSource connectionSource;
162
163 public static final String LOG4J_ID_KEY = "log4jid";
164
165 private Job customReceiverJob;
166
167 public void activateOptions() {
168
169 if(connectionSource == null) {
170 throw new IllegalStateException(
171 "CustomSQLDBReceiver cannot function without a connection source");
172 }
173 whereExists = (sqlStatement.toUpperCase().indexOf(WHERE_CLAUSE) > -1);
174
175 customReceiverJob = new CustomReceiverJob();
176
177 if(this.repository == null) {
178 throw new IllegalStateException(
179 "CustomSQLDBReceiver cannot function without a reference to its owning repository");
180 }
181
182
183
184 if (repository instanceof LoggerRepositoryEx) {
185 Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
186
187 scheduler.schedule(
188 customReceiverJob, System.currentTimeMillis() + 500, refreshMillis);
189 }
190
191 }
192
193 void closeConnection() {
194 if (connection != null) {
195 try {
196
197 connection.close();
198 } catch (SQLException sqle) {
199
200 }
201 }
202 }
203
204 public void setRefreshMillis(int refreshMillis) {
205 this.refreshMillis = refreshMillis;
206 }
207
208 public int getRefreshMillis() {
209 return refreshMillis;
210 }
211
212
213
214
215 public ConnectionSource getConnectionSource() {
216 return connectionSource;
217 }
218
219
220
221
222
223 public void setConnectionSource(ConnectionSource connectionSource) {
224 this.connectionSource = connectionSource;
225 }
226
227 public void close() {
228 try {
229 if ((connection != null) && !connection.isClosed()) {
230 connection.close();
231 }
232 } catch (SQLException e) {
233 e.printStackTrace();
234 } finally {
235 connection = null;
236 }
237 }
238
239 public void finalize() throws Throwable {
240 super.finalize();
241 close();
242 }
243
244
245
246
247
248
249 public void shutdown() {
250 getLogger().info("removing receiverJob from the Scheduler.");
251
252 if(this.repository instanceof LoggerRepositoryEx) {
253 Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
254 scheduler.delete(customReceiverJob);
255 }
256
257 lastID = -1;
258 }
259
260 public void setSql(String s) {
261 sqlStatement = s;
262 }
263
264 public String getSql() {
265 return sqlStatement;
266 }
267
268 public void setIDField(String id) {
269 idField = id;
270 }
271
272 public String getIDField() {
273 return idField;
274 }
275
276 public synchronized void setPaused(boolean p) {
277 paused = p;
278 }
279
280 public synchronized boolean isPaused() {
281 return paused;
282 }
283
284 class CustomReceiverJob implements Job {
285 public void execute() {
286 int oldLastID = lastID;
287 try {
288 connection = connectionSource.getConnection();
289 Statement statement = connection.createStatement();
290
291 Logger eventLogger = null;
292 long timeStamp = 0L;
293 String level = null;
294 String threadName = null;
295 Object message = null;
296 String ndc = null;
297 Hashtable mdc = null;
298 String[] throwable = null;
299 String className = null;
300 String methodName = null;
301 String fileName = null;
302 String lineNumber = null;
303 Hashtable properties = null;
304
305 String currentSQLStatement = sqlStatement;
306 if (whereExists) {
307 currentSQLStatement = sqlStatement + AND_CLAUSE + idField
308 + " > " + lastID;
309 } else {
310 currentSQLStatement = sqlStatement + WHERE_CLAUSE + idField
311 + " > " + lastID;
312 }
313
314 ResultSet rs = statement.executeQuery(currentSQLStatement);
315
316 int i = 0;
317 while (rs.next()) {
318
319 if (++i == 1000) {
320 synchronized (this) {
321 try {
322
323 wait(300);
324 } catch (InterruptedException ie) {
325 }
326 i = 0;
327 }
328 }
329 eventLogger = Logger.getLogger(rs.getString("LOGGER"));
330 timeStamp = rs.getTimestamp("TIMESTAMP").getTime();
331
332 level = rs.getString("LEVEL");
333 threadName = rs.getString("THREAD");
334 message = rs.getString("MESSAGE");
335 ndc = rs.getString("NDC");
336
337 String mdcString = rs.getString("MDC");
338 mdc = new Hashtable();
339
340 if (mdcString != null) {
341
342
343
344 if ((mdcString.indexOf("{{") > -1)
345 && (mdcString.indexOf("}}") > -1)) {
346 mdcString = mdcString
347 .substring(mdcString.indexOf("{{") + 2,
348 mdcString.indexOf("}}"));
349 }
350
351 StringTokenizer tok = new StringTokenizer(mdcString,
352 ",");
353
354 while (tok.countTokens() > 1) {
355 mdc.put(tok.nextToken(), tok.nextToken());
356 }
357 }
358
359 throwable = new String[] { rs.getString("THROWABLE") };
360 className = rs.getString("CLASS");
361 methodName = rs.getString("METHOD");
362 fileName = rs.getString("FILE");
363 lineNumber = rs.getString("LINE");
364
365
366
367
368
369
370
371 String propertiesString = rs.getString("PROPERTIES");
372 properties = new Hashtable();
373
374 if (propertiesString != null) {
375
376
377 if ((propertiesString.indexOf("{{") > -1)
378 && (propertiesString.indexOf("}}") > -1)) {
379 propertiesString = propertiesString.substring(
380 propertiesString.indexOf("{{") + 2,
381 propertiesString.indexOf("}}"));
382 }
383
384 StringTokenizer tok2 = new StringTokenizer(
385 propertiesString, ",");
386 while (tok2.countTokens() > 1) {
387 String tokenName = tok2.nextToken();
388 String value = tok2.nextToken();
389 if (tokenName.equals(LOG4J_ID_KEY)) {
390 try {
391 int thisInt = Integer.parseInt(value);
392 value = String.valueOf(thisInt);
393 if (thisInt > lastID) {
394 lastID = thisInt;
395 }
396 } catch (Exception e) {
397 }
398 }
399 properties.put(tokenName, value);
400 }
401 }
402
403 Level levelImpl = Level.toLevel(level);
404
405
406 LocationInfo locationInfo = new LocationInfo(fileName,
407 className, methodName, lineNumber);
408
409 ThrowableInformation throwableInfo = new ThrowableInformation(
410 throwable);
411
412 properties.putAll(mdc);
413
414 LoggingEvent event = new LoggingEvent(eventLogger.getName(),
415 eventLogger, timeStamp, levelImpl, message,
416 threadName,
417 throwableInfo,
418 ndc,
419 locationInfo,
420 properties);
421
422 doPost(event);
423 }
424
425 if (lastID != oldLastID) {
426 getLogger().debug("lastID: " + lastID);
427 oldLastID = lastID;
428 }
429
430 statement.close();
431 statement = null;
432 } catch (SQLException sqle) {
433 getLogger()
434 .error("*************Problem receiving events", sqle);
435 } finally {
436 closeConnection();
437 }
438
439
440 synchronized (this) {
441 while (isPaused()) {
442 try {
443 wait(1000);
444 } catch (InterruptedException ie) {
445 }
446 }
447 }
448 }
449 }
450
451
452
453
454 public boolean parseUnrecognizedElement(Element element, Properties props) throws Exception {
455 if ("connectionSource".equals(element.getNodeName())) {
456 Object instance =
457 DOMConfigurator.parseElement(element, props, ConnectionSource.class);
458 if (instance instanceof ConnectionSource) {
459 ConnectionSource source = (ConnectionSource) instance;
460 source.activateOptions();
461 setConnectionSource(source);
462 }
463 return true;
464 }
465 return false;
466 }
467
468 }