View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net.server;
18  
19  import java.io.BufferedReader;
20  import java.io.EOFException;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InputStreamReader;
24  import java.io.ObjectInputStream;
25  import java.io.OptionalDataException;
26  import java.net.ServerSocket;
27  import java.net.Socket;
28  import java.nio.charset.Charset;
29  import java.util.Map;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentMap;
32  
33  import org.apache.logging.log4j.core.config.ConfigurationFactory;
34  
35  /**
36   * Listens for events over a socket connection.
37   * 
38   * @param <T>
39   *        The kind of input stream read
40   */
41  public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
42  
43      /**
44       * Thread that processes the events.
45       */
46      private class SocketHandler extends Thread {
47  
48          private final T inputStream;
49  
50          private volatile boolean shutdown = false;
51  
52          public SocketHandler(final Socket socket) throws IOException {
53              this.inputStream = logEventInput.wrapStream(socket.getInputStream());
54          }
55  
56          @Override
57          public void run() {
58              boolean closed = false;
59              try {
60                  try {
61                      while (!shutdown) {
62                          logEventInput.logEvents(inputStream, TcpSocketServer.this);
63                      }
64                  } catch (final EOFException e) {
65                      closed = true;
66                  } catch (final OptionalDataException e) {
67                      logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
68                  } catch (final IOException e) {
69                      logger.error("IOException encountered while reading from socket", e);
70                  }
71                  if (!closed) {
72                      try {
73                          inputStream.close();
74                      } catch (final Exception ex) {
75                          // Ignore the exception;
76                      }
77                  }
78              } finally {
79                  handlers.remove(Long.valueOf(getId()));
80              }
81          }
82  
83          public void shutdown() {
84              this.shutdown = true;
85              interrupt();
86          }
87      }
88  
89      /**
90       * Creates a socket server that reads JSON log events.
91       * 
92       * @param port
93       *        the port to listen
94       * @return a new a socket server
95       * @throws IOException
96       *         if an I/O error occurs when opening the socket.
97       */
98      public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
99          return new TcpSocketServer<InputStream>(port, new JsonInputStreamLogEventBridge());
100     }
101 
102     /**
103      * Creates a socket server that reads serialized log events.
104      * 
105      * @param port
106      *        the port to listen
107      * @return a new a socket server
108      * @throws IOException
109      *         if an I/O error occurs when opening the socket.
110      */
111     public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
112         return new TcpSocketServer<ObjectInputStream>(port, new ObjectInputStreamLogEventBridge());
113     }
114 
115     /**
116      * Creates a socket server that reads XML log events.
117      * 
118      * @param port
119      *        the port to listen
120      * @return a new a socket server
121      * @throws IOException
122      *         if an I/O error occurs when opening the socket.
123      */
124     public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
125         return new TcpSocketServer<InputStream>(port, new XmlInputStreamLogEventBridge());
126     }
127 
128     /**
129      * Main startup for the server.
130      * 
131      * @param args
132      *        The command line arguments.
133      * @throws Exception
134      *         if an error occurs.
135      */
136     public static void main(final String[] args) throws Exception {
137         if (args.length < 1 || args.length > 2) {
138             System.err.println("Incorrect number of arguments");
139             printUsage();
140             return;
141         }
142         final int port = Integer.parseInt(args[0]);
143         if (port <= 0 || port >= MAX_PORT) {
144             System.err.println("Invalid port number");
145             printUsage();
146             return;
147         }
148         if (args.length == 2 && args[1].length() > 0) {
149             ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
150         }
151         final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port);
152         final Thread serverThread = new Thread(socketServer);
153         serverThread.start();
154         final Charset enc = Charset.defaultCharset();
155         final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc));
156         while (true) {
157             final String line = reader.readLine();
158             if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
159                     || line.equalsIgnoreCase("Exit")) {
160                 socketServer.shutdown();
161                 serverThread.join();
162                 break;
163             }
164         }
165     }
166 
167     private static void printUsage() {
168         System.out.println("Usage: ServerSocket port configFilePath");
169     }
170 
171     private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<Long, SocketHandler>();
172 
173     private final ServerSocket serverSocket;
174 
175     /**
176      * Constructor.
177      * 
178      * @param port
179      *        to listen.
180      * @param logEventInput
181      *        the log even input
182      * @throws IOException
183      *         if an I/O error occurs when opening the socket.
184      */
185     public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
186         this(port, logEventInput, new ServerSocket(port));
187     }
188 
189     /**
190      * Constructor.
191      * 
192      * @param port
193      *        to listen.
194      * @param logEventInput
195      *        the log even input
196      * @param serverSocket
197      *        the socket server
198      * @throws IOException
199      *         if an I/O error occurs when opening the socket.
200      */
201     public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket)
202             throws IOException {
203         super(port, logEventInput);
204         this.serverSocket = serverSocket;
205     }
206 
207     /**
208      * Accept incoming events and processes them.
209      */
210     @Override
211     public void run() {
212         while (isActive()) {
213             if (serverSocket.isClosed()) {
214                 return;
215             }
216             try {
217                 // Accept incoming connections.
218                 final Socket clientSocket = serverSocket.accept();
219                 clientSocket.setSoLinger(true, 0);
220 
221                 // accept() will block until a client connects to the server.
222                 // If execution reaches this point, then it means that a client
223                 // socket has been accepted.
224 
225                 final SocketHandler handler = new SocketHandler(clientSocket);
226                 handlers.put(Long.valueOf(handler.getId()), handler);
227                 handler.start();
228             } catch (final IOException ioe) {
229                 if (serverSocket.isClosed()) {
230                     // OK we're done.
231                     return;
232                 }
233                 System.out.println("Exception encountered on accept. Ignoring. Stack Trace :");
234                 ioe.printStackTrace();
235             }
236         }
237         for (final Map.Entry<Long, SocketHandler> entry : handlers.entrySet()) {
238             final SocketHandler handler = entry.getValue();
239             handler.shutdown();
240             try {
241                 handler.join();
242             } catch (final InterruptedException ie) {
243                 // Ignore the exception
244             }
245         }
246     }
247 
248     /**
249      * Shutdown the server.
250      * 
251      * @throws IOException
252      */
253     public void shutdown() throws IOException {
254         setActive(false);
255         Thread.currentThread().interrupt();
256         serverSocket.close();
257     }
258 }