1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.net;
19
20 import java.io.IOException;
21 import java.net.DatagramPacket;
22 import java.net.InetAddress;
23 import java.net.MulticastSocket;
24 import java.net.SocketException;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
29
30 import org.apache.log4j.plugins.Pauseable;
31 import org.apache.log4j.plugins.Receiver;
32 import org.apache.log4j.spi.Decoder;
33 import org.apache.log4j.spi.LoggingEvent;
34
35
36
37
38
39
40
41
42
43
44 public class MulticastReceiver extends Receiver implements PortBased,
45 AddressBased, Pauseable {
46 private static final int PACKET_LENGTH = 16384;
47 private int port;
48 private String address;
49 private String encoding;
50 private MulticastSocket socket = null;
51
52
53 private String decoder = "org.apache.log4j.xml.XMLDecoder";
54 private Decoder decoderImpl;
55 private MulticastHandlerThread handlerThread;
56 private MulticastReceiverThread receiverThread;
57 private boolean paused;
58 private boolean advertiseViaMulticastDNS;
59 private ZeroConfSupport zeroConf;
60
61
62
63
64 public static final String ZONE = "_log4j_xml_mcast_receiver.local.";
65
66 public String getDecoder() {
67 return decoder;
68 }
69
70 public void setDecoder(String decoder) {
71 this.decoder = decoder;
72 }
73
74 public int getPort() {
75 return port;
76 }
77
78 public void setPort(int port) {
79 this.port = port;
80 }
81
82 public String getAddress() {
83 return address;
84 }
85
86
87
88
89
90 public void setEncoding(String encoding) {
91 this.encoding = encoding;
92 }
93
94
95
96
97 public String getEncoding() {
98 return encoding;
99 }
100
101 public synchronized void shutdown() {
102 active = false;
103 if (advertiseViaMulticastDNS) {
104 zeroConf.unadvertise();
105 }
106 if (handlerThread != null) {
107 handlerThread.interrupt();
108 }
109 if (receiverThread != null) {
110 receiverThread.interrupt();
111 }
112 if (socket != null) {
113 socket.close();
114 }
115 }
116
117 public void setAddress(String address) {
118 this.address = address;
119 }
120
121 public boolean isPaused() {
122 return paused;
123 }
124
125 public void setPaused(boolean b) {
126 paused = b;
127 }
128
129 public void activateOptions() {
130 InetAddress addr = null;
131
132 try {
133 Class c = Class.forName(decoder);
134 Object o = c.newInstance();
135
136 if (o instanceof Decoder) {
137 this.decoderImpl = (Decoder) o;
138 }
139 } catch (ClassNotFoundException cnfe) {
140 getLogger().warn("Unable to find decoder", cnfe);
141 } catch (IllegalAccessException iae) {
142 getLogger().warn("Could not construct decoder", iae);
143 } catch (InstantiationException ie) {
144 getLogger().warn("Could not construct decoder", ie);
145 }
146
147 try {
148 addr = InetAddress.getByName(address);
149 } catch (UnknownHostException uhe) {
150 uhe.printStackTrace();
151 }
152
153 try {
154 active = true;
155 socket = new MulticastSocket(port);
156 socket.joinGroup(addr);
157 receiverThread = new MulticastReceiverThread();
158 receiverThread.start();
159 handlerThread = new MulticastHandlerThread();
160 handlerThread.start();
161 if (advertiseViaMulticastDNS) {
162 zeroConf = new ZeroConfSupport(ZONE, port, getName());
163 zeroConf.advertise();
164 }
165
166 } catch (IOException ioe) {
167 ioe.printStackTrace();
168 }
169 }
170
171 public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
172 this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
173 }
174
175 public boolean isAdvertiseViaMulticastDNS() {
176 return advertiseViaMulticastDNS;
177 }
178
179 class MulticastHandlerThread extends Thread {
180 private List list = new ArrayList();
181
182 public MulticastHandlerThread() {
183 setDaemon(true);
184 }
185
186 public void append(String data) {
187 synchronized (list) {
188 list.add(data);
189 list.notify();
190 }
191 }
192
193 public void run() {
194 ArrayList list2 = new ArrayList();
195
196 while (isAlive()) {
197 synchronized (list) {
198 try {
199 while (list.size() == 0) {
200 list.wait();
201 }
202
203 if (list.size() > 0) {
204 list2.addAll(list);
205 list.clear();
206 }
207 } catch (InterruptedException ie) {
208 }
209 }
210
211 if (list2.size() > 0) {
212 Iterator iter = list2.iterator();
213
214 while (iter.hasNext()) {
215 String data = (String) iter.next();
216 List v = decoderImpl.decodeEvents(data.trim());
217
218 if (v != null) {
219 Iterator eventIter = v.iterator();
220
221 while (eventIter.hasNext()) {
222 if (!isPaused()) {
223 doPost((LoggingEvent) eventIter.next());
224 }
225 }
226 }
227 }
228
229 list2.clear();
230 } else {
231 try {
232 synchronized (this) {
233 wait(1000);
234 }
235 } catch (InterruptedException ie) {
236 }
237 }
238 }
239 }
240 }
241
242 class MulticastReceiverThread extends Thread {
243 public MulticastReceiverThread() {
244 setDaemon(true);
245 }
246
247 public void run() {
248 active = true;
249
250 byte[] b = new byte[PACKET_LENGTH];
251 DatagramPacket p = new DatagramPacket(b, b.length);
252
253 while (active) {
254 try {
255 socket.receive(p);
256
257
258
259 if (encoding == null) {
260 handlerThread.append(
261 new String(p.getData(), 0, p.getLength()));
262 } else {
263 handlerThread.append(
264 new String(p.getData(), 0, p.getLength(), encoding));
265 }
266 } catch (SocketException se) {
267
268 } catch (IOException ioe) {
269 ioe.printStackTrace();
270 }
271 }
272
273 getLogger().debug("{}'s thread is ending.", MulticastReceiver.this.getName());
274 }
275 }
276 }