1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.net;
19
20 import org.apache.log4j.plugins.Pauseable;
21 import org.apache.log4j.plugins.Receiver;
22 import org.apache.log4j.spi.Decoder;
23 import org.apache.log4j.spi.LoggingEvent;
24
25 import java.io.IOException;
26 import java.net.DatagramPacket;
27 import java.net.DatagramSocket;
28 import java.net.SocketException;
29 import java.util.ArrayList;
30 import java.util.List;
31
32
33
34
35
36
37
38
39 public class UDPReceiver extends Receiver implements PortBased, Pauseable {
40 private static final int PACKET_LENGTH = 16384;
41 private UDPReceiverThread receiverThread;
42 private String encoding;
43
44
45 private String decoder = "org.apache.log4j.xml.XMLDecoder";
46 private Decoder decoderImpl;
47 protected boolean paused;
48 private transient boolean closed = false;
49 private int port;
50 private DatagramSocket socket;
51 UDPHandlerThread handlerThread;
52 private boolean advertiseViaMulticastDNS;
53 private ZeroConfSupport zeroConf;
54
55
56
57
58 public static final String ZONE = "_log4j_xml_udp_receiver.local.";
59
60
61 public int getPort() {
62 return port;
63 }
64
65 public void setPort(int port) {
66 this.port = port;
67 }
68
69
70
71
72
73 public void setEncoding(String encoding) {
74 this.encoding = encoding;
75 }
76
77
78
79
80 public String getEncoding() {
81 return encoding;
82 }
83
84 public String getDecoder() {
85 return decoder;
86 }
87
88 public void setDecoder(String decoder) {
89 this.decoder = decoder;
90 }
91
92 public boolean isPaused() {
93 return paused;
94 }
95
96 public void setPaused(boolean b) {
97 paused = b;
98 }
99
100 public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
101 this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
102 }
103
104 public boolean isAdvertiseViaMulticastDNS() {
105 return advertiseViaMulticastDNS;
106 }
107
108 public synchronized void shutdown() {
109 if (closed == true) {
110 return;
111 }
112 closed = true;
113 active = false;
114
115
116 if (socket != null) {
117 socket.close();
118 }
119
120 if (advertiseViaMulticastDNS) {
121 zeroConf.unadvertise();
122 }
123
124 try {
125 if (handlerThread != null) {
126 handlerThread.close();
127 handlerThread.join();
128 }
129 if (receiverThread != null) {
130 receiverThread.join();
131 }
132 } catch (InterruptedException ie) {
133 }
134 }
135
136
137
138
139
140
141
142 public void activateOptions() {
143 try {
144 Class c = Class.forName(decoder);
145 Object o = c.newInstance();
146
147 if (o instanceof Decoder) {
148 this.decoderImpl = (Decoder) o;
149 }
150 } catch (ClassNotFoundException cnfe) {
151 getLogger().warn("Unable to find decoder", cnfe);
152 } catch (IllegalAccessException | InstantiationException iae) {
153 getLogger().warn("Could not construct decoder", iae);
154 }
155
156 try {
157 socket = new DatagramSocket(port);
158 receiverThread = new UDPReceiverThread();
159 receiverThread.start();
160 handlerThread = new UDPHandlerThread();
161 handlerThread.start();
162 if (advertiseViaMulticastDNS) {
163 zeroConf = new ZeroConfSupport(ZONE, port, getName());
164 zeroConf.advertise();
165 }
166 active = true;
167 } catch (IOException ioe) {
168 ioe.printStackTrace();
169 }
170 }
171
172 class UDPHandlerThread extends Thread {
173 private final List<String> list = new ArrayList<>();
174
175 public UDPHandlerThread() {
176 setDaemon(true);
177 }
178
179 public void append(String data) {
180 synchronized (list) {
181 list.add(data);
182 list.notify();
183 }
184 }
185
186
187
188
189 void close() {
190 synchronized (list) {
191 list.notify();
192 }
193 }
194
195 public void run() {
196 ArrayList<String> list2 = new ArrayList<>();
197
198 while (!UDPReceiver.this.closed) {
199 synchronized (list) {
200 try {
201 while (!UDPReceiver.this.closed && list.size() == 0) {
202 list.wait(300);
203 }
204
205 if (list.size() > 0) {
206 list2.addAll(list);
207 list.clear();
208 }
209 } catch (InterruptedException ie) {
210 }
211 }
212
213 if (list2.size() > 0) {
214
215 for (Object aList2 : list2) {
216 String data = (String) aList2;
217 List<LoggingEvent> v = decoderImpl.decodeEvents(data);
218
219 if (v != null) {
220
221 for (Object aV : v) {
222 if (!isPaused()) {
223 doPost((LoggingEvent) aV);
224 }
225 }
226 }
227 }
228
229 list2.clear();
230 } else {
231 try {
232 synchronized (this) {
233 wait(1000);
234 }
235 } catch (InterruptedException ie) {
236 }
237 }
238 }
239 getLogger().debug(UDPReceiver.this.getName() + "'s handler thread is exiting");
240 }
241 }
242
243 class UDPReceiverThread extends Thread {
244 public UDPReceiverThread() {
245 setDaemon(true);
246 }
247
248 public void run() {
249 byte[] b = new byte[PACKET_LENGTH];
250 DatagramPacket p = new DatagramPacket(b, b.length);
251
252 while (!UDPReceiver.this.closed) {
253 try {
254 socket.receive(p);
255
256
257
258 if (encoding == null) {
259 handlerThread.append(
260 new String(p.getData(), 0, p.getLength()));
261 } else {
262 handlerThread.append(
263 new String(p.getData(), 0, p.getLength(), encoding));
264 }
265 } catch (SocketException se) {
266
267 } catch (IOException ioe) {
268 ioe.printStackTrace();
269 }
270 }
271
272
273 }
274 }
275 }