View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net.server;
18  
19  import java.io.BufferedReader;
20  import java.io.ByteArrayInputStream;
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.InputStreamReader;
25  import java.io.ObjectInputStream;
26  import java.io.OptionalDataException;
27  import java.net.DatagramPacket;
28  import java.net.DatagramSocket;
29  
30  import org.apache.logging.log4j.core.config.ConfigurationFactory;
31  
32  /**
33   * Listens for events over a socket connection.
34   * 
35   * @param <T>
36   *            The kind of input stream read
37   */
38  public class UdpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
39  
40      /**
41       * Creates a socket server that reads JSON log events.
42       * 
43       * @param port
44       *            the port to listen
45       * @return a new a socket server
46       * @throws IOException
47       *             if an I/O error occurs when opening the socket.
48       */
49      public static UdpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
50          return new UdpSocketServer<InputStream>(port, new JsonInputStreamLogEventBridge());
51      }
52  
53      /**
54       * Creates a socket server that reads serialized log events.
55       * 
56       * @param port
57       *            the port to listen
58       * @return a new a socket server
59       * @throws IOException
60       *             if an I/O error occurs when opening the socket.
61       */
62      public static UdpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
63          return new UdpSocketServer<ObjectInputStream>(port, new ObjectInputStreamLogEventBridge());
64      }
65  
66      /**
67       * Creates a socket server that reads XML log events.
68       * 
69       * @param port
70       *            the port to listen
71       * @return a new a socket server
72       * @throws IOException
73       *             if an I/O error occurs when opening the socket.
74       */
75      public static UdpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
76          return new UdpSocketServer<InputStream>(port, new XmlInputStreamLogEventBridge());
77      }
78  
79      /**
80       * Main startup for the server.
81       * 
82       * @param args
83       *            The command line arguments.
84       * @throws Exception
85       *             if an error occurs.
86       */
87      public static void main(final String[] args) throws Exception {
88          if (args.length < 1 || args.length > 2) {
89              System.err.println("Incorrect number of arguments");
90              printUsage();
91              return;
92          }
93          final int port = Integer.parseInt(args[0]);
94          if (port <= 0 || port >= MAX_PORT) {
95              System.err.println("Invalid port number");
96              printUsage();
97              return;
98          }
99          if (args.length == 2 && args[1].length() > 0) {
100             ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
101         }
102         final UdpSocketServer<ObjectInputStream> socketServer = UdpSocketServer.createSerializedSocketServer(port);
103         final Thread server = new Thread(socketServer);
104         server.start();
105         final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
106         while (true) {
107             final String line = reader.readLine();
108             if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
109                     || line.equalsIgnoreCase("Exit")) {
110                 socketServer.shutdown();
111                 server.join();
112                 break;
113             }
114         }
115     }
116 
117     private static void printUsage() {
118         System.out.println("Usage: ServerSocket port configFilePath");
119     }
120 
121     private final DatagramSocket datagramSocket;
122 
123     // max size so we only have to deal with one packet
124     private final int maxBufferSize = 1024 * 65 + 1024;
125 
126     /**
127      * Constructor.
128      * 
129      * @param port
130      *            to listen on.
131      * @param logEventInput
132      * @throws IOException
133      *             If an error occurs.
134      */
135     public UdpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
136         super(port, logEventInput);
137         this.datagramSocket = new DatagramSocket(port);
138     }
139 
140     /**
141      * Accept incoming events and processes them.
142      */
143     @Override
144     public void run() {
145         while (isActive()) {
146             if (datagramSocket.isClosed()) {
147                 // OK we're done.
148                 return;
149             }
150             try {
151                 final byte[] buf = new byte[maxBufferSize];
152                 final DatagramPacket packet = new DatagramPacket(buf, buf.length);
153                 datagramSocket.receive(packet);
154                 final ByteArrayInputStream bais = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
155                 logEventInput.logEvents(logEventInput.wrapStream(bais), this);
156             } catch (final OptionalDataException e) {
157                 if (datagramSocket.isClosed()) {
158                     // OK we're done.
159                     return;
160                 }
161                 logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
162             } catch (final EOFException e) {
163                 if (datagramSocket.isClosed()) {
164                     // OK we're done.
165                     return;
166                 }
167                 logger.info("EOF encountered");
168             } catch (final IOException e) {
169                 if (datagramSocket.isClosed()) {
170                     // OK we're done.
171                     return;
172                 }
173                 logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e);
174             }
175         }
176     }
177 
178     /**
179      * Shutdown the server.
180      */
181     public void shutdown() {
182         this.setActive(false);
183         Thread.currentThread().interrupt();
184         datagramSocket.close();
185     }
186 }