View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.net;
19  
20  import org.apache.log4j.plugins.Pauseable;
21  import org.apache.log4j.plugins.Receiver;
22  import org.apache.log4j.spi.Decoder;
23  import org.apache.log4j.spi.LoggingEvent;
24  
25  import java.io.IOException;
26  import java.net.*;
27  import java.util.ArrayList;
28  import java.util.List;
29  
30  
31  /**
32   * Multicast-based receiver.  Accepts LoggingEvents encoded using
33   * MulticastAppender and XMLLayout. The XML data is converted
34   * back to a LoggingEvent and is posted.
35   *
36   * @author Scott Deboy <sdeboy@apache.org>
37   */
38  public class MulticastReceiver extends Receiver implements PortBased,
39      AddressBased, Pauseable {
40      private static final int PACKET_LENGTH = 16384;
41      private int port;
42      private String address;
43      private String encoding;
44      private MulticastSocket socket = null;
45  
46      //default to log4j xml decoder
47      private String decoder = "org.apache.log4j.xml.XMLDecoder";
48      private Decoder decoderImpl;
49      private MulticastHandlerThread handlerThread;
50      private MulticastReceiverThread receiverThread;
51      private boolean paused;
52      private boolean advertiseViaMulticastDNS;
53      private ZeroConfSupport zeroConf;
54  
55      /**
56       * The MulticastDNS zone advertised by a MulticastReceiver
57       */
58      public static final String ZONE = "_log4j_xml_mcast_receiver.local.";
59  
60      public String getDecoder() {
61          return decoder;
62      }
63  
64      public void setDecoder(String decoder) {
65          this.decoder = decoder;
66      }
67  
68      public int getPort() {
69          return port;
70      }
71  
72      public void setPort(int port) {
73          this.port = port;
74      }
75  
76      public String getAddress() {
77          return address;
78      }
79  
80      /**
81       * The <b>Encoding</b> option specifies how the bytes are encoded.  If this option is not specified,
82       * the system encoding will be used.
83       */
84      public void setEncoding(String encoding) {
85          this.encoding = encoding;
86      }
87  
88      /**
89       * Returns value of the <b>Encoding</b> option.
90       */
91      public String getEncoding() {
92          return encoding;
93      }
94  
95      public synchronized void shutdown() {
96          active = false;
97          if (advertiseViaMulticastDNS) {
98              zeroConf.unadvertise();
99          }
100         if (handlerThread != null) {
101             handlerThread.interrupt();
102         }
103         if (receiverThread != null) {
104             receiverThread.interrupt();
105         }
106         if (socket != null) {
107             socket.close();
108         }
109     }
110 
111     public void setAddress(String address) {
112         this.address = address;
113     }
114 
115     public boolean isPaused() {
116         return paused;
117     }
118 
119     public void setPaused(boolean b) {
120         paused = b;
121     }
122 
123     public void activateOptions() {
124         InetAddress addr = null;
125 
126         try {
127             Class c = Class.forName(decoder);
128             Object o = c.newInstance();
129 
130             if (o instanceof Decoder) {
131                 this.decoderImpl = (Decoder) o;
132             }
133         } catch (ClassNotFoundException cnfe) {
134             getLogger().warn("Unable to find decoder", cnfe);
135         } catch (IllegalAccessException | InstantiationException iae) {
136             getLogger().warn("Could not construct decoder", iae);
137         }
138 
139         try {
140             addr = InetAddress.getByName(address);
141         } catch (UnknownHostException uhe) {
142             uhe.printStackTrace();
143         }
144 
145         try {
146             active = true;
147             socket = new MulticastSocket(port);
148             socket.joinGroup(addr);
149             receiverThread = new MulticastReceiverThread();
150             receiverThread.start();
151             handlerThread = new MulticastHandlerThread();
152             handlerThread.start();
153             if (advertiseViaMulticastDNS) {
154                 zeroConf = new ZeroConfSupport(ZONE, port, getName());
155                 zeroConf.advertise();
156             }
157 
158         } catch (IOException ioe) {
159             ioe.printStackTrace();
160         }
161     }
162 
163     public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
164         this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
165     }
166 
167     public boolean isAdvertiseViaMulticastDNS() {
168         return advertiseViaMulticastDNS;
169     }
170 
171     class MulticastHandlerThread extends Thread {
172         private final List<String> list = new ArrayList<>();
173 
174         public MulticastHandlerThread() {
175             setDaemon(true);
176         }
177 
178         public void append(String data) {
179             synchronized (list) {
180                 list.add(data);
181                 list.notify();
182             }
183         }
184 
185         public void run() {
186             ArrayList<String> list2 = new ArrayList<>();
187 
188             while (isAlive()) {
189                 synchronized (list) {
190                     try {
191                         while (list.size() == 0) {
192                             list.wait();
193                         }
194 
195                         if (list.size() > 0) {
196                             list2.addAll(list);
197                             list.clear();
198                         }
199                     } catch (InterruptedException ie) {
200                     }
201                 }
202 
203                 if (list2.size() > 0) {
204 
205                     for (Object aList2 : list2) {
206                         String data = (String) aList2;
207                         List<LoggingEvent> v = decoderImpl.decodeEvents(data.trim());
208 
209                         if (v != null) {
210 
211                             for (Object aV : v) {
212                                 if (!isPaused()) {
213                                     doPost((LoggingEvent) aV);
214                                 }
215                             }
216                         }
217                     }
218 
219                     list2.clear();
220                 } else {
221                     try {
222                         synchronized (this) {
223                             wait(1000);
224                         }
225                     } catch (InterruptedException ie) {
226                     }
227                 }
228             }
229         }
230     }
231 
232     class MulticastReceiverThread extends Thread {
233         public MulticastReceiverThread() {
234             setDaemon(true);
235         }
236 
237         public void run() {
238             active = true;
239 
240             byte[] b = new byte[PACKET_LENGTH];
241             DatagramPacket p = new DatagramPacket(b, b.length);
242 
243             while (active) {
244                 try {
245                     socket.receive(p);
246 
247                     //this string constructor which accepts a charset throws an exception if it is
248                     //null
249                     if (encoding == null) {
250                         handlerThread.append(
251                             new String(p.getData(), 0, p.getLength()));
252                     } else {
253                         handlerThread.append(
254                             new String(p.getData(), 0, p.getLength(), encoding));
255                     }
256                 } catch (SocketException se) {
257                     //disconnected
258                 } catch (IOException ioe) {
259                     ioe.printStackTrace();
260                 }
261             }
262 
263             getLogger().debug("{}'s thread is ending.", MulticastReceiver.this.getName());
264         }
265     }
266 }