001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net;
018
019import java.io.ByteArrayOutputStream;
020import java.io.IOException;
021import java.io.OutputStream;
022import java.io.Serializable;
023import java.net.ConnectException;
024import java.net.InetAddress;
025import java.net.InetSocketAddress;
026import java.net.Socket;
027import java.net.UnknownHostException;
028import java.util.HashMap;
029import java.util.Map;
030import java.util.concurrent.CountDownLatch;
031
032import org.apache.logging.log4j.core.Layout;
033import org.apache.logging.log4j.core.appender.AppenderLoggingException;
034import org.apache.logging.log4j.core.appender.ManagerFactory;
035import org.apache.logging.log4j.core.appender.OutputStreamManager;
036import org.apache.logging.log4j.util.Strings;
037
038/**
039 * Manager of TCP Socket connections.
040 */
041public class TcpSocketManager extends AbstractSocketManager {
042    /**
043      The default reconnection delay (30000 milliseconds or 30 seconds).
044     */
045    public static final int DEFAULT_RECONNECTION_DELAY_MILLIS   = 30000;
046    /**
047      The default port number of remote logging server (4560).
048     */
049    private static final int DEFAULT_PORT = 4560;
050
051    private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
052
053    private final int reconnectionDelay;
054
055    private Reconnector connector;
056
057    private Socket socket;
058
059    private final boolean retry;
060
061    private final boolean immediateFail;
062    
063    private final int connectTimeoutMillis;
064
065    /**
066     * The Constructor.
067     * @param name The unique name of this connection.
068     * @param os The OutputStream.
069     * @param sock The Socket.
070     * @param inetAddress The Internet address of the host.
071     * @param host The name of the host.
072     * @param port The port number on the host.
073     * @param connectTimeoutMillis the connect timeout in milliseconds.
074     * @param delay Reconnection interval.
075     * @param immediateFail
076     * @param layout The Layout.
077     */
078    public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
079                            final String host, final int port, int connectTimeoutMillis, final int delay,
080                            final boolean immediateFail, final Layout<? extends Serializable> layout) {
081        super(name, os, inetAddress, host, port, layout);
082        this.connectTimeoutMillis = connectTimeoutMillis;
083        this.reconnectionDelay = delay;
084        this.socket = sock;
085        this.immediateFail = immediateFail;
086        retry = delay > 0;
087        if (sock == null) {
088            connector = new Reconnector(this);
089            connector.setDaemon(true);
090            connector.setPriority(Thread.MIN_PRIORITY);
091            connector.start();
092        }
093    }
094
095    /**
096     * Obtain a TcpSocketManager.
097     * @param host The host to connect to.
098     * @param port The port on the host.
099     * @param connectTimeoutMillis the connect timeout in milliseconds
100     * @param delayMillis The interval to pause between retries.
101     * @return A TcpSocketManager.
102     */
103    public static TcpSocketManager getSocketManager(final String host, int port, int connectTimeoutMillis,
104            int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) {
105        if (Strings.isEmpty(host)) {
106            throw new IllegalArgumentException("A host name is required");
107        }
108        if (port <= 0) {
109            port = DEFAULT_PORT;
110        }
111        if (delayMillis == 0) {
112            delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
113        }
114        return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
115                connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY);
116    }
117
118    @Override
119    protected void write(final byte[] bytes, final int offset, final int length)  {
120        if (socket == null) {
121            if (connector != null && !immediateFail) {
122                connector.latch();
123            }
124            if (socket == null) {
125                final String msg = "Error writing to " + getName() + " socket not available";
126                throw new AppenderLoggingException(msg);
127            }
128        }
129        synchronized (this) {
130            try {
131                getOutputStream().write(bytes, offset, length);
132            } catch (final IOException ex) {
133                if (retry && connector == null) {
134                    connector = new Reconnector(this);
135                    connector.setDaemon(true);
136                    connector.setPriority(Thread.MIN_PRIORITY);
137                    connector.start();
138                }
139                final String msg = "Error writing to " + getName();
140                throw new AppenderLoggingException(msg, ex);
141            }
142        }
143    }
144
145    @Override
146    protected synchronized void close() {
147        super.close();
148        if (connector != null) {
149            connector.shutdown();
150            connector.interrupt();
151            connector = null;
152        }
153    }
154
155    public int getConnectTimeoutMillis() {
156        return connectTimeoutMillis;
157    }
158
159    /**
160     * Gets this TcpSocketManager's content format. Specified by:
161     * <ul>
162     * <li>Key: "protocol" Value: "tcp"</li>
163     * <li>Key: "direction" Value: "out"</li>
164     * </ul>
165     * 
166     * @return Map of content format keys supporting TcpSocketManager
167     */
168    @Override
169    public Map<String, String> getContentFormat() {
170        final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
171        result.put("protocol", "tcp");
172        result.put("direction", "out");
173        return result;
174    }
175
176    /**
177     * Handles reconnecting to a Thread.
178     */
179    private class Reconnector extends Thread {
180
181        private final CountDownLatch latch = new CountDownLatch(1);
182
183        private boolean shutdown = false;
184
185        private final Object owner;
186
187        public Reconnector(final OutputStreamManager owner) {
188            this.owner = owner;
189        }
190
191        public void latch()  {
192            try {
193                latch.await();
194            } catch (final InterruptedException ex) {
195                // Ignore the exception.
196            }
197        }
198
199        public void shutdown() {
200            shutdown = true;
201        }
202
203        @Override
204        public void run() {
205            while (!shutdown) {
206                try {
207                    sleep(reconnectionDelay);
208                    final Socket sock = createSocket(inetAddress, port);
209                    final OutputStream newOS = sock.getOutputStream();
210                    synchronized (owner) {
211                        try {
212                            getOutputStream().close();
213                        } catch (final IOException ioe) {
214                            // Ignore this.
215                        }
216
217                        setOutputStream(newOS);
218                        socket = sock;
219                        connector = null;
220                        shutdown = true;
221                    }
222                    LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
223                } catch (final InterruptedException ie) {
224                    LOGGER.debug("Reconnection interrupted.");
225                } catch (final ConnectException ex) {
226                    LOGGER.debug(host + ':' + port + " refused connection");
227                } catch (final IOException ioe) {
228                    LOGGER.debug("Unable to reconnect to " + host + ':' + port);
229                } finally {
230                    latch.countDown();
231                }
232            }
233        }
234    }
235
236    protected Socket createSocket(final InetAddress host, final int port) throws IOException {
237        return createSocket(host.getHostName(), port);
238    }
239
240    protected Socket createSocket(final String host, final int port) throws IOException {
241        final InetSocketAddress address = new InetSocketAddress(host, port);
242        final Socket newSocket = new Socket();
243        newSocket.connect(address, connectTimeoutMillis);
244        return newSocket;
245    }
246
247    /**
248     * Data for the factory.
249     */
250    private static class FactoryData {
251        private final String host;
252        private final int port;
253        private final int connectTimeoutMillis;
254        private final int delayMillis;
255        private final boolean immediateFail;
256        private final Layout<? extends Serializable> layout;
257
258        public FactoryData(final String host, final int port, int connectTimeoutMillis, final int delayMillis,
259                           final boolean immediateFail, final Layout<? extends Serializable> layout) {
260            this.host = host;
261            this.port = port;
262            this.connectTimeoutMillis = connectTimeoutMillis;
263            this.delayMillis = delayMillis;
264            this.immediateFail = immediateFail;
265            this.layout = layout;
266        }
267    }
268
269    /**
270     * Factory to create a TcpSocketManager.
271     */
272    protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
273        @Override
274        public TcpSocketManager createManager(final String name, final FactoryData data) {
275
276            InetAddress inetAddress;
277            OutputStream os;
278            try {
279                inetAddress = InetAddress.getByName(data.host);
280            } catch (final UnknownHostException ex) {
281                LOGGER.error("Could not find address of " + data.host, ex);
282                return null;
283            }
284            try {
285                final Socket socket = new Socket(data.host, data.port);
286                os = socket.getOutputStream();
287                return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
288                        data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout);
289            } catch (final IOException ex) {
290                LOGGER.error("TcpSocketManager (" + name + ") " + ex);
291                os = new ByteArrayOutputStream();
292            }
293            if (data.delayMillis == 0) {
294                return null;
295            }
296            return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
297                    data.delayMillis, data.immediateFail, data.layout);
298        }
299    }
300
301}