001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017 package org.apache.logging.log4j.core.net.server; 018 019 import java.io.BufferedReader; 020 import java.io.EOFException; 021 import java.io.IOException; 022 import java.io.InputStream; 023 import java.io.InputStreamReader; 024 import java.io.ObjectInputStream; 025 import java.io.OptionalDataException; 026 import java.net.ServerSocket; 027 import java.net.Socket; 028 import java.nio.charset.Charset; 029 import java.util.Map; 030 import java.util.concurrent.ConcurrentHashMap; 031 import java.util.concurrent.ConcurrentMap; 032 033 import org.apache.logging.log4j.core.config.ConfigurationFactory; 034 035 /** 036 * Listens for events over a socket connection. 037 * 038 * @param <T> 039 * The kind of input stream read 040 */ 041 public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> { 042 043 /** 044 * Thread that processes the events. 045 */ 046 private class SocketHandler extends Thread { 047 048 private final T inputStream; 049 050 private volatile boolean shutdown = false; 051 052 public SocketHandler(final Socket socket) throws IOException { 053 this.inputStream = logEventInput.wrapStream(socket.getInputStream()); 054 } 055 056 @Override 057 public void run() { 058 boolean closed = false; 059 try { 060 try { 061 while (!shutdown) { 062 logEventInput.logEvents(inputStream, TcpSocketServer.this); 063 } 064 } catch (final EOFException e) { 065 closed = true; 066 } catch (final OptionalDataException e) { 067 logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e); 068 } catch (final IOException e) { 069 logger.error("IOException encountered while reading from socket", e); 070 } 071 if (!closed) { 072 try { 073 inputStream.close(); 074 } catch (final Exception ex) { 075 // Ignore the exception; 076 } 077 } 078 } finally { 079 handlers.remove(Long.valueOf(getId())); 080 } 081 } 082 083 public void shutdown() { 084 this.shutdown = true; 085 interrupt(); 086 } 087 } 088 089 /** 090 * Creates a socket server that reads JSON log events. 091 * 092 * @param port 093 * the port to listen 094 * @return a new a socket server 095 * @throws IOException 096 * if an I/O error occurs when opening the socket. 097 */ 098 public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException { 099 return new TcpSocketServer<InputStream>(port, new JsonInputStreamLogEventBridge()); 100 } 101 102 /** 103 * Creates a socket server that reads serialized log events. 104 * 105 * @param port 106 * the port to listen 107 * @return a new a socket server 108 * @throws IOException 109 * if an I/O error occurs when opening the socket. 110 */ 111 public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException { 112 return new TcpSocketServer<ObjectInputStream>(port, new ObjectInputStreamLogEventBridge()); 113 } 114 115 /** 116 * Creates a socket server that reads XML log events. 117 * 118 * @param port 119 * the port to listen 120 * @return a new a socket server 121 * @throws IOException 122 * if an I/O error occurs when opening the socket. 123 */ 124 public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException { 125 return new TcpSocketServer<InputStream>(port, new XmlInputStreamLogEventBridge()); 126 } 127 128 /** 129 * Main startup for the server. 130 * 131 * @param args 132 * The command line arguments. 133 * @throws Exception 134 * if an error occurs. 135 */ 136 public static void main(final String[] args) throws Exception { 137 if (args.length < 1 || args.length > 2) { 138 System.err.println("Incorrect number of arguments"); 139 printUsage(); 140 return; 141 } 142 final int port = Integer.parseInt(args[0]); 143 if (port <= 0 || port >= MAX_PORT) { 144 System.err.println("Invalid port number"); 145 printUsage(); 146 return; 147 } 148 if (args.length == 2 && args[1].length() > 0) { 149 ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1])); 150 } 151 final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port); 152 final Thread serverThread = new Thread(socketServer); 153 serverThread.start(); 154 final Charset enc = Charset.defaultCharset(); 155 final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc)); 156 while (true) { 157 final String line = reader.readLine(); 158 if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop") 159 || line.equalsIgnoreCase("Exit")) { 160 socketServer.shutdown(); 161 serverThread.join(); 162 break; 163 } 164 } 165 } 166 167 private static void printUsage() { 168 System.out.println("Usage: ServerSocket port configFilePath"); 169 } 170 171 private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<Long, SocketHandler>(); 172 173 private final ServerSocket serverSocket; 174 175 /** 176 * Constructor. 177 * 178 * @param port 179 * to listen. 180 * @param logEventInput 181 * the log even input 182 * @throws IOException 183 * if an I/O error occurs when opening the socket. 184 */ 185 public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException { 186 this(port, logEventInput, new ServerSocket(port)); 187 } 188 189 /** 190 * Constructor. 191 * 192 * @param port 193 * to listen. 194 * @param logEventInput 195 * the log even input 196 * @param serverSocket 197 * the socket server 198 * @throws IOException 199 * if an I/O error occurs when opening the socket. 200 */ 201 public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket) 202 throws IOException { 203 super(port, logEventInput); 204 this.serverSocket = serverSocket; 205 } 206 207 /** 208 * Accept incoming events and processes them. 209 */ 210 @Override 211 public void run() { 212 while (isActive()) { 213 if (serverSocket.isClosed()) { 214 return; 215 } 216 try { 217 // Accept incoming connections. 218 final Socket clientSocket = serverSocket.accept(); 219 clientSocket.setSoLinger(true, 0); 220 221 // accept() will block until a client connects to the server. 222 // If execution reaches this point, then it means that a client 223 // socket has been accepted. 224 225 final SocketHandler handler = new SocketHandler(clientSocket); 226 handlers.put(Long.valueOf(handler.getId()), handler); 227 handler.start(); 228 } catch (final IOException ioe) { 229 if (serverSocket.isClosed()) { 230 // OK we're done. 231 return; 232 } 233 System.out.println("Exception encountered on accept. Ignoring. Stack Trace :"); 234 ioe.printStackTrace(); 235 } 236 } 237 for (final Map.Entry<Long, SocketHandler> entry : handlers.entrySet()) { 238 final SocketHandler handler = entry.getValue(); 239 handler.shutdown(); 240 try { 241 handler.join(); 242 } catch (final InterruptedException ie) { 243 // Ignore the exception 244 } 245 } 246 } 247 248 /** 249 * Shutdown the server. 250 * 251 * @throws IOException 252 */ 253 public void shutdown() throws IOException { 254 setActive(false); 255 Thread.currentThread().interrupt(); 256 serverSocket.close(); 257 } 258 }