001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache license, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License. You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the license for the specific language governing permissions and
015     * limitations under the license.
016     */
017    package org.apache.logging.log4j.core.net;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.IOException;
021    import java.io.OutputStream;
022    import java.io.Serializable;
023    import java.net.ConnectException;
024    import java.net.InetAddress;
025    import java.net.Socket;
026    import java.net.UnknownHostException;
027    import java.util.HashMap;
028    import java.util.Map;
029    import java.util.concurrent.CountDownLatch;
030    
031    import org.apache.logging.log4j.core.Layout;
032    import org.apache.logging.log4j.core.appender.AppenderLoggingException;
033    import org.apache.logging.log4j.core.appender.ManagerFactory;
034    import org.apache.logging.log4j.core.appender.OutputStreamManager;
035    import org.apache.logging.log4j.util.Strings;
036    
037    /**
038     * Manager of TCP Socket connections.
039     */
040    public class TcpSocketManager extends AbstractSocketManager {
041        /**
042          The default reconnection delay (30000 milliseconds or 30 seconds).
043         */
044        public static final int DEFAULT_RECONNECTION_DELAY_MILLIS   = 30000;
045        /**
046          The default port number of remote logging server (4560).
047         */
048        private static final int DEFAULT_PORT = 4560;
049    
050        private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
051    
052        private final int reconnectionDelay;
053    
054        private Reconnector connector = null;
055    
056        private Socket socket;
057    
058        private final boolean retry;
059    
060        private final boolean immediateFail;
061    
062        /**
063         * The Constructor.
064         * @param name The unique name of this connection.
065         * @param os The OutputStream.
066         * @param sock The Socket.
067         * @param inetAddress The internet address of the host.
068         * @param host The name of the host.
069         * @param port The port number on the host.
070         * @param delay Reconnection interval.
071         * @param immediateFail
072         * @param layout The Layout.
073         */
074        public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
075                                final String host, final int port, final int delay, final boolean immediateFail,
076                                final Layout<? extends Serializable> layout) {
077            super(name, os, inetAddress, host, port, layout);
078            this.reconnectionDelay = delay;
079            this.socket = sock;
080            this.immediateFail = immediateFail;
081            retry = delay > 0;
082            if (sock == null) {
083                connector = new Reconnector(this);
084                connector.setDaemon(true);
085                connector.setPriority(Thread.MIN_PRIORITY);
086                connector.start();
087            }
088        }
089    
090        /**
091         * Obtain a TcpSocketManager.
092         * @param host The host to connect to.
093         * @param port The port on the host.
094         * @param delayMillis The interval to pause between retries.
095         * @return A TcpSocketManager.
096         */
097        public static TcpSocketManager getSocketManager(final String host, int port, int delayMillis,
098                                                        final boolean immediateFail, final Layout<? extends Serializable> layout ) {
099            if (Strings.isEmpty(host)) {
100                throw new IllegalArgumentException("A host name is required");
101            }
102            if (port <= 0) {
103                port = DEFAULT_PORT;
104            }
105            if (delayMillis == 0) {
106                delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
107            }
108            return (TcpSocketManager) getManager("TCP:" + host + ':' + port,
109                new FactoryData(host, port, delayMillis, immediateFail, layout), FACTORY);
110        }
111    
112        @Override
113        protected void write(final byte[] bytes, final int offset, final int length)  {
114            if (socket == null) {
115                if (connector != null && !immediateFail) {
116                    connector.latch();
117                }
118                if (socket == null) {
119                    final String msg = "Error writing to " + getName() + " socket not available";
120                    throw new AppenderLoggingException(msg);
121                }
122            }
123            synchronized (this) {
124                try {
125                    getOutputStream().write(bytes, offset, length);
126                } catch (final IOException ex) {
127                    if (retry && connector == null) {
128                        connector = new Reconnector(this);
129                        connector.setDaemon(true);
130                        connector.setPriority(Thread.MIN_PRIORITY);
131                        connector.start();
132                    }
133                    final String msg = "Error writing to " + getName();
134                    throw new AppenderLoggingException(msg, ex);
135                }
136            }
137        }
138    
139        @Override
140        protected synchronized void close() {
141            super.close();
142            if (connector != null) {
143                connector.shutdown();
144                connector.interrupt();
145                connector = null;
146            }
147        }
148    
149        /**
150         * Gets this TcpSocketManager's content format. Specified by:
151         * <ul>
152         * <li>Key: "protocol" Value: "tcp"</li>
153         * <li>Key: "direction" Value: "out"</li>
154         * </ul>
155         * 
156         * @return Map of content format keys supporting TcpSocketManager
157         */
158        @Override
159        public Map<String, String> getContentFormat() {
160            final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
161            result.put("protocol", "tcp");
162            result.put("direction", "out");
163            return result;
164        }
165    
166        /**
167         * Handles reconnecting to a Thread.
168         */
169        private class Reconnector extends Thread {
170    
171            private final CountDownLatch latch = new CountDownLatch(1);
172    
173            private boolean shutdown = false;
174    
175            private final Object owner;
176    
177            public Reconnector(final OutputStreamManager owner) {
178                this.owner = owner;
179            }
180    
181            public void latch()  {
182                try {
183                    latch.await();
184                } catch (final InterruptedException ex) {
185                    // Ignore the exception.
186                }
187            }
188    
189            public void shutdown() {
190                shutdown = true;
191            }
192    
193            @Override
194            public void run() {
195                while (!shutdown) {
196                    try {
197                        sleep(reconnectionDelay);
198                        final Socket sock = createSocket(inetAddress, port);
199                        final OutputStream newOS = sock.getOutputStream();
200                        synchronized (owner) {
201                            try {
202                                getOutputStream().close();
203                            } catch (final IOException ioe) {
204                                // Ignore this.
205                            }
206    
207                            setOutputStream(newOS);
208                            socket = sock;
209                            connector = null;
210                            shutdown = true;
211                        }
212                        LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
213                    } catch (final InterruptedException ie) {
214                        LOGGER.debug("Reconnection interrupted.");
215                    } catch (final ConnectException ex) {
216                        LOGGER.debug(host + ':' + port + " refused connection");
217                    } catch (final IOException ioe) {
218                        LOGGER.debug("Unable to reconnect to " + host + ':' + port);
219                    } finally {
220                        latch.countDown();
221                    }
222                }
223            }
224        }
225    
226        protected Socket createSocket(final InetAddress host, final int port) throws IOException {
227            return createSocket(host.getHostName(), port);
228        }
229    
230        protected Socket createSocket(final String host, final int port) throws IOException {
231            return new Socket(host, port);
232        }
233    
234        /**
235         * Data for the factory.
236         */
237        private static class FactoryData {
238            private final String host;
239            private final int port;
240            private final int delayMillis;
241            private final boolean immediateFail;
242            private final Layout<? extends Serializable> layout;
243    
244            public FactoryData(final String host, final int port, final int delayMillis, final boolean immediateFail,
245                               final Layout<? extends Serializable> layout) {
246                this.host = host;
247                this.port = port;
248                this.delayMillis = delayMillis;
249                this.immediateFail = immediateFail;
250                this.layout = layout;
251            }
252        }
253    
254        /**
255         * Factory to create a TcpSocketManager.
256         */
257        protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
258            @Override
259            public TcpSocketManager createManager(final String name, final FactoryData data) {
260    
261                InetAddress inetAddress;
262                OutputStream os;
263                try {
264                    inetAddress = InetAddress.getByName(data.host);
265                } catch (final UnknownHostException ex) {
266                    LOGGER.error("Could not find address of " + data.host, ex);
267                    return null;
268                }
269                try {
270                    final Socket socket = new Socket(data.host, data.port);
271                    os = socket.getOutputStream();
272                    return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port, data.delayMillis,
273                        data.immediateFail, data.layout);
274                } catch (final IOException ex) {
275                    LOGGER.error("TcpSocketManager (" + name + ") " + ex);
276                    os = new ByteArrayOutputStream();
277                }
278                if (data.delayMillis == 0) {
279                    return null;
280                }
281                return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.delayMillis, data.immediateFail,
282                    data.layout);
283            }
284        }
285    }