001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net.server;
018
019import java.io.BufferedReader;
020import java.io.EOFException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.io.ObjectInputStream;
025import java.io.OptionalDataException;
026import java.net.ServerSocket;
027import java.net.Socket;
028import java.nio.charset.Charset;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032
033import org.apache.logging.log4j.core.config.ConfigurationFactory;
034
035/**
036 * Listens for events over a socket connection.
037 * 
038 * @param <T>
039 *        The kind of input stream read
040 */
041public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
042
043    /**
044     * Thread that processes the events.
045     */
046    private class SocketHandler extends Thread {
047
048        private final T inputStream;
049
050        private volatile boolean shutdown = false;
051
052        public SocketHandler(final Socket socket) throws IOException {
053            this.inputStream = logEventInput.wrapStream(socket.getInputStream());
054        }
055
056        @Override
057        public void run() {
058            boolean closed = false;
059            try {
060                try {
061                    while (!shutdown) {
062                        logEventInput.logEvents(inputStream, TcpSocketServer.this);
063                    }
064                } catch (final EOFException e) {
065                    closed = true;
066                } catch (final OptionalDataException e) {
067                    logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
068                } catch (final IOException e) {
069                    logger.error("IOException encountered while reading from socket", e);
070                }
071                if (!closed) {
072                    try {
073                        inputStream.close();
074                    } catch (final Exception ex) {
075                        // Ignore the exception;
076                    }
077                }
078            } finally {
079                handlers.remove(Long.valueOf(getId()));
080            }
081        }
082
083        public void shutdown() {
084            this.shutdown = true;
085            interrupt();
086        }
087    }
088
089    /**
090     * Creates a socket server that reads JSON log events.
091     * 
092     * @param port
093     *        the port to listen
094     * @return a new a socket server
095     * @throws IOException
096     *         if an I/O error occurs when opening the socket.
097     */
098    public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
099        return new TcpSocketServer<InputStream>(port, new JsonInputStreamLogEventBridge());
100    }
101
102    /**
103     * Creates a socket server that reads serialized log events.
104     * 
105     * @param port
106     *        the port to listen
107     * @return a new a socket server
108     * @throws IOException
109     *         if an I/O error occurs when opening the socket.
110     */
111    public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
112        return new TcpSocketServer<ObjectInputStream>(port, new ObjectInputStreamLogEventBridge());
113    }
114
115    /**
116     * Creates a socket server that reads XML log events.
117     * 
118     * @param port
119     *        the port to listen
120     * @return a new a socket server
121     * @throws IOException
122     *         if an I/O error occurs when opening the socket.
123     */
124    public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
125        return new TcpSocketServer<InputStream>(port, new XmlInputStreamLogEventBridge());
126    }
127
128    /**
129     * Main startup for the server.
130     * 
131     * @param args
132     *        The command line arguments.
133     * @throws Exception
134     *         if an error occurs.
135     */
136    public static void main(final String[] args) throws Exception {
137        if (args.length < 1 || args.length > 2) {
138            System.err.println("Incorrect number of arguments");
139            printUsage();
140            return;
141        }
142        final int port = Integer.parseInt(args[0]);
143        if (port <= 0 || port >= MAX_PORT) {
144            System.err.println("Invalid port number");
145            printUsage();
146            return;
147        }
148        if (args.length == 2 && args[1].length() > 0) {
149            ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
150        }
151        final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port);
152        final Thread serverThread = new Thread(socketServer);
153        serverThread.start();
154        final Charset enc = Charset.defaultCharset();
155        final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc));
156        while (true) {
157            final String line = reader.readLine();
158            if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
159                    || line.equalsIgnoreCase("Exit")) {
160                socketServer.shutdown();
161                serverThread.join();
162                break;
163            }
164        }
165    }
166
167    private static void printUsage() {
168        System.out.println("Usage: ServerSocket port configFilePath");
169    }
170
171    private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<Long, SocketHandler>();
172
173    private final ServerSocket serverSocket;
174
175    /**
176     * Constructor.
177     * 
178     * @param port
179     *        to listen.
180     * @param logEventInput
181     *        the log even input
182     * @throws IOException
183     *         if an I/O error occurs when opening the socket.
184     */
185    public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
186        this(port, logEventInput, new ServerSocket(port));
187    }
188
189    /**
190     * Constructor.
191     * 
192     * @param port
193     *        to listen.
194     * @param logEventInput
195     *        the log even input
196     * @param serverSocket
197     *        the socket server
198     * @throws IOException
199     *         if an I/O error occurs when opening the socket.
200     */
201    public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket)
202            throws IOException {
203        super(port, logEventInput);
204        this.serverSocket = serverSocket;
205    }
206
207    /**
208     * Accept incoming events and processes them.
209     */
210    @Override
211    public void run() {
212        while (isActive()) {
213            if (serverSocket.isClosed()) {
214                return;
215            }
216            try {
217                // Accept incoming connections.
218                final Socket clientSocket = serverSocket.accept();
219                clientSocket.setSoLinger(true, 0);
220
221                // accept() will block until a client connects to the server.
222                // If execution reaches this point, then it means that a client
223                // socket has been accepted.
224
225                final SocketHandler handler = new SocketHandler(clientSocket);
226                handlers.put(Long.valueOf(handler.getId()), handler);
227                handler.start();
228            } catch (final IOException ioe) {
229                if (serverSocket.isClosed()) {
230                    // OK we're done.
231                    return;
232                }
233                System.out.println("Exception encountered on accept. Ignoring. Stack Trace :");
234                ioe.printStackTrace();
235            }
236        }
237        for (final Map.Entry<Long, SocketHandler> entry : handlers.entrySet()) {
238            final SocketHandler handler = entry.getValue();
239            handler.shutdown();
240            try {
241                handler.join();
242            } catch (final InterruptedException ie) {
243                // Ignore the exception
244            }
245        }
246    }
247
248    /**
249     * Shutdown the server.
250     * 
251     * @throws IOException
252     */
253    public void shutdown() throws IOException {
254        setActive(false);
255        Thread.currentThread().interrupt();
256        serverSocket.close();
257    }
258}