1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.net;
19
20 import org.apache.log4j.plugins.Pauseable;
21 import org.apache.log4j.plugins.Receiver;
22 import org.apache.log4j.spi.Decoder;
23 import org.apache.log4j.spi.LoggingEvent;
24
25 import java.io.IOException;
26 import java.net.*;
27 import java.util.ArrayList;
28 import java.util.List;
29
30
31
32
33
34
35
36
37
38 public class MulticastReceiver extends Receiver implements PortBased,
39 AddressBased, Pauseable {
40 private static final int PACKET_LENGTH = 16384;
41 private int port;
42 private String address;
43 private String encoding;
44 private MulticastSocket socket = null;
45
46
47 private String decoder = "org.apache.log4j.xml.XMLDecoder";
48 private Decoder decoderImpl;
49 private MulticastHandlerThread handlerThread;
50 private MulticastReceiverThread receiverThread;
51 private boolean paused;
52 private boolean advertiseViaMulticastDNS;
53 private ZeroConfSupport zeroConf;
54
55
56
57
58 public static final String ZONE = "_log4j_xml_mcast_receiver.local.";
59
60 public String getDecoder() {
61 return decoder;
62 }
63
64 public void setDecoder(String decoder) {
65 this.decoder = decoder;
66 }
67
68 public int getPort() {
69 return port;
70 }
71
72 public void setPort(int port) {
73 this.port = port;
74 }
75
76 public String getAddress() {
77 return address;
78 }
79
80
81
82
83
84 public void setEncoding(String encoding) {
85 this.encoding = encoding;
86 }
87
88
89
90
91 public String getEncoding() {
92 return encoding;
93 }
94
95 public synchronized void shutdown() {
96 active = false;
97 if (advertiseViaMulticastDNS) {
98 zeroConf.unadvertise();
99 }
100 if (handlerThread != null) {
101 handlerThread.interrupt();
102 }
103 if (receiverThread != null) {
104 receiverThread.interrupt();
105 }
106 if (socket != null) {
107 socket.close();
108 }
109 }
110
111 public void setAddress(String address) {
112 this.address = address;
113 }
114
115 public boolean isPaused() {
116 return paused;
117 }
118
119 public void setPaused(boolean b) {
120 paused = b;
121 }
122
123 public void activateOptions() {
124 InetAddress addr = null;
125
126 try {
127 Class c = Class.forName(decoder);
128 Object o = c.newInstance();
129
130 if (o instanceof Decoder) {
131 this.decoderImpl = (Decoder) o;
132 }
133 } catch (ClassNotFoundException cnfe) {
134 getLogger().warn("Unable to find decoder", cnfe);
135 } catch (IllegalAccessException | InstantiationException iae) {
136 getLogger().warn("Could not construct decoder", iae);
137 }
138
139 try {
140 addr = InetAddress.getByName(address);
141 } catch (UnknownHostException uhe) {
142 uhe.printStackTrace();
143 }
144
145 try {
146 active = true;
147 socket = new MulticastSocket(port);
148 socket.joinGroup(addr);
149 receiverThread = new MulticastReceiverThread();
150 receiverThread.start();
151 handlerThread = new MulticastHandlerThread();
152 handlerThread.start();
153 if (advertiseViaMulticastDNS) {
154 zeroConf = new ZeroConfSupport(ZONE, port, getName());
155 zeroConf.advertise();
156 }
157
158 } catch (IOException ioe) {
159 ioe.printStackTrace();
160 }
161 }
162
163 public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
164 this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
165 }
166
167 public boolean isAdvertiseViaMulticastDNS() {
168 return advertiseViaMulticastDNS;
169 }
170
171 class MulticastHandlerThread extends Thread {
172 private final List<String> list = new ArrayList<>();
173
174 public MulticastHandlerThread() {
175 setDaemon(true);
176 }
177
178 public void append(String data) {
179 synchronized (list) {
180 list.add(data);
181 list.notify();
182 }
183 }
184
185 public void run() {
186 ArrayList<String> list2 = new ArrayList<>();
187
188 while (isAlive()) {
189 synchronized (list) {
190 try {
191 while (list.size() == 0) {
192 list.wait();
193 }
194
195 if (list.size() > 0) {
196 list2.addAll(list);
197 list.clear();
198 }
199 } catch (InterruptedException ie) {
200 }
201 }
202
203 if (list2.size() > 0) {
204
205 for (Object aList2 : list2) {
206 String data = (String) aList2;
207 List<LoggingEvent> v = decoderImpl.decodeEvents(data.trim());
208
209 if (v != null) {
210
211 for (Object aV : v) {
212 if (!isPaused()) {
213 doPost((LoggingEvent) aV);
214 }
215 }
216 }
217 }
218
219 list2.clear();
220 } else {
221 try {
222 synchronized (this) {
223 wait(1000);
224 }
225 } catch (InterruptedException ie) {
226 }
227 }
228 }
229 }
230 }
231
232 class MulticastReceiverThread extends Thread {
233 public MulticastReceiverThread() {
234 setDaemon(true);
235 }
236
237 public void run() {
238 active = true;
239
240 byte[] b = new byte[PACKET_LENGTH];
241 DatagramPacket p = new DatagramPacket(b, b.length);
242
243 while (active) {
244 try {
245 socket.receive(p);
246
247
248
249 if (encoding == null) {
250 handlerThread.append(
251 new String(p.getData(), 0, p.getLength()));
252 } else {
253 handlerThread.append(
254 new String(p.getData(), 0, p.getLength(), encoding));
255 }
256 } catch (SocketException se) {
257
258 } catch (IOException ioe) {
259 ioe.printStackTrace();
260 }
261 }
262
263 getLogger().debug("{}'s thread is ending.", MulticastReceiver.this.getName());
264 }
265 }
266 }