1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.net;
19
20 import java.io.IOException;
21 import java.net.DatagramPacket;
22 import java.net.DatagramSocket;
23 import java.net.SocketException;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27
28 import org.apache.log4j.plugins.Pauseable;
29 import org.apache.log4j.plugins.Receiver;
30 import org.apache.log4j.spi.Decoder;
31 import org.apache.log4j.spi.LoggingEvent;
32
33
34
35
36
37
38
39
40
41 public class UDPReceiver extends Receiver implements PortBased, Pauseable {
42 private static final int PACKET_LENGTH = 16384;
43 private UDPReceiverThread receiverThread;
44 private String encoding;
45
46
47 private String decoder = "org.apache.log4j.xml.XMLDecoder";
48 private Decoder decoderImpl;
49 protected boolean paused;
50 private transient boolean closed = false;
51 private int port;
52 private DatagramSocket socket;
53 UDPHandlerThread handlerThread;
54 private boolean advertiseViaMulticastDNS;
55 private ZeroConfSupport zeroConf;
56
57
58
59
60 public static final String ZONE = "_log4j_xml_udp_receiver.local.";
61
62
63 public int getPort() {
64 return port;
65 }
66
67 public void setPort(int port) {
68 this.port = port;
69 }
70
71
72
73
74
75 public void setEncoding(String encoding) {
76 this.encoding = encoding;
77 }
78
79
80
81
82 public String getEncoding() {
83 return encoding;
84 }
85
86 public String getDecoder() {
87 return decoder;
88 }
89
90 public void setDecoder(String decoder) {
91 this.decoder = decoder;
92 }
93
94 public boolean isPaused() {
95 return paused;
96 }
97
98 public void setPaused(boolean b) {
99 paused = b;
100 }
101
102 public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
103 this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
104 }
105
106 public boolean isAdvertiseViaMulticastDNS() {
107 return advertiseViaMulticastDNS;
108 }
109
110 public synchronized void shutdown() {
111 if(closed == true) {
112 return;
113 }
114 closed = true;
115 active = false;
116
117
118 if (socket != null) {
119 socket.close();
120 }
121
122 if (advertiseViaMulticastDNS) {
123 zeroConf.unadvertise();
124 }
125
126 try {
127 if(handlerThread != null) {
128 handlerThread.close();
129 handlerThread.join();
130 }
131 if(receiverThread != null) {
132 receiverThread.join();
133 }
134 } catch(InterruptedException ie) {
135 }
136 }
137
138
139
140
141
142
143
144 public void activateOptions() {
145 try {
146 Class c = Class.forName(decoder);
147 Object o = c.newInstance();
148
149 if (o instanceof Decoder) {
150 this.decoderImpl = (Decoder) o;
151 }
152 } catch (ClassNotFoundException cnfe) {
153 getLogger().warn("Unable to find decoder", cnfe);
154 } catch (IllegalAccessException iae) {
155 getLogger().warn("Could not construct decoder", iae);
156 } catch (InstantiationException ie) {
157 getLogger().warn("Could not construct decoder", ie);
158 }
159
160 try {
161 socket = new DatagramSocket(port);
162 receiverThread = new UDPReceiverThread();
163 receiverThread.start();
164 handlerThread = new UDPHandlerThread();
165 handlerThread.start();
166 if (advertiseViaMulticastDNS) {
167 zeroConf = new ZeroConfSupport(ZONE, port, getName());
168 zeroConf.advertise();
169 }
170 active = true;
171 } catch (IOException ioe) {
172 ioe.printStackTrace();
173 }
174 }
175
176 class UDPHandlerThread extends Thread {
177 private List list = new ArrayList();
178
179 public UDPHandlerThread() {
180 setDaemon(true);
181 }
182
183 public void append(String data) {
184 synchronized (list) {
185 list.add(data);
186 list.notify();
187 }
188 }
189
190
191
192
193 void close() {
194 synchronized(list) {
195 list.notify();
196 }
197 }
198
199 public void run() {
200 ArrayList list2 = new ArrayList();
201
202 while (!UDPReceiver.this.closed) {
203 synchronized (list) {
204 try {
205 while (!UDPReceiver.this.closed && list.size() == 0) {
206 list.wait(300);
207 }
208
209 if (list.size() > 0) {
210 list2.addAll(list);
211 list.clear();
212 }
213 } catch (InterruptedException ie) {
214 }
215 }
216
217 if (list2.size() > 0) {
218 Iterator iter = list2.iterator();
219
220 while (iter.hasNext()) {
221 String data = (String) iter.next();
222 List v = decoderImpl.decodeEvents(data);
223
224 if (v != null) {
225 Iterator eventIter = v.iterator();
226
227 while (eventIter.hasNext()) {
228 if (!isPaused()) {
229 doPost((LoggingEvent) eventIter.next());
230 }
231 }
232 }
233 }
234
235 list2.clear();
236 } else {
237 try {
238 synchronized (this) {
239 wait(1000);
240 }
241 } catch (InterruptedException ie) {
242 }
243 }
244 }
245 getLogger().debug(UDPReceiver.this.getName()+ "'s handler thread is exiting");
246 }
247 }
248
249 class UDPReceiverThread extends Thread {
250 public UDPReceiverThread() {
251 setDaemon(true);
252 }
253
254 public void run() {
255 byte[] b = new byte[PACKET_LENGTH];
256 DatagramPacket p = new DatagramPacket(b, b.length);
257
258 while (!UDPReceiver.this.closed) {
259 try {
260 socket.receive(p);
261
262
263
264 if (encoding == null) {
265 handlerThread.append(
266 new String(p.getData(), 0, p.getLength()));
267 } else {
268 handlerThread.append(
269 new String(p.getData(), 0, p.getLength(), encoding));
270 }
271 } catch (SocketException se) {
272
273 } catch (IOException ioe) {
274 ioe.printStackTrace();
275 }
276 }
277
278
279 }
280 }
281 }