001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.net;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.io.Serializable;
022import java.net.ConnectException;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.Socket;
026import java.net.UnknownHostException;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.concurrent.CountDownLatch;
030
031import org.apache.logging.log4j.core.Layout;
032import org.apache.logging.log4j.core.appender.AppenderLoggingException;
033import org.apache.logging.log4j.core.appender.ManagerFactory;
034import org.apache.logging.log4j.core.appender.OutputStreamManager;
035import org.apache.logging.log4j.core.util.Closer;
036import org.apache.logging.log4j.core.util.Log4jThread;
037import org.apache.logging.log4j.core.util.NullOutputStream;
038import org.apache.logging.log4j.util.Strings;
039
040/**
041 * Manager of TCP Socket connections.
042 */
043public class TcpSocketManager extends AbstractSocketManager {
044    /**
045     * The default reconnection delay (30000 milliseconds or 30 seconds).
046     */
047    public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
048    /**
049     * The default port number of remote logging server (4560).
050     */
051    private static final int DEFAULT_PORT = 4560;
052
053    private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
054
055    private final int reconnectionDelayMillis;
056
057    private Reconnector reconnector;
058
059    private Socket socket;
060
061    private final SocketOptions socketOptions;
062
063    private final boolean retry;
064
065    private final boolean immediateFail;
066
067    private final int connectTimeoutMillis;
068
069    /**
070     * Constructs.
071     *
072     * @param name
073     *            The unique name of this connection.
074     * @param os
075     *            The OutputStream.
076     * @param socket
077     *            The Socket.
078     * @param inetAddress
079     *            The Internet address of the host.
080     * @param host
081     *            The name of the host.
082     * @param port
083     *            The port number on the host.
084     * @param connectTimeoutMillis
085     *            the connect timeout in milliseconds.
086     * @param reconnectionDelayMillis
087     *            Reconnection interval.
088     * @param immediateFail
089     *            True if the write should fail if no socket is immediately available.
090     * @param layout
091     *            The Layout.
092     * @param bufferSize
093     *            The buffer size.
094     * @deprecated Use
095     *             {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}.
096     */
097    @Deprecated
098    public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
099            final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
100            final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
101            final int bufferSize) {
102        this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail,
103                layout, bufferSize, null);
104    }
105
106    /**
107     * Constructs.
108     *
109     * @param name
110     *            The unique name of this connection.
111     * @param os
112     *            The OutputStream.
113     * @param socket
114     *            The Socket.
115     * @param inetAddress
116     *            The Internet address of the host.
117     * @param host
118     *            The name of the host.
119     * @param port
120     *            The port number on the host.
121     * @param connectTimeoutMillis
122     *            the connect timeout in milliseconds.
123     * @param reconnectionDelayMillis
124     *            Reconnection interval.
125     * @param immediateFail
126     *            True if the write should fail if no socket is immediately available.
127     * @param layout
128     *            The Layout.
129     * @param bufferSize
130     *            The buffer size.
131     */
132    public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
133            final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
134            final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
135            final int bufferSize, final SocketOptions socketOptions) {
136        super(name, os, inetAddress, host, port, layout, true, bufferSize);
137        this.connectTimeoutMillis = connectTimeoutMillis;
138        this.reconnectionDelayMillis = reconnectionDelayMillis;
139        this.socket = socket;
140        this.immediateFail = immediateFail;
141        this.retry = reconnectionDelayMillis > 0;
142        if (socket == null) {
143            this.reconnector = createReconnector();
144            this.reconnector.start();
145        }
146        this.socketOptions = socketOptions;
147    }
148
149    /**
150     * Obtains a TcpSocketManager.
151     *
152     * @param host
153     *            The host to connect to.
154     * @param port
155     *            The port on the host.
156     * @param connectTimeoutMillis
157     *            the connect timeout in milliseconds
158     * @param reconnectDelayMillis
159     *            The interval to pause between retries.
160     * @param bufferSize
161     *            The buffer size.
162     * @return A TcpSocketManager.
163     * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}.
164     */
165    @Deprecated
166    public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
167            final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
168            final int bufferSize) {
169        return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
170                bufferSize, null);
171    }
172
173    /**
174     * Obtains a TcpSocketManager.
175     *
176     * @param host
177     *            The host to connect to.
178     * @param port
179     *            The port on the host.
180     * @param connectTimeoutMillis
181     *            the connect timeout in milliseconds
182     * @param reconnectDelayMillis
183     *            The interval to pause between retries.
184     * @param bufferSize
185     *            The buffer size.
186     * @return A TcpSocketManager.
187     */
188    public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
189            int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
190            final int bufferSize, final SocketOptions socketOptions) {
191        if (Strings.isEmpty(host)) {
192            throw new IllegalArgumentException("A host name is required");
193        }
194        if (port <= 0) {
195            port = DEFAULT_PORT;
196        }
197        if (reconnectDelayMillis == 0) {
198            reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
199        }
200        return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
201                connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
202    }
203
204    @SuppressWarnings("sync-override") // synchronization on "this" is done within the method
205    @Override
206    protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
207        if (socket == null) {
208            if (reconnector != null && !immediateFail) {
209                reconnector.latch();
210            }
211            if (socket == null) {
212                throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
213            }
214        }
215        synchronized (this) {
216            try {
217                writeAndFlush(bytes, offset, length, immediateFlush);
218            } catch (final IOException causeEx) {
219                if (retry && reconnector == null) {
220                    final String config = inetAddress + ":" + port;
221                    reconnector = createReconnector();
222                    try {
223                        reconnector.reconnect();
224                    } catch (final IOException reconnEx) {
225                        LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
226                                config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
227                        reconnector.start();
228                        throw new AppenderLoggingException(
229                                String.format("Error sending to %s for %s", getName(), config), causeEx);
230                    }
231                    try {
232                        writeAndFlush(bytes, offset, length, immediateFlush);
233                    } catch (final IOException e) {
234                        throw new AppenderLoggingException(
235                                String.format("Error writing to %s after reestablishing connection for %s", getName(),
236                                        config),
237                                causeEx);
238                    }
239                }
240            }
241        }
242    }
243
244    private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
245            throws IOException {
246        @SuppressWarnings("resource") // outputStream is managed by this class
247        final OutputStream outputStream = getOutputStream();
248        outputStream.write(bytes, offset, length);
249        if (immediateFlush) {
250            outputStream.flush();
251        }
252    }
253
254    @Override
255    protected synchronized boolean closeOutputStream() {
256        final boolean closed = super.closeOutputStream();
257        if (reconnector != null) {
258            reconnector.shutdown();
259            reconnector.interrupt();
260            reconnector = null;
261        }
262        final Socket oldSocket = socket;
263        socket = null;
264        if (oldSocket != null) {
265            try {
266                oldSocket.close();
267            } catch (final IOException e) {
268                LOGGER.error("Could not close socket {}", socket);
269                return false;
270            }
271        }
272        return closed;
273    }
274
275    public int getConnectTimeoutMillis() {
276        return connectTimeoutMillis;
277    }
278
279    /**
280     * Gets this TcpSocketManager's content format. Specified by:
281     * <ul>
282     * <li>Key: "protocol" Value: "tcp"</li>
283     * <li>Key: "direction" Value: "out"</li>
284     * </ul>
285     *
286     * @return Map of content format keys supporting TcpSocketManager
287     */
288    @Override
289    public Map<String, String> getContentFormat() {
290        final Map<String, String> result = new HashMap<>(super.getContentFormat());
291        result.put("protocol", "tcp");
292        result.put("direction", "out");
293        return result;
294    }
295
296    /**
297     * Handles reconnecting to a Socket on a Thread.
298     */
299    private class Reconnector extends Log4jThread {
300
301        private final CountDownLatch latch = new CountDownLatch(1);
302
303        private boolean shutdown = false;
304
305        private final Object owner;
306
307        public Reconnector(final OutputStreamManager owner) {
308            super("TcpSocketManager-Reconnector");
309            this.owner = owner;
310        }
311
312        public void latch() {
313            try {
314                latch.await();
315            } catch (final InterruptedException ex) {
316                // Ignore the exception.
317            }
318        }
319
320        public void shutdown() {
321            shutdown = true;
322        }
323
324        @Override
325        public void run() {
326            while (!shutdown) {
327                try {
328                    sleep(reconnectionDelayMillis);
329                    reconnect();
330                } catch (final InterruptedException ie) {
331                    LOGGER.debug("Reconnection interrupted.");
332                } catch (final ConnectException ex) {
333                    LOGGER.debug("{}:{} refused connection", host, port);
334                } catch (final IOException ioe) {
335                    LOGGER.debug("Unable to reconnect to {}:{}", host, port);
336                } finally {
337                    latch.countDown();
338                }
339            }
340        }
341
342        void reconnect() throws IOException {
343            final Socket sock = createSocket(inetAddress.getHostName(), port);
344            @SuppressWarnings("resource") // newOS is managed by the enclosing Manager.
345            final OutputStream newOS = sock.getOutputStream();
346            synchronized (owner) {
347                Closer.closeSilently(getOutputStream());
348                setOutputStream(newOS);
349                socket = sock;
350                reconnector = null;
351                shutdown = true;
352            }
353            LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket);
354        }
355
356        @Override
357        public String toString() {
358            return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + ", owner=" + owner + "]";
359        }
360    }
361
362    private Reconnector createReconnector() {
363        final Reconnector recon = new Reconnector(this);
364        recon.setDaemon(true);
365        recon.setPriority(Thread.MIN_PRIORITY);
366        return recon;
367    }
368
369    protected Socket createSocket(final String host, final int port) throws IOException {
370        return createSocket(host, port, socketOptions, connectTimeoutMillis);
371    }
372
373    protected static Socket createSocket(final String host, final int port, final SocketOptions socketOptions,
374            final int connectTimeoutMillis) throws IOException {
375        LOGGER.debug("Creating socket {}:{}", host, port);
376        final Socket newSocket = new Socket();
377        if (socketOptions != null) {
378            // Not sure which options must be applied before or after the connect() call.
379            socketOptions.apply(newSocket);
380        }
381        newSocket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
382        if (socketOptions != null) {
383            // Not sure which options must be applied before or after the connect() call.
384            socketOptions.apply(newSocket);
385        }
386        return newSocket;
387    }
388
389    /**
390     * Data for the factory.
391     */
392    static class FactoryData {
393        protected final String host;
394        protected final int port;
395        protected final int connectTimeoutMillis;
396        protected final int reconnectDelayMillis;
397        protected final boolean immediateFail;
398        protected final Layout<? extends Serializable> layout;
399        protected final int bufferSize;
400        protected final SocketOptions socketOptions;
401
402        public FactoryData(final String host, final int port, final int connectTimeoutMillis,
403                final int reconnectDelayMillis, final boolean immediateFail,
404                final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
405            this.host = host;
406            this.port = port;
407            this.connectTimeoutMillis = connectTimeoutMillis;
408            this.reconnectDelayMillis = reconnectDelayMillis;
409            this.immediateFail = immediateFail;
410            this.layout = layout;
411            this.bufferSize = bufferSize;
412            this.socketOptions = socketOptions;
413        }
414
415        @Override
416        public String toString() {
417            return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
418                    + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
419                    + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
420        }
421    }
422
423    /**
424     * Factory to create a TcpSocketManager.
425     *
426     * @param <M>
427     *            The manager type.
428     * @param <T>
429     *            The factory data type.
430     */
431    protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
432            implements ManagerFactory<M, T> {
433
434        @SuppressWarnings("resource")
435        @Override
436        public M createManager(final String name, final T data) {
437            InetAddress inetAddress;
438            OutputStream os;
439            try {
440                inetAddress = InetAddress.getByName(data.host);
441            } catch (final UnknownHostException ex) {
442                LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
443                return null;
444            }
445            Socket socket = null;
446            try {
447                // LOG4J2-1042
448                socket = createSocket(data);
449                os = socket.getOutputStream();
450                return createManager(name, os, socket, inetAddress, data);
451            } catch (final IOException ex) {
452                LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
453                os = NullOutputStream.getInstance();
454            }
455            if (data.reconnectDelayMillis == 0) {
456                Closer.closeSilently(socket);
457                return null;
458            }
459            return createManager(name, os, null, inetAddress, data);
460        }
461
462        @SuppressWarnings("unchecked")
463        M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) {
464            return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
465                    data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
466                    data.bufferSize, data.socketOptions);
467        }
468
469        Socket createSocket(final T data) throws IOException {
470            return TcpSocketManager.createSocket(data.host, data.port, data.socketOptions, data.connectTimeoutMillis);
471        }
472
473    }
474
475    /**
476     * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
477     */
478    public SocketOptions getSocketOptions() {
479        return socketOptions;
480    }
481
482    /**
483     * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
484     */
485    public Socket getSocket() {
486        return socket;
487    }
488
489    public int getReconnectionDelayMillis() {
490        return reconnectionDelayMillis;
491    }
492
493    @Override
494    public String toString() {
495        return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
496                + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
497                + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
498                + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
499                + count + "]";
500    }
501
502}