View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.net;
19  
20  import org.apache.log4j.plugins.Pauseable;
21  import org.apache.log4j.plugins.Receiver;
22  import org.apache.log4j.spi.Decoder;
23  import org.apache.log4j.spi.LoggingEvent;
24  
25  import java.io.IOException;
26  import java.net.DatagramPacket;
27  import java.net.DatagramSocket;
28  import java.net.SocketException;
29  import java.util.ArrayList;
30  import java.util.List;
31  
32  
33  /**
34   * Receive LoggingEvents encoded with an XMLLayout, convert the XML data to a
35   * LoggingEvent and post the LoggingEvent.
36   *
37   * @author Scott Deboy <sdeboy@apache.org>
38   */
39  public class UDPReceiver extends Receiver implements PortBased, Pauseable {
40      private static final int PACKET_LENGTH = 16384;
41      private UDPReceiverThread receiverThread;
42      private String encoding;
43  
44      //default to log4j xml decoder
45      private String decoder = "org.apache.log4j.xml.XMLDecoder";
46      private Decoder decoderImpl;
47      protected boolean paused;
48      private transient boolean closed = false;
49      private int port;
50      private DatagramSocket socket;
51      UDPHandlerThread handlerThread;
52      private boolean advertiseViaMulticastDNS;
53      private ZeroConfSupport zeroConf;
54  
55      /**
56       * The MulticastDNS zone advertised by a UDPReceiver
57       */
58      public static final String ZONE = "_log4j_xml_udp_receiver.local.";
59  
60  
61      public int getPort() {
62          return port;
63      }
64  
65      public void setPort(int port) {
66          this.port = port;
67      }
68  
69      /**
70       * The <b>Encoding</b> option specifies how the bytes are encoded.  If this
71       * option is not specified, the system encoding will be used.
72       */
73      public void setEncoding(String encoding) {
74          this.encoding = encoding;
75      }
76  
77      /**
78       * Returns value of the <b>Encoding</b> option.
79       */
80      public String getEncoding() {
81          return encoding;
82      }
83  
84      public String getDecoder() {
85          return decoder;
86      }
87  
88      public void setDecoder(String decoder) {
89          this.decoder = decoder;
90      }
91  
92      public boolean isPaused() {
93          return paused;
94      }
95  
96      public void setPaused(boolean b) {
97          paused = b;
98      }
99  
100     public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
101         this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
102     }
103 
104     public boolean isAdvertiseViaMulticastDNS() {
105         return advertiseViaMulticastDNS;
106     }
107 
108     public synchronized void shutdown() {
109         if (closed == true) {
110             return;
111         }
112         closed = true;
113         active = false;
114         // Closing the datagram socket will unblock the UDPReceiverThread if it is
115         // was waiting to receive data from the socket.
116         if (socket != null) {
117             socket.close();
118         }
119 
120         if (advertiseViaMulticastDNS) {
121             zeroConf.unadvertise();
122         }
123 
124         try {
125             if (handlerThread != null) {
126                 handlerThread.close();
127                 handlerThread.join();
128             }
129             if (receiverThread != null) {
130                 receiverThread.join();
131             }
132         } catch (InterruptedException ie) {
133         }
134     }
135 
136     /**
137      * Returns true if this receiver is active.
138      */
139 //  public synchronized boolean isActive() {
140 //    return isActive;
141 //}
142     public void activateOptions() {
143         try {
144             Class c = Class.forName(decoder);
145             Object o = c.newInstance();
146 
147             if (o instanceof Decoder) {
148                 this.decoderImpl = (Decoder) o;
149             }
150         } catch (ClassNotFoundException cnfe) {
151             getLogger().warn("Unable to find decoder", cnfe);
152         } catch (IllegalAccessException | InstantiationException iae) {
153             getLogger().warn("Could not construct decoder", iae);
154         }
155 
156         try {
157             socket = new DatagramSocket(port);
158             receiverThread = new UDPReceiverThread();
159             receiverThread.start();
160             handlerThread = new UDPHandlerThread();
161             handlerThread.start();
162             if (advertiseViaMulticastDNS) {
163                 zeroConf = new ZeroConfSupport(ZONE, port, getName());
164                 zeroConf.advertise();
165             }
166             active = true;
167         } catch (IOException ioe) {
168             ioe.printStackTrace();
169         }
170     }
171 
172     class UDPHandlerThread extends Thread {
173         private final List<String> list = new ArrayList<>();
174 
175         public UDPHandlerThread() {
176             setDaemon(true);
177         }
178 
179         public void append(String data) {
180             synchronized (list) {
181                 list.add(data);
182                 list.notify();
183             }
184         }
185 
186         /**
187          * Allow the UDPHandlerThread to wakeup and exit gracefully.
188          */
189         void close() {
190             synchronized (list) {
191                 list.notify();
192             }
193         }
194 
195         public void run() {
196             ArrayList<String> list2 = new ArrayList<>();
197 
198             while (!UDPReceiver.this.closed) {
199                 synchronized (list) {
200                     try {
201                         while (!UDPReceiver.this.closed && list.size() == 0) {
202                             list.wait(300);
203                         }
204 
205                         if (list.size() > 0) {
206                             list2.addAll(list);
207                             list.clear();
208                         }
209                     } catch (InterruptedException ie) {
210                     }
211                 }
212 
213                 if (list2.size() > 0) {
214 
215                     for (Object aList2 : list2) {
216                         String data = (String) aList2;
217                         List<LoggingEvent> v = decoderImpl.decodeEvents(data);
218 
219                         if (v != null) {
220 
221                             for (Object aV : v) {
222                                 if (!isPaused()) {
223                                     doPost((LoggingEvent) aV);
224                                 }
225                             }
226                         }
227                     }
228 
229                     list2.clear();
230                 } else {
231                     try {
232                         synchronized (this) {
233                             wait(1000);
234                         }
235                     } catch (InterruptedException ie) {
236                     }
237                 }
238             } // while
239             getLogger().debug(UDPReceiver.this.getName() + "'s handler thread is exiting");
240         } // run
241     } // UDPHandlerThread
242 
243     class UDPReceiverThread extends Thread {
244         public UDPReceiverThread() {
245             setDaemon(true);
246         }
247 
248         public void run() {
249             byte[] b = new byte[PACKET_LENGTH];
250             DatagramPacket p = new DatagramPacket(b, b.length);
251 
252             while (!UDPReceiver.this.closed) {
253                 try {
254                     socket.receive(p);
255 
256                     //this string constructor which accepts a charset throws an exception if it is
257                     //null
258                     if (encoding == null) {
259                         handlerThread.append(
260                             new String(p.getData(), 0, p.getLength()));
261                     } else {
262                         handlerThread.append(
263                             new String(p.getData(), 0, p.getLength(), encoding));
264                     }
265                 } catch (SocketException se) {
266                     //disconnected
267                 } catch (IOException ioe) {
268                     ioe.printStackTrace();
269                 }
270             }
271 
272             //LogLog.debug(UDPReceiver.this.getName() + "'s thread is ending.");
273         }
274     }
275 }