1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.io.OutputStream;
22 import java.io.Serializable;
23 import java.net.ConnectException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.Socket;
27 import java.net.UnknownHostException;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.concurrent.CountDownLatch;
31
32 import org.apache.logging.log4j.core.Layout;
33 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
34 import org.apache.logging.log4j.core.appender.ManagerFactory;
35 import org.apache.logging.log4j.core.appender.OutputStreamManager;
36 import org.apache.logging.log4j.util.Strings;
37
38
39
40
41 public class TcpSocketManager extends AbstractSocketManager {
42
43
44
45 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
46
47
48
49 private static final int DEFAULT_PORT = 4560;
50
51 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
52
53 private final int reconnectionDelay;
54
55 private Reconnector connector;
56
57 private Socket socket;
58
59 private final boolean retry;
60
61 private final boolean immediateFail;
62
63 private final int connectTimeoutMillis;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
79 final String host, final int port, int connectTimeoutMillis, final int delay,
80 final boolean immediateFail, final Layout<? extends Serializable> layout) {
81 super(name, os, inetAddress, host, port, layout);
82 this.connectTimeoutMillis = connectTimeoutMillis;
83 this.reconnectionDelay = delay;
84 this.socket = sock;
85 this.immediateFail = immediateFail;
86 retry = delay > 0;
87 if (sock == null) {
88 connector = new Reconnector(this);
89 connector.setDaemon(true);
90 connector.setPriority(Thread.MIN_PRIORITY);
91 connector.start();
92 }
93 }
94
95
96
97
98
99
100
101
102
103 public static TcpSocketManager getSocketManager(final String host, int port, int connectTimeoutMillis,
104 int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) {
105 if (Strings.isEmpty(host)) {
106 throw new IllegalArgumentException("A host name is required");
107 }
108 if (port <= 0) {
109 port = DEFAULT_PORT;
110 }
111 if (delayMillis == 0) {
112 delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
113 }
114 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
115 connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY);
116 }
117
118 @Override
119 protected void write(final byte[] bytes, final int offset, final int length) {
120 if (socket == null) {
121 if (connector != null && !immediateFail) {
122 connector.latch();
123 }
124 if (socket == null) {
125 final String msg = "Error writing to " + getName() + " socket not available";
126 throw new AppenderLoggingException(msg);
127 }
128 }
129 synchronized (this) {
130 try {
131 getOutputStream().write(bytes, offset, length);
132 } catch (final IOException ex) {
133 if (retry && connector == null) {
134 connector = new Reconnector(this);
135 connector.setDaemon(true);
136 connector.setPriority(Thread.MIN_PRIORITY);
137 connector.start();
138 }
139 final String msg = "Error writing to " + getName();
140 throw new AppenderLoggingException(msg, ex);
141 }
142 }
143 }
144
145 @Override
146 protected synchronized void close() {
147 super.close();
148 if (connector != null) {
149 connector.shutdown();
150 connector.interrupt();
151 connector = null;
152 }
153 }
154
155 public int getConnectTimeoutMillis() {
156 return connectTimeoutMillis;
157 }
158
159
160
161
162
163
164
165
166
167
168 @Override
169 public Map<String, String> getContentFormat() {
170 final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
171 result.put("protocol", "tcp");
172 result.put("direction", "out");
173 return result;
174 }
175
176
177
178
179 private class Reconnector extends Thread {
180
181 private final CountDownLatch latch = new CountDownLatch(1);
182
183 private boolean shutdown = false;
184
185 private final Object owner;
186
187 public Reconnector(final OutputStreamManager owner) {
188 this.owner = owner;
189 }
190
191 public void latch() {
192 try {
193 latch.await();
194 } catch (final InterruptedException ex) {
195
196 }
197 }
198
199 public void shutdown() {
200 shutdown = true;
201 }
202
203 @Override
204 public void run() {
205 while (!shutdown) {
206 try {
207 sleep(reconnectionDelay);
208 final Socket sock = createSocket(inetAddress, port);
209 final OutputStream newOS = sock.getOutputStream();
210 synchronized (owner) {
211 try {
212 getOutputStream().close();
213 } catch (final IOException ioe) {
214
215 }
216
217 setOutputStream(newOS);
218 socket = sock;
219 connector = null;
220 shutdown = true;
221 }
222 LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
223 } catch (final InterruptedException ie) {
224 LOGGER.debug("Reconnection interrupted.");
225 } catch (final ConnectException ex) {
226 LOGGER.debug(host + ':' + port + " refused connection");
227 } catch (final IOException ioe) {
228 LOGGER.debug("Unable to reconnect to " + host + ':' + port);
229 } finally {
230 latch.countDown();
231 }
232 }
233 }
234 }
235
236 protected Socket createSocket(final InetAddress host, final int port) throws IOException {
237 return createSocket(host.getHostName(), port);
238 }
239
240 protected Socket createSocket(final String host, final int port) throws IOException {
241 final InetSocketAddress address = new InetSocketAddress(host, port);
242 final Socket newSocket = new Socket();
243 newSocket.connect(address, connectTimeoutMillis);
244 return newSocket;
245 }
246
247
248
249
250 private static class FactoryData {
251 private final String host;
252 private final int port;
253 private final int connectTimeoutMillis;
254 private final int delayMillis;
255 private final boolean immediateFail;
256 private final Layout<? extends Serializable> layout;
257
258 public FactoryData(final String host, final int port, int connectTimeoutMillis, final int delayMillis,
259 final boolean immediateFail, final Layout<? extends Serializable> layout) {
260 this.host = host;
261 this.port = port;
262 this.connectTimeoutMillis = connectTimeoutMillis;
263 this.delayMillis = delayMillis;
264 this.immediateFail = immediateFail;
265 this.layout = layout;
266 }
267 }
268
269
270
271
272 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
273 @Override
274 public TcpSocketManager createManager(final String name, final FactoryData data) {
275
276 InetAddress inetAddress;
277 OutputStream os;
278 try {
279 inetAddress = InetAddress.getByName(data.host);
280 } catch (final UnknownHostException ex) {
281 LOGGER.error("Could not find address of " + data.host, ex);
282 return null;
283 }
284 try {
285 final Socket socket = new Socket(data.host, data.port);
286 os = socket.getOutputStream();
287 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
288 data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout);
289 } catch (final IOException ex) {
290 LOGGER.error("TcpSocketManager (" + name + ") " + ex);
291 os = new ByteArrayOutputStream();
292 }
293 if (data.delayMillis == 0) {
294 return null;
295 }
296 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
297 data.delayMillis, data.immediateFail, data.layout);
298 }
299 }
300
301 }