001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.IOException;
021    import java.io.OutputStream;
022    import java.io.Serializable;
023    import java.net.ConnectException;
024    import java.net.InetAddress;
025    import java.net.InetSocketAddress;
026    import java.net.Socket;
027    import java.net.UnknownHostException;
028    import java.util.HashMap;
029    import java.util.Map;
030    import java.util.concurrent.CountDownLatch;
031    
032    import org.apache.logging.log4j.core.Layout;
033    import org.apache.logging.log4j.core.appender.AppenderLoggingException;
034    import org.apache.logging.log4j.core.appender.ManagerFactory;
035    import org.apache.logging.log4j.core.appender.OutputStreamManager;
036    import org.apache.logging.log4j.util.Strings;
037    
038    /**
039     * Manager of TCP Socket connections.
040     */
041    public class TcpSocketManager extends AbstractSocketManager {
042        /**
043          The default reconnection delay (30000 milliseconds or 30 seconds).
044         */
045        public static final int DEFAULT_RECONNECTION_DELAY_MILLIS   = 30000;
046        /**
047          The default port number of remote logging server (4560).
048         */
049        private static final int DEFAULT_PORT = 4560;
050    
051        private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
052    
053        private final int reconnectionDelay;
054    
055        private Reconnector connector;
056    
057        private Socket socket;
058    
059        private final boolean retry;
060    
061        private final boolean immediateFail;
062        
063        private final int connectTimeoutMillis;
064    
065        /**
066         * The Constructor.
067         * @param name The unique name of this connection.
068         * @param os The OutputStream.
069         * @param sock The Socket.
070         * @param inetAddress The Internet address of the host.
071         * @param host The name of the host.
072         * @param port The port number on the host.
073         * @param connectTimeoutMillis the connect timeout in milliseconds.
074         * @param delay Reconnection interval.
075         * @param immediateFail
076         * @param layout The Layout.
077         */
078        public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
079                                final String host, final int port, int connectTimeoutMillis, final int delay,
080                                final boolean immediateFail, final Layout<? extends Serializable> layout) {
081            super(name, os, inetAddress, host, port, layout);
082            this.connectTimeoutMillis = connectTimeoutMillis;
083            this.reconnectionDelay = delay;
084            this.socket = sock;
085            this.immediateFail = immediateFail;
086            retry = delay > 0;
087            if (sock == null) {
088                connector = new Reconnector(this);
089                connector.setDaemon(true);
090                connector.setPriority(Thread.MIN_PRIORITY);
091                connector.start();
092            }
093        }
094    
095        /**
096         * Obtain a TcpSocketManager.
097         * @param host The host to connect to.
098         * @param port The port on the host.
099         * @param connectTimeoutMillis the connect timeout in milliseconds
100         * @param delayMillis The interval to pause between retries.
101         * @return A TcpSocketManager.
102         */
103        public static TcpSocketManager getSocketManager(final String host, int port, int connectTimeoutMillis,
104                int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) {
105            if (Strings.isEmpty(host)) {
106                throw new IllegalArgumentException("A host name is required");
107            }
108            if (port <= 0) {
109                port = DEFAULT_PORT;
110            }
111            if (delayMillis == 0) {
112                delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
113            }
114            return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
115                    connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY);
116        }
117    
118        @Override
119        protected void write(final byte[] bytes, final int offset, final int length)  {
120            if (socket == null) {
121                if (connector != null && !immediateFail) {
122                    connector.latch();
123                }
124                if (socket == null) {
125                    final String msg = "Error writing to " + getName() + " socket not available";
126                    throw new AppenderLoggingException(msg);
127                }
128            }
129            synchronized (this) {
130                try {
131                    getOutputStream().write(bytes, offset, length);
132                } catch (final IOException ex) {
133                    if (retry && connector == null) {
134                        connector = new Reconnector(this);
135                        connector.setDaemon(true);
136                        connector.setPriority(Thread.MIN_PRIORITY);
137                        connector.start();
138                    }
139                    final String msg = "Error writing to " + getName();
140                    throw new AppenderLoggingException(msg, ex);
141                }
142            }
143        }
144    
145        @Override
146        protected synchronized void close() {
147            super.close();
148            if (connector != null) {
149                connector.shutdown();
150                connector.interrupt();
151                connector = null;
152            }
153        }
154    
155        public int getConnectTimeoutMillis() {
156            return connectTimeoutMillis;
157        }
158    
159        /**
160         * Gets this TcpSocketManager's content format. Specified by:
161         * <ul>
162         * <li>Key: "protocol" Value: "tcp"</li>
163         * <li>Key: "direction" Value: "out"</li>
164         * </ul>
165         * 
166         * @return Map of content format keys supporting TcpSocketManager
167         */
168        @Override
169        public Map<String, String> getContentFormat() {
170            final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
171            result.put("protocol", "tcp");
172            result.put("direction", "out");
173            return result;
174        }
175    
176        /**
177         * Handles reconnecting to a Thread.
178         */
179        private class Reconnector extends Thread {
180    
181            private final CountDownLatch latch = new CountDownLatch(1);
182    
183            private boolean shutdown = false;
184    
185            private final Object owner;
186    
187            public Reconnector(final OutputStreamManager owner) {
188                this.owner = owner;
189            }
190    
191            public void latch()  {
192                try {
193                    latch.await();
194                } catch (final InterruptedException ex) {
195                    // Ignore the exception.
196                }
197            }
198    
199            public void shutdown() {
200                shutdown = true;
201            }
202    
203            @Override
204            public void run() {
205                while (!shutdown) {
206                    try {
207                        sleep(reconnectionDelay);
208                        final Socket sock = createSocket(inetAddress, port);
209                        final OutputStream newOS = sock.getOutputStream();
210                        synchronized (owner) {
211                            try {
212                                getOutputStream().close();
213                            } catch (final IOException ioe) {
214                                // Ignore this.
215                            }
216    
217                            setOutputStream(newOS);
218                            socket = sock;
219                            connector = null;
220                            shutdown = true;
221                        }
222                        LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
223                    } catch (final InterruptedException ie) {
224                        LOGGER.debug("Reconnection interrupted.");
225                    } catch (final ConnectException ex) {
226                        LOGGER.debug(host + ':' + port + " refused connection");
227                    } catch (final IOException ioe) {
228                        LOGGER.debug("Unable to reconnect to " + host + ':' + port);
229                    } finally {
230                        latch.countDown();
231                    }
232                }
233            }
234        }
235    
236        protected Socket createSocket(final InetAddress host, final int port) throws IOException {
237            return createSocket(host.getHostName(), port);
238        }
239    
240        protected Socket createSocket(final String host, final int port) throws IOException {
241            final InetSocketAddress address = new InetSocketAddress(host, port);
242            final Socket newSocket = new Socket();
243            newSocket.connect(address, connectTimeoutMillis);
244            return newSocket;
245        }
246    
247        /**
248         * Data for the factory.
249         */
250        private static class FactoryData {
251            private final String host;
252            private final int port;
253            private final int connectTimeoutMillis;
254            private final int delayMillis;
255            private final boolean immediateFail;
256            private final Layout<? extends Serializable> layout;
257    
258            public FactoryData(final String host, final int port, int connectTimeoutMillis, final int delayMillis,
259                               final boolean immediateFail, final Layout<? extends Serializable> layout) {
260                this.host = host;
261                this.port = port;
262                this.connectTimeoutMillis = connectTimeoutMillis;
263                this.delayMillis = delayMillis;
264                this.immediateFail = immediateFail;
265                this.layout = layout;
266            }
267        }
268    
269        /**
270         * Factory to create a TcpSocketManager.
271         */
272        protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
273            @Override
274            public TcpSocketManager createManager(final String name, final FactoryData data) {
275    
276                InetAddress inetAddress;
277                OutputStream os;
278                try {
279                    inetAddress = InetAddress.getByName(data.host);
280                } catch (final UnknownHostException ex) {
281                    LOGGER.error("Could not find address of " + data.host, ex);
282                    return null;
283                }
284                try {
285                    final Socket socket = new Socket(data.host, data.port);
286                    os = socket.getOutputStream();
287                    return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
288                            data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout);
289                } catch (final IOException ex) {
290                    LOGGER.error("TcpSocketManager (" + name + ") " + ex);
291                    os = new ByteArrayOutputStream();
292                }
293                if (data.delayMillis == 0) {
294                    return null;
295                }
296                return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
297                        data.delayMillis, data.immediateFail, data.layout);
298            }
299        }
300    
301    }