001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.io.Serializable;
022import java.net.ConnectException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.Socket;
026import java.net.UnknownHostException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.concurrent.CountDownLatch;
032
033import org.apache.logging.log4j.core.Layout;
034import org.apache.logging.log4j.core.appender.AppenderLoggingException;
035import org.apache.logging.log4j.core.appender.ManagerFactory;
036import org.apache.logging.log4j.core.appender.OutputStreamManager;
037import org.apache.logging.log4j.core.util.Closer;
038import org.apache.logging.log4j.core.util.Log4jThread;
039import org.apache.logging.log4j.core.util.NullOutputStream;
040import org.apache.logging.log4j.util.Strings;
041
042/**
043 * Manager of TCP Socket connections.
044 */
045public class TcpSocketManager extends AbstractSocketManager {
046    /**
047     * The default reconnection delay (30000 milliseconds or 30 seconds).
048     */
049    public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
050    /**
051     * The default port number of remote logging server (4560).
052     */
053    private static final int DEFAULT_PORT = 4560;
054
055    private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
056
057    private final int reconnectionDelayMillis;
058
059    private Reconnector reconnector;
060
061    private Socket socket;
062
063    private final SocketOptions socketOptions;
064
065    private final boolean retry;
066
067    private final boolean immediateFail;
068
069    private final int connectTimeoutMillis;
070
071    /**
072     * Constructs.
073     *
074     * @param name
075     *            The unique name of this connection.
076     * @param os
077     *            The OutputStream.
078     * @param socket
079     *            The Socket.
080     * @param inetAddress
081     *            The Internet address of the host.
082     * @param host
083     *            The name of the host.
084     * @param port
085     *            The port number on the host.
086     * @param connectTimeoutMillis
087     *            the connect timeout in milliseconds.
088     * @param reconnectionDelayMillis
089     *            Reconnection interval.
090     * @param immediateFail
091     *            True if the write should fail if no socket is immediately available.
092     * @param layout
093     *            The Layout.
094     * @param bufferSize
095     *            The buffer size.
096     * @deprecated Use
097     *             {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}.
098     */
099    @Deprecated
100    public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
101            final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
102            final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
103            final int bufferSize) {
104        this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail,
105                layout, bufferSize, null);
106    }
107
108    /**
109     * Constructs.
110     *
111     * @param name
112     *            The unique name of this connection.
113     * @param os
114     *            The OutputStream.
115     * @param socket
116     *            The Socket.
117     * @param inetAddress
118     *            The Internet address of the host.
119     * @param host
120     *            The name of the host.
121     * @param port
122     *            The port number on the host.
123     * @param connectTimeoutMillis
124     *            the connect timeout in milliseconds.
125     * @param reconnectionDelayMillis
126     *            Reconnection interval.
127     * @param immediateFail
128     *            True if the write should fail if no socket is immediately available.
129     * @param layout
130     *            The Layout.
131     * @param bufferSize
132     *            The buffer size.
133     */
134    public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
135            final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
136            final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
137            final int bufferSize, final SocketOptions socketOptions) {
138        super(name, os, inetAddress, host, port, layout, true, bufferSize);
139        this.connectTimeoutMillis = connectTimeoutMillis;
140        this.reconnectionDelayMillis = reconnectionDelayMillis;
141        this.socket = socket;
142        this.immediateFail = immediateFail;
143        this.retry = reconnectionDelayMillis > 0;
144        if (socket == null) {
145            this.reconnector = createReconnector();
146            this.reconnector.start();
147        }
148        this.socketOptions = socketOptions;
149    }
150
151    /**
152     * Obtains a TcpSocketManager.
153     *
154     * @param host
155     *            The host to connect to.
156     * @param port
157     *            The port on the host.
158     * @param connectTimeoutMillis
159     *            the connect timeout in milliseconds
160     * @param reconnectDelayMillis
161     *            The interval to pause between retries.
162     * @param bufferSize
163     *            The buffer size.
164     * @return A TcpSocketManager.
165     * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}.
166     */
167    @Deprecated
168    public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
169            final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
170            final int bufferSize) {
171        return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
172                bufferSize, null);
173    }
174
175    /**
176     * Obtains a TcpSocketManager.
177     *
178     * @param host
179     *            The host to connect to.
180     * @param port
181     *            The port on the host.
182     * @param connectTimeoutMillis
183     *            the connect timeout in milliseconds
184     * @param reconnectDelayMillis
185     *            The interval to pause between retries.
186     * @param bufferSize
187     *            The buffer size.
188     * @return A TcpSocketManager.
189     */
190    public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
191            int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
192            final int bufferSize, final SocketOptions socketOptions) {
193        if (Strings.isEmpty(host)) {
194            throw new IllegalArgumentException("A host name is required");
195        }
196        if (port <= 0) {
197            port = DEFAULT_PORT;
198        }
199        if (reconnectDelayMillis == 0) {
200            reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
201        }
202        return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
203                connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
204    }
205
206    @SuppressWarnings("sync-override") // synchronization on "this" is done within the method
207    @Override
208    protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
209        if (socket == null) {
210            if (reconnector != null && !immediateFail) {
211                reconnector.latch();
212            }
213            if (socket == null) {
214                throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
215            }
216        }
217        synchronized (this) {
218            try {
219                writeAndFlush(bytes, offset, length, immediateFlush);
220            } catch (final IOException causeEx) {
221                if (retry && reconnector == null) {
222                    final String config = inetAddress + ":" + port;
223                    reconnector = createReconnector();
224                    try {
225                        reconnector.reconnect();
226                    } catch (final IOException reconnEx) {
227                        LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
228                                config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
229                        reconnector.start();
230                        throw new AppenderLoggingException(
231                                String.format("Error sending to %s for %s", getName(), config), causeEx);
232                    }
233                    try {
234                        writeAndFlush(bytes, offset, length, immediateFlush);
235                    } catch (final IOException e) {
236                        throw new AppenderLoggingException(
237                                String.format("Error writing to %s after reestablishing connection for %s", getName(),
238                                        config),
239                                causeEx);
240                    }
241                }
242            }
243        }
244    }
245
246    private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
247            throws IOException {
248        @SuppressWarnings("resource") // outputStream is managed by this class
249        final OutputStream outputStream = getOutputStream();
250        outputStream.write(bytes, offset, length);
251        if (immediateFlush) {
252            outputStream.flush();
253        }
254    }
255
256    @Override
257    protected synchronized boolean closeOutputStream() {
258        final boolean closed = super.closeOutputStream();
259        if (reconnector != null) {
260            reconnector.shutdown();
261            reconnector.interrupt();
262            reconnector = null;
263        }
264        final Socket oldSocket = socket;
265        socket = null;
266        if (oldSocket != null) {
267            try {
268                oldSocket.close();
269            } catch (final IOException e) {
270                LOGGER.error("Could not close socket {}", socket);
271                return false;
272            }
273        }
274        return closed;
275    }
276
277    public int getConnectTimeoutMillis() {
278        return connectTimeoutMillis;
279    }
280
281    /**
282     * Gets this TcpSocketManager's content format. Specified by:
283     * <ul>
284     * <li>Key: "protocol" Value: "tcp"</li>
285     * <li>Key: "direction" Value: "out"</li>
286     * </ul>
287     *
288     * @return Map of content format keys supporting TcpSocketManager
289     */
290    @Override
291    public Map<String, String> getContentFormat() {
292        final Map<String, String> result = new HashMap<>(super.getContentFormat());
293        result.put("protocol", "tcp");
294        result.put("direction", "out");
295        return result;
296    }
297
298    /**
299     * Handles reconnecting to a Socket on a Thread.
300     */
301    private class Reconnector extends Log4jThread {
302
303        private final CountDownLatch latch = new CountDownLatch(1);
304
305        private boolean shutdown = false;
306
307        private final Object owner;
308
309        public Reconnector(final OutputStreamManager owner) {
310            super("TcpSocketManager-Reconnector");
311            this.owner = owner;
312        }
313
314        public void latch() {
315            try {
316                latch.await();
317            } catch (final InterruptedException ex) {
318                // Ignore the exception.
319            }
320        }
321
322        public void shutdown() {
323            shutdown = true;
324        }
325
326        @Override
327        public void run() {
328            while (!shutdown) {
329                try {
330                    sleep(reconnectionDelayMillis);
331                    reconnect();
332                } catch (final InterruptedException ie) {
333                    LOGGER.debug("Reconnection interrupted.");
334                } catch (final ConnectException ex) {
335                    LOGGER.debug("{}:{} refused connection", host, port);
336                } catch (final IOException ioe) {
337                    LOGGER.debug("Unable to reconnect to {}:{}", host, port);
338                } finally {
339                    latch.countDown();
340                }
341            }
342        }
343
344        void reconnect() throws IOException {
345            List<InetSocketAddress> socketAddresses = FACTORY.resolver.resolveHost(host, port);
346            if (socketAddresses.size() == 1) {
347                LOGGER.debug("Reconnecting " + socketAddresses.get(0));
348                connect(socketAddresses.get(0));
349            } else {
350                IOException ioe = null;
351                for (InetSocketAddress socketAddress : socketAddresses) {
352                    try {
353                        LOGGER.debug("Reconnecting " + socketAddress);
354                        connect(socketAddress);
355                        return;
356                    } catch (IOException ex) {
357                        ioe = ex;
358                    }
359                }
360                throw ioe;
361            }
362        }
363
364        private void connect(InetSocketAddress socketAddress) throws IOException {
365            final Socket sock = createSocket(socketAddress);
366            @SuppressWarnings("resource") // newOS is managed by the enclosing Manager.
367            final OutputStream newOS = sock.getOutputStream();
368            InetAddress prev = socket != null ? socket.getInetAddress() : null;
369            synchronized (owner) {
370                Closer.closeSilently(getOutputStream());
371                setOutputStream(newOS);
372                socket = sock;
373                reconnector = null;
374                shutdown = true;
375            }
376            String type = prev != null && prev.getHostAddress().equals(socketAddress.getAddress().getHostAddress()) ?
377                    "reestablished" : "established";
378            LOGGER.debug("Connection to {}:{} {}: {}", host, port, type, socket);
379        }
380
381        @Override
382        public String toString() {
383            return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + "]";
384        }
385    }
386
387    private Reconnector createReconnector() {
388        final Reconnector recon = new Reconnector(this);
389        recon.setDaemon(true);
390        recon.setPriority(Thread.MIN_PRIORITY);
391        return recon;
392    }
393
394    protected Socket createSocket(final InetSocketAddress socketAddress) throws IOException {
395        return createSocket(socketAddress, socketOptions, connectTimeoutMillis);
396    }
397
398    protected static Socket createSocket(final InetSocketAddress socketAddress, final SocketOptions socketOptions,
399            final int connectTimeoutMillis) throws IOException {
400        LOGGER.debug("Creating socket {}", socketAddress.toString());
401        final Socket newSocket = new Socket();
402        if (socketOptions != null) {
403            // Not sure which options must be applied before or after the connect() call.
404            socketOptions.apply(newSocket);
405        }
406        newSocket.connect(socketAddress, connectTimeoutMillis);
407        if (socketOptions != null) {
408            // Not sure which options must be applied before or after the connect() call.
409            socketOptions.apply(newSocket);
410        }
411        return newSocket;
412    }
413
414    /**
415     * Data for the factory.
416     */
417    static class FactoryData {
418        protected final String host;
419        protected final int port;
420        protected final int connectTimeoutMillis;
421        protected final int reconnectDelayMillis;
422        protected final boolean immediateFail;
423        protected final Layout<? extends Serializable> layout;
424        protected final int bufferSize;
425        protected final SocketOptions socketOptions;
426
427        public FactoryData(final String host, final int port, final int connectTimeoutMillis,
428                final int reconnectDelayMillis, final boolean immediateFail,
429                final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
430            this.host = host;
431            this.port = port;
432            this.connectTimeoutMillis = connectTimeoutMillis;
433            this.reconnectDelayMillis = reconnectDelayMillis;
434            this.immediateFail = immediateFail;
435            this.layout = layout;
436            this.bufferSize = bufferSize;
437            this.socketOptions = socketOptions;
438        }
439
440        @Override
441        public String toString() {
442            return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
443                    + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
444                    + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
445        }
446    }
447
448    /**
449     * Factory to create a TcpSocketManager.
450     *
451     * @param <M>
452     *            The manager type.
453     * @param <T>
454     *            The factory data type.
455     */
456    protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
457            implements ManagerFactory<M, T> {
458
459        static HostResolver resolver = new HostResolver();
460
461        @SuppressWarnings("resource")
462        @Override
463        public M createManager(final String name, final T data) {
464            InetAddress inetAddress;
465            OutputStream os;
466            try {
467                inetAddress = InetAddress.getByName(data.host);
468            } catch (final UnknownHostException ex) {
469                LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
470                return null;
471            }
472            Socket socket = null;
473            try {
474                // LOG4J2-1042
475                socket = createSocket(data);
476                os = socket.getOutputStream();
477                return createManager(name, os, socket, inetAddress, data);
478            } catch (final IOException ex) {
479                LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
480                os = NullOutputStream.getInstance();
481            }
482            if (data.reconnectDelayMillis == 0) {
483                Closer.closeSilently(socket);
484                return null;
485            }
486            return createManager(name, os, null, inetAddress, data);
487        }
488
489        @SuppressWarnings("unchecked")
490        M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) {
491            return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
492                    data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
493                    data.bufferSize, data.socketOptions);
494        }
495
496        Socket createSocket(final T data) throws IOException {
497            List<InetSocketAddress> socketAddresses = resolver.resolveHost(data.host, data.port);
498            IOException ioe = null;
499            for (InetSocketAddress socketAddress : socketAddresses) {
500                try {
501                    return TcpSocketManager.createSocket(socketAddress, data.socketOptions, data.connectTimeoutMillis);
502                } catch (IOException ex) {
503                    ioe = ex;
504                }
505            }
506            throw new IOException(errorMessage(data, socketAddresses) , ioe);
507        }
508
509        protected String errorMessage(final T data, List<InetSocketAddress> socketAddresses) {
510            StringBuilder sb = new StringBuilder("Unable to create socket for ");
511            sb.append(data.host).append(" at port ").append(data.port);
512            if (socketAddresses.size() == 1) {
513                if (!socketAddresses.get(0).getAddress().getHostAddress().equals(data.host)) {
514                    sb.append(" using ip address ").append(socketAddresses.get(0).getAddress().getHostAddress());
515                    sb.append(" and port ").append(socketAddresses.get(0).getPort());
516                }
517            } else {
518                sb.append(" using ip addresses and ports ");
519                for (int i = 0; i < socketAddresses.size(); ++i) {
520                    if (i > 0) {
521                        sb.append(", ");
522                        sb.append(socketAddresses.get(i).getAddress().getHostAddress());
523                        sb.append(":").append(socketAddresses.get(i).getPort());
524                    }
525                }
526            }
527            return sb.toString();
528        }
529
530    }
531
532    /**
533     * This method is only for unit testing. It is not Thread-safe.
534     * @param resolver the HostResolver.
535     */
536    public static void setHostResolver(HostResolver resolver) {
537        TcpSocketManagerFactory.resolver = resolver;
538    }
539
540    public static class HostResolver {
541
542        public List<InetSocketAddress> resolveHost(String host, int port) throws UnknownHostException {
543            InetAddress[] addresses = InetAddress.getAllByName(host);
544            List<InetSocketAddress> socketAddresses = new ArrayList<>(addresses.length);
545            for (InetAddress address: addresses) {
546                socketAddresses.add(new InetSocketAddress(address, port));
547            }
548            return socketAddresses;
549        }
550    }
551
552    /**
553     * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
554     */
555    public SocketOptions getSocketOptions() {
556        return socketOptions;
557    }
558
559    /**
560     * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
561     */
562    public Socket getSocket() {
563        return socket;
564    }
565
566    public int getReconnectionDelayMillis() {
567        return reconnectionDelayMillis;
568    }
569
570    @Override
571    public String toString() {
572        return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
573                + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
574                + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
575                + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
576                + count + "]";
577    }
578
579}