001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net.server;
018    
019    import java.io.BufferedReader;
020    import java.io.EOFException;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.io.ObjectInputStream;
025    import java.io.OptionalDataException;
026    import java.net.ServerSocket;
027    import java.net.Socket;
028    import java.nio.charset.Charset;
029    import java.util.Map;
030    import java.util.concurrent.ConcurrentHashMap;
031    import java.util.concurrent.ConcurrentMap;
032    
033    import org.apache.logging.log4j.core.config.ConfigurationFactory;
034    
035    /**
036     * Listens for events over a socket connection.
037     * 
038     * @param <T>
039     *        The kind of input stream read
040     */
041    public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
042    
043        /**
044         * Thread that processes the events.
045         */
046        private class SocketHandler extends Thread {
047    
048            private final T inputStream;
049    
050            private volatile boolean shutdown = false;
051    
052            public SocketHandler(final Socket socket) throws IOException {
053                this.inputStream = logEventInput.wrapStream(socket.getInputStream());
054            }
055    
056            @Override
057            public void run() {
058                boolean closed = false;
059                try {
060                    try {
061                        while (!shutdown) {
062                            logEventInput.logEvents(inputStream, TcpSocketServer.this);
063                        }
064                    } catch (final EOFException e) {
065                        closed = true;
066                    } catch (final OptionalDataException e) {
067                        logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
068                    } catch (final IOException e) {
069                        logger.error("IOException encountered while reading from socket", e);
070                    }
071                    if (!closed) {
072                        try {
073                            inputStream.close();
074                        } catch (final Exception ex) {
075                            // Ignore the exception;
076                        }
077                    }
078                } finally {
079                    handlers.remove(Long.valueOf(getId()));
080                }
081            }
082    
083            public void shutdown() {
084                this.shutdown = true;
085                interrupt();
086            }
087        }
088    
089        /**
090         * Creates a socket server that reads JSON log events.
091         * 
092         * @param port
093         *        the port to listen
094         * @return a new a socket server
095         * @throws IOException
096         *         if an I/O error occurs when opening the socket.
097         */
098        public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
099            return new TcpSocketServer<InputStream>(port, new JsonInputStreamLogEventBridge());
100        }
101    
102        /**
103         * Creates a socket server that reads serialized log events.
104         * 
105         * @param port
106         *        the port to listen
107         * @return a new a socket server
108         * @throws IOException
109         *         if an I/O error occurs when opening the socket.
110         */
111        public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
112            return new TcpSocketServer<ObjectInputStream>(port, new ObjectInputStreamLogEventBridge());
113        }
114    
115        /**
116         * Creates a socket server that reads XML log events.
117         * 
118         * @param port
119         *        the port to listen
120         * @return a new a socket server
121         * @throws IOException
122         *         if an I/O error occurs when opening the socket.
123         */
124        public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
125            return new TcpSocketServer<InputStream>(port, new XmlInputStreamLogEventBridge());
126        }
127    
128        /**
129         * Main startup for the server.
130         * 
131         * @param args
132         *        The command line arguments.
133         * @throws Exception
134         *         if an error occurs.
135         */
136        public static void main(final String[] args) throws Exception {
137            if (args.length < 1 || args.length > 2) {
138                System.err.println("Incorrect number of arguments");
139                printUsage();
140                return;
141            }
142            final int port = Integer.parseInt(args[0]);
143            if (port <= 0 || port >= MAX_PORT) {
144                System.err.println("Invalid port number");
145                printUsage();
146                return;
147            }
148            if (args.length == 2 && args[1].length() > 0) {
149                ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
150            }
151            final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port);
152            final Thread serverThread = new Thread(socketServer);
153            serverThread.start();
154            final Charset enc = Charset.defaultCharset();
155            final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc));
156            while (true) {
157                final String line = reader.readLine();
158                if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
159                        || line.equalsIgnoreCase("Exit")) {
160                    socketServer.shutdown();
161                    serverThread.join();
162                    break;
163                }
164            }
165        }
166    
167        private static void printUsage() {
168            System.out.println("Usage: ServerSocket port configFilePath");
169        }
170    
171        private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<Long, SocketHandler>();
172    
173        private final ServerSocket serverSocket;
174    
175        /**
176         * Constructor.
177         * 
178         * @param port
179         *        to listen.
180         * @param logEventInput
181         *        the log even input
182         * @throws IOException
183         *         if an I/O error occurs when opening the socket.
184         */
185        public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
186            this(port, logEventInput, new ServerSocket(port));
187        }
188    
189        /**
190         * Constructor.
191         * 
192         * @param port
193         *        to listen.
194         * @param logEventInput
195         *        the log even input
196         * @param serverSocket
197         *        the socket server
198         * @throws IOException
199         *         if an I/O error occurs when opening the socket.
200         */
201        public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket)
202                throws IOException {
203            super(port, logEventInput);
204            this.serverSocket = serverSocket;
205        }
206    
207        /**
208         * Accept incoming events and processes them.
209         */
210        @Override
211        public void run() {
212            while (isActive()) {
213                if (serverSocket.isClosed()) {
214                    return;
215                }
216                try {
217                    // Accept incoming connections.
218                    final Socket clientSocket = serverSocket.accept();
219                    clientSocket.setSoLinger(true, 0);
220    
221                    // accept() will block until a client connects to the server.
222                    // If execution reaches this point, then it means that a client
223                    // socket has been accepted.
224    
225                    final SocketHandler handler = new SocketHandler(clientSocket);
226                    handlers.put(Long.valueOf(handler.getId()), handler);
227                    handler.start();
228                } catch (final IOException ioe) {
229                    if (serverSocket.isClosed()) {
230                        // OK we're done.
231                        return;
232                    }
233                    System.out.println("Exception encountered on accept. Ignoring. Stack Trace :");
234                    ioe.printStackTrace();
235                }
236            }
237            for (final Map.Entry<Long, SocketHandler> entry : handlers.entrySet()) {
238                final SocketHandler handler = entry.getValue();
239                handler.shutdown();
240                try {
241                    handler.join();
242                } catch (final InterruptedException ie) {
243                    // Ignore the exception
244                }
245            }
246        }
247    
248        /**
249         * Shutdown the server.
250         * 
251         * @throws IOException
252         */
253        public void shutdown() throws IOException {
254            setActive(false);
255            Thread.currentThread().interrupt();
256            serverSocket.close();
257        }
258    }