001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.io.Serializable;
022import java.net.ConnectException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.Socket;
026import java.net.UnknownHostException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.concurrent.CountDownLatch;
032
033import org.apache.logging.log4j.core.Layout;
034import org.apache.logging.log4j.core.appender.AppenderLoggingException;
035import org.apache.logging.log4j.core.appender.ManagerFactory;
036import org.apache.logging.log4j.core.appender.OutputStreamManager;
037import org.apache.logging.log4j.core.util.Closer;
038import org.apache.logging.log4j.core.util.Log4jThread;
039import org.apache.logging.log4j.core.util.NullOutputStream;
040import org.apache.logging.log4j.util.Strings;
041
042/**
043 * Manager of TCP Socket connections.
044 */
045public class TcpSocketManager extends AbstractSocketManager {
046    /**
047     * The default reconnection delay (30000 milliseconds or 30 seconds).
048     */
049    public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
050    /**
051     * The default port number of remote logging server (4560).
052     */
053    private static final int DEFAULT_PORT = 4560;
054
055    private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
056
057    private final int reconnectionDelayMillis;
058
059    private Reconnector reconnector;
060
061    private Socket socket;
062
063    private final SocketOptions socketOptions;
064
065    private final boolean retry;
066
067    private final boolean immediateFail;
068
069    private final int connectTimeoutMillis;
070
071    /**
072     * Constructs.
073     *
074     * @param name
075     *            The unique name of this connection.
076     * @param os
077     *            The OutputStream.
078     * @param socket
079     *            The Socket.
080     * @param inetAddress
081     *            The Internet address of the host.
082     * @param host
083     *            The name of the host.
084     * @param port
085     *            The port number on the host.
086     * @param connectTimeoutMillis
087     *            the connect timeout in milliseconds.
088     * @param reconnectionDelayMillis
089     *            Reconnection interval.
090     * @param immediateFail
091     *            True if the write should fail if no socket is immediately available.
092     * @param layout
093     *            The Layout.
094     * @param bufferSize
095     *            The buffer size.
096     * @deprecated Use
097     *             {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}.
098     */
099    @Deprecated
100    public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
101            final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
102            final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
103            final int bufferSize) {
104        this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail,
105                layout, bufferSize, null);
106    }
107
108    /**
109     * Constructs.
110     *
111     * @param name
112     *            The unique name of this connection.
113     * @param os
114     *            The OutputStream.
115     * @param socket
116     *            The Socket.
117     * @param inetAddress
118     *            The Internet address of the host.
119     * @param host
120     *            The name of the host.
121     * @param port
122     *            The port number on the host.
123     * @param connectTimeoutMillis
124     *            the connect timeout in milliseconds.
125     * @param reconnectionDelayMillis
126     *            Reconnection interval.
127     * @param immediateFail
128     *            True if the write should fail if no socket is immediately available.
129     * @param layout
130     *            The Layout.
131     * @param bufferSize
132     *            The buffer size.
133     */
134    public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
135            final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
136            final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
137            final int bufferSize, final SocketOptions socketOptions) {
138        super(name, os, inetAddress, host, port, layout, true, bufferSize);
139        this.connectTimeoutMillis = connectTimeoutMillis;
140        this.reconnectionDelayMillis = reconnectionDelayMillis;
141        this.socket = socket;
142        this.immediateFail = immediateFail;
143        this.retry = reconnectionDelayMillis > 0;
144        if (socket == null) {
145            this.reconnector = createReconnector();
146            this.reconnector.start();
147        }
148        this.socketOptions = socketOptions;
149    }
150
151    /**
152     * Obtains a TcpSocketManager.
153     *
154     * @param host
155     *            The host to connect to.
156     * @param port
157     *            The port on the host.
158     * @param connectTimeoutMillis
159     *            the connect timeout in milliseconds
160     * @param reconnectDelayMillis
161     *            The interval to pause between retries.
162     * @param bufferSize
163     *            The buffer size.
164     * @return A TcpSocketManager.
165     * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}.
166     */
167    @Deprecated
168    public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
169            final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
170            final int bufferSize) {
171        return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
172                bufferSize, null);
173    }
174
175    /**
176     * Obtains a TcpSocketManager.
177     *
178     * @param host
179     *            The host to connect to.
180     * @param port
181     *            The port on the host.
182     * @param connectTimeoutMillis
183     *            the connect timeout in milliseconds
184     * @param reconnectDelayMillis
185     *            The interval to pause between retries.
186     * @param bufferSize
187     *            The buffer size.
188     * @return A TcpSocketManager.
189     */
190    public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
191            int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
192            final int bufferSize, final SocketOptions socketOptions) {
193        if (Strings.isEmpty(host)) {
194            throw new IllegalArgumentException("A host name is required");
195        }
196        if (port <= 0) {
197            port = DEFAULT_PORT;
198        }
199        if (reconnectDelayMillis == 0) {
200            reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
201        }
202        return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
203                connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
204    }
205
206    @SuppressWarnings("sync-override") // synchronization on "this" is done within the method
207    @Override
208    protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
209        if (socket == null) {
210            if (reconnector != null && !immediateFail) {
211                reconnector.latch();
212            }
213            if (socket == null) {
214                throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
215            }
216        }
217        synchronized (this) {
218            try {
219                writeAndFlush(bytes, offset, length, immediateFlush);
220            } catch (final IOException causeEx) {
221                final String config = inetAddress + ":" + port;
222                if (retry && reconnector == null) {
223                    reconnector = createReconnector();
224                    try {
225                        reconnector.reconnect();
226                    } catch (final IOException reconnEx) {
227                        LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
228                                config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
229                        reconnector.start();
230                        throw new AppenderLoggingException(
231                                String.format("Error sending to %s for %s", getName(), config), causeEx);
232                    }
233                    try {
234                        writeAndFlush(bytes, offset, length, immediateFlush);
235                    } catch (final IOException e) {
236                        throw new AppenderLoggingException(
237                                String.format("Error writing to %s after reestablishing connection for %s", getName(),
238                                        config),
239                                causeEx);
240                    }
241                    return;
242                }
243                final String message = String.format("Error writing to %s for connection %s", getName(), config);
244                throw new AppenderLoggingException(message, causeEx);
245            }
246        }
247    }
248
249    private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
250            throws IOException {
251        @SuppressWarnings("resource") // outputStream is managed by this class
252        final OutputStream outputStream = getOutputStream();
253        outputStream.write(bytes, offset, length);
254        if (immediateFlush) {
255            outputStream.flush();
256        }
257    }
258
259    @Override
260    protected synchronized boolean closeOutputStream() {
261        final boolean closed = super.closeOutputStream();
262        if (reconnector != null) {
263            reconnector.shutdown();
264            reconnector.interrupt();
265            reconnector = null;
266        }
267        final Socket oldSocket = socket;
268        socket = null;
269        if (oldSocket != null) {
270            try {
271                oldSocket.close();
272            } catch (final IOException e) {
273                LOGGER.error("Could not close socket {}", socket);
274                return false;
275            }
276        }
277        return closed;
278    }
279
280    public int getConnectTimeoutMillis() {
281        return connectTimeoutMillis;
282    }
283
284    /**
285     * Gets this TcpSocketManager's content format. Specified by:
286     * <ul>
287     * <li>Key: "protocol" Value: "tcp"</li>
288     * <li>Key: "direction" Value: "out"</li>
289     * </ul>
290     *
291     * @return Map of content format keys supporting TcpSocketManager
292     */
293    @Override
294    public Map<String, String> getContentFormat() {
295        final Map<String, String> result = new HashMap<>(super.getContentFormat());
296        result.put("protocol", "tcp");
297        result.put("direction", "out");
298        return result;
299    }
300
301    /**
302     * Handles reconnecting to a Socket on a Thread.
303     */
304    private class Reconnector extends Log4jThread {
305
306        private final CountDownLatch latch = new CountDownLatch(1);
307
308        private boolean shutdown = false;
309
310        private final Object owner;
311
312        public Reconnector(final OutputStreamManager owner) {
313            super("TcpSocketManager-Reconnector");
314            this.owner = owner;
315        }
316
317        public void latch() {
318            try {
319                latch.await();
320            } catch (final InterruptedException ex) {
321                // Ignore the exception.
322            }
323        }
324
325        public void shutdown() {
326            shutdown = true;
327        }
328
329        @Override
330        public void run() {
331            while (!shutdown) {
332                try {
333                    sleep(reconnectionDelayMillis);
334                    reconnect();
335                } catch (final InterruptedException ie) {
336                    LOGGER.debug("Reconnection interrupted.");
337                } catch (final ConnectException ex) {
338                    LOGGER.debug("{}:{} refused connection", host, port);
339                } catch (final IOException ioe) {
340                    LOGGER.debug("Unable to reconnect to {}:{}", host, port);
341                } finally {
342                    latch.countDown();
343                }
344            }
345        }
346
347        void reconnect() throws IOException {
348            List<InetSocketAddress> socketAddresses = FACTORY.resolver.resolveHost(host, port);
349            if (socketAddresses.size() == 1) {
350                LOGGER.debug("Reconnecting " + socketAddresses.get(0));
351                connect(socketAddresses.get(0));
352            } else {
353                IOException ioe = null;
354                for (InetSocketAddress socketAddress : socketAddresses) {
355                    try {
356                        LOGGER.debug("Reconnecting " + socketAddress);
357                        connect(socketAddress);
358                        return;
359                    } catch (IOException ex) {
360                        ioe = ex;
361                    }
362                }
363                throw ioe;
364            }
365        }
366
367        private void connect(InetSocketAddress socketAddress) throws IOException {
368            final Socket sock = createSocket(socketAddress);
369            @SuppressWarnings("resource") // newOS is managed by the enclosing Manager.
370            final OutputStream newOS = sock.getOutputStream();
371            InetAddress prev = socket != null ? socket.getInetAddress() : null;
372            synchronized (owner) {
373                Closer.closeSilently(getOutputStream());
374                setOutputStream(newOS);
375                socket = sock;
376                reconnector = null;
377                shutdown = true;
378            }
379            String type = prev != null && prev.getHostAddress().equals(socketAddress.getAddress().getHostAddress()) ?
380                    "reestablished" : "established";
381            LOGGER.debug("Connection to {}:{} {}: {}", host, port, type, socket);
382        }
383
384        @Override
385        public String toString() {
386            return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + "]";
387        }
388    }
389
390    private Reconnector createReconnector() {
391        final Reconnector recon = new Reconnector(this);
392        recon.setDaemon(true);
393        recon.setPriority(Thread.MIN_PRIORITY);
394        return recon;
395    }
396
397    protected Socket createSocket(final InetSocketAddress socketAddress) throws IOException {
398        return createSocket(socketAddress, socketOptions, connectTimeoutMillis);
399    }
400
401    protected static Socket createSocket(final InetSocketAddress socketAddress, final SocketOptions socketOptions,
402            final int connectTimeoutMillis) throws IOException {
403        LOGGER.debug("Creating socket {}", socketAddress.toString());
404        final Socket newSocket = new Socket();
405        if (socketOptions != null) {
406            // Not sure which options must be applied before or after the connect() call.
407            socketOptions.apply(newSocket);
408        }
409        newSocket.connect(socketAddress, connectTimeoutMillis);
410        if (socketOptions != null) {
411            // Not sure which options must be applied before or after the connect() call.
412            socketOptions.apply(newSocket);
413        }
414        return newSocket;
415    }
416
417    /**
418     * Data for the factory.
419     */
420    static class FactoryData {
421        protected final String host;
422        protected final int port;
423        protected final int connectTimeoutMillis;
424        protected final int reconnectDelayMillis;
425        protected final boolean immediateFail;
426        protected final Layout<? extends Serializable> layout;
427        protected final int bufferSize;
428        protected final SocketOptions socketOptions;
429
430        public FactoryData(final String host, final int port, final int connectTimeoutMillis,
431                final int reconnectDelayMillis, final boolean immediateFail,
432                final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
433            this.host = host;
434            this.port = port;
435            this.connectTimeoutMillis = connectTimeoutMillis;
436            this.reconnectDelayMillis = reconnectDelayMillis;
437            this.immediateFail = immediateFail;
438            this.layout = layout;
439            this.bufferSize = bufferSize;
440            this.socketOptions = socketOptions;
441        }
442
443        @Override
444        public String toString() {
445            return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
446                    + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
447                    + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
448        }
449    }
450
451    /**
452     * Factory to create a TcpSocketManager.
453     *
454     * @param <M>
455     *            The manager type.
456     * @param <T>
457     *            The factory data type.
458     */
459    protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
460            implements ManagerFactory<M, T> {
461
462        static HostResolver resolver = new HostResolver();
463
464        @SuppressWarnings("resource")
465        @Override
466        public M createManager(final String name, final T data) {
467            InetAddress inetAddress;
468            OutputStream os;
469            try {
470                inetAddress = InetAddress.getByName(data.host);
471            } catch (final UnknownHostException ex) {
472                LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
473                return null;
474            }
475            Socket socket = null;
476            try {
477                // LOG4J2-1042
478                socket = createSocket(data);
479                os = socket.getOutputStream();
480                return createManager(name, os, socket, inetAddress, data);
481            } catch (final IOException ex) {
482                LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex);
483                os = NullOutputStream.getInstance();
484            }
485            if (data.reconnectDelayMillis == 0) {
486                Closer.closeSilently(socket);
487                return null;
488            }
489            return createManager(name, os, null, inetAddress, data);
490        }
491
492        @SuppressWarnings("unchecked")
493        M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) {
494            return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
495                    data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
496                    data.bufferSize, data.socketOptions);
497        }
498
499        Socket createSocket(final T data) throws IOException {
500            List<InetSocketAddress> socketAddresses = resolver.resolveHost(data.host, data.port);
501            IOException ioe = null;
502            for (InetSocketAddress socketAddress : socketAddresses) {
503                try {
504                    return TcpSocketManager.createSocket(socketAddress, data.socketOptions, data.connectTimeoutMillis);
505                } catch (IOException ex) {
506                    ioe = ex;
507                }
508            }
509            throw new IOException(errorMessage(data, socketAddresses) , ioe);
510        }
511
512        protected String errorMessage(final T data, List<InetSocketAddress> socketAddresses) {
513            StringBuilder sb = new StringBuilder("Unable to create socket for ");
514            sb.append(data.host).append(" at port ").append(data.port);
515            if (socketAddresses.size() == 1) {
516                if (!socketAddresses.get(0).getAddress().getHostAddress().equals(data.host)) {
517                    sb.append(" using ip address ").append(socketAddresses.get(0).getAddress().getHostAddress());
518                    sb.append(" and port ").append(socketAddresses.get(0).getPort());
519                }
520            } else {
521                sb.append(" using ip addresses and ports ");
522                for (int i = 0; i < socketAddresses.size(); ++i) {
523                    if (i > 0) {
524                        sb.append(", ");
525                        sb.append(socketAddresses.get(i).getAddress().getHostAddress());
526                        sb.append(":").append(socketAddresses.get(i).getPort());
527                    }
528                }
529            }
530            return sb.toString();
531        }
532
533    }
534
535    /**
536     * This method is only for unit testing. It is not Thread-safe.
537     * @param resolver the HostResolver.
538     */
539    public static void setHostResolver(HostResolver resolver) {
540        TcpSocketManagerFactory.resolver = resolver;
541    }
542
543    public static class HostResolver {
544
545        public List<InetSocketAddress> resolveHost(String host, int port) throws UnknownHostException {
546            InetAddress[] addresses = InetAddress.getAllByName(host);
547            List<InetSocketAddress> socketAddresses = new ArrayList<>(addresses.length);
548            for (InetAddress address: addresses) {
549                socketAddresses.add(new InetSocketAddress(address, port));
550            }
551            return socketAddresses;
552        }
553    }
554
555    /**
556     * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
557     */
558    public SocketOptions getSocketOptions() {
559        return socketOptions;
560    }
561
562    /**
563     * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
564     */
565    public Socket getSocket() {
566        return socket;
567    }
568
569    public int getReconnectionDelayMillis() {
570        return reconnectionDelayMillis;
571    }
572
573    @Override
574    public String toString() {
575        return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
576                + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
577                + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
578                + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
579                + count + "]";
580    }
581
582}