1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.io.Serializable;
22 import java.net.ConnectException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.Socket;
26 import java.net.UnknownHostException;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.concurrent.CountDownLatch;
32
33 import org.apache.logging.log4j.core.Layout;
34 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
35 import org.apache.logging.log4j.core.appender.ManagerFactory;
36 import org.apache.logging.log4j.core.appender.OutputStreamManager;
37 import org.apache.logging.log4j.core.util.Closer;
38 import org.apache.logging.log4j.core.util.Log4jThread;
39 import org.apache.logging.log4j.core.util.NullOutputStream;
40 import org.apache.logging.log4j.util.Strings;
41
42
43
44
45 public class TcpSocketManager extends AbstractSocketManager {
46
47
48
49 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
50
51
52
53 private static final int DEFAULT_PORT = 4560;
54
55 private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
56
57 private final int reconnectionDelayMillis;
58
59 private Reconnector reconnector;
60
61 private Socket socket;
62
63 private final SocketOptions socketOptions;
64
65 private final boolean retry;
66
67 private final boolean immediateFail;
68
69 private final int connectTimeoutMillis;
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 @Deprecated
100 public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
101 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
102 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
103 final int bufferSize) {
104 this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail,
105 layout, bufferSize, null);
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
135 final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
136 final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
137 final int bufferSize, final SocketOptions socketOptions) {
138 super(name, os, inetAddress, host, port, layout, true, bufferSize);
139 this.connectTimeoutMillis = connectTimeoutMillis;
140 this.reconnectionDelayMillis = reconnectionDelayMillis;
141 this.socket = socket;
142 this.immediateFail = immediateFail;
143 this.retry = reconnectionDelayMillis > 0;
144 if (socket == null) {
145 this.reconnector = createReconnector();
146 this.reconnector.start();
147 }
148 this.socketOptions = socketOptions;
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 @Deprecated
168 public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
169 final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
170 final int bufferSize) {
171 return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
172 bufferSize, null);
173 }
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
191 int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
192 final int bufferSize, final SocketOptions socketOptions) {
193 if (Strings.isEmpty(host)) {
194 throw new IllegalArgumentException("A host name is required");
195 }
196 if (port <= 0) {
197 port = DEFAULT_PORT;
198 }
199 if (reconnectDelayMillis == 0) {
200 reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
201 }
202 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
203 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
204 }
205
206 @SuppressWarnings("sync-override")
207 @Override
208 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
209 if (socket == null) {
210 if (reconnector != null && !immediateFail) {
211 reconnector.latch();
212 }
213 if (socket == null) {
214 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
215 }
216 }
217 synchronized (this) {
218 try {
219 writeAndFlush(bytes, offset, length, immediateFlush);
220 } catch (final IOException causeEx) {
221 if (retry && reconnector == null) {
222 final String config = inetAddress + ":" + port;
223 reconnector = createReconnector();
224 try {
225 reconnector.reconnect();
226 } catch (final IOException reconnEx) {
227 LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
228 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
229 reconnector.start();
230 throw new AppenderLoggingException(
231 String.format("Error sending to %s for %s", getName(), config), causeEx);
232 }
233 try {
234 writeAndFlush(bytes, offset, length, immediateFlush);
235 } catch (final IOException e) {
236 throw new AppenderLoggingException(
237 String.format("Error writing to %s after reestablishing connection for %s", getName(),
238 config),
239 causeEx);
240 }
241 }
242 }
243 }
244 }
245
246 private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
247 throws IOException {
248 @SuppressWarnings("resource")
249 final OutputStream outputStream = getOutputStream();
250 outputStream.write(bytes, offset, length);
251 if (immediateFlush) {
252 outputStream.flush();
253 }
254 }
255
256 @Override
257 protected synchronized boolean closeOutputStream() {
258 final boolean closed = super.closeOutputStream();
259 if (reconnector != null) {
260 reconnector.shutdown();
261 reconnector.interrupt();
262 reconnector = null;
263 }
264 final Socket oldSocket = socket;
265 socket = null;
266 if (oldSocket != null) {
267 try {
268 oldSocket.close();
269 } catch (final IOException e) {
270 LOGGER.error("Could not close socket {}", socket);
271 return false;
272 }
273 }
274 return closed;
275 }
276
277 public int getConnectTimeoutMillis() {
278 return connectTimeoutMillis;
279 }
280
281
282
283
284
285
286
287
288
289
290 @Override
291 public Map<String, String> getContentFormat() {
292 final Map<String, String> result = new HashMap<>(super.getContentFormat());
293 result.put("protocol", "tcp");
294 result.put("direction", "out");
295 return result;
296 }
297
298
299
300
301 private class Reconnector extends Log4jThread {
302
303 private final CountDownLatch latch = new CountDownLatch(1);
304
305 private boolean shutdown = false;
306
307 private final Object owner;
308
309 public Reconnector(final OutputStreamManager owner) {
310 super("TcpSocketManager-Reconnector");
311 this.owner = owner;
312 }
313
314 public void latch() {
315 try {
316 latch.await();
317 } catch (final InterruptedException ex) {
318
319 }
320 }
321
322 public void shutdown() {
323 shutdown = true;
324 }
325
326 @Override
327 public void run() {
328 while (!shutdown) {
329 try {
330 sleep(reconnectionDelayMillis);
331 reconnect();
332 } catch (final InterruptedException ie) {
333 LOGGER.debug("Reconnection interrupted.");
334 } catch (final ConnectException ex) {
335 LOGGER.debug("{}:{} refused connection", host, port);
336 } catch (final IOException ioe) {
337 LOGGER.debug("Unable to reconnect to {}:{}", host, port);
338 } finally {
339 latch.countDown();
340 }
341 }
342 }
343
344 void reconnect() throws IOException {
345 List<InetSocketAddress> socketAddresses = FACTORY.resolver.resolveHost(host, port);
346 if (socketAddresses.size() == 1) {
347 LOGGER.debug("Reconnecting " + socketAddresses.get(0));
348 connect(socketAddresses.get(0));
349 } else {
350 IOException ioe = null;
351 for (InetSocketAddress socketAddress : socketAddresses) {
352 try {
353 LOGGER.debug("Reconnecting " + socketAddress);
354 connect(socketAddress);
355 return;
356 } catch (IOException ex) {
357 ioe = ex;
358 }
359 }
360 throw ioe;
361 }
362 }
363
364 private void connect(InetSocketAddress socketAddress) throws IOException {
365 final Socket sock = createSocket(socketAddress);
366 @SuppressWarnings("resource")
367 final OutputStream newOS = sock.getOutputStream();
368 InetAddress prev = socket != null ? socket.getInetAddress() : null;
369 synchronized (owner) {
370 Closer.closeSilently(getOutputStream());
371 setOutputStream(newOS);
372 socket = sock;
373 reconnector = null;
374 shutdown = true;
375 }
376 String type = prev != null && prev.getHostAddress().equals(socketAddress.getAddress().getHostAddress()) ?
377 "reestablished" : "established";
378 LOGGER.debug("Connection to {}:{} {}: {}", host, port, type, socket);
379 }
380
381 @Override
382 public String toString() {
383 return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + "]";
384 }
385 }
386
387 private Reconnector createReconnector() {
388 final Reconnector recon = new Reconnector(this);
389 recon.setDaemon(true);
390 recon.setPriority(Thread.MIN_PRIORITY);
391 return recon;
392 }
393
394 protected Socket createSocket(final InetSocketAddress socketAddress) throws IOException {
395 return createSocket(socketAddress, socketOptions, connectTimeoutMillis);
396 }
397
398 protected static Socket createSocket(final InetSocketAddress socketAddress, final SocketOptions socketOptions,
399 final int connectTimeoutMillis) throws IOException {
400 LOGGER.debug("Creating socket {}", socketAddress.toString());
401 final Socket newSocket = new Socket();
402 if (socketOptions != null) {
403
404 socketOptions.apply(newSocket);
405 }
406 newSocket.connect(socketAddress, connectTimeoutMillis);
407 if (socketOptions != null) {
408
409 socketOptions.apply(newSocket);
410 }
411 return newSocket;
412 }
413
414
415
416
417 static class FactoryData {
418 protected final String host;
419 protected final int port;
420 protected final int connectTimeoutMillis;
421 protected final int reconnectDelayMillis;
422 protected final boolean immediateFail;
423 protected final Layout<? extends Serializable> layout;
424 protected final int bufferSize;
425 protected final SocketOptions socketOptions;
426
427 public FactoryData(final String host, final int port, final int connectTimeoutMillis,
428 final int reconnectDelayMillis, final boolean immediateFail,
429 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
430 this.host = host;
431 this.port = port;
432 this.connectTimeoutMillis = connectTimeoutMillis;
433 this.reconnectDelayMillis = reconnectDelayMillis;
434 this.immediateFail = immediateFail;
435 this.layout = layout;
436 this.bufferSize = bufferSize;
437 this.socketOptions = socketOptions;
438 }
439
440 @Override
441 public String toString() {
442 return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
443 + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
444 + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
445 }
446 }
447
448
449
450
451
452
453
454
455
456 protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
457 implements ManagerFactory<M, T> {
458
459 static HostResolver resolver = new HostResolver();
460
461 @SuppressWarnings("resource")
462 @Override
463 public M createManager(final String name, final T data) {
464 InetAddress inetAddress;
465 OutputStream os;
466 try {
467 inetAddress = InetAddress.getByName(data.host);
468 } catch (final UnknownHostException ex) {
469 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
470 return null;
471 }
472 Socket socket = null;
473 try {
474
475 socket = createSocket(data);
476 os = socket.getOutputStream();
477 return createManager(name, os, socket, inetAddress, data);
478 } catch (final IOException ex) {
479 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
480 os = NullOutputStream.getInstance();
481 }
482 if (data.reconnectDelayMillis == 0) {
483 Closer.closeSilently(socket);
484 return null;
485 }
486 return createManager(name, os, null, inetAddress, data);
487 }
488
489 @SuppressWarnings("unchecked")
490 M createManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress, final T data) {
491 return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
492 data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
493 data.bufferSize, data.socketOptions);
494 }
495
496 Socket createSocket(final T data) throws IOException {
497 List<InetSocketAddress> socketAddresses = resolver.resolveHost(data.host, data.port);
498 IOException ioe = null;
499 for (InetSocketAddress socketAddress : socketAddresses) {
500 try {
501 return TcpSocketManager.createSocket(socketAddress, data.socketOptions, data.connectTimeoutMillis);
502 } catch (IOException ex) {
503 ioe = ex;
504 }
505 }
506 throw new IOException(errorMessage(data, socketAddresses) , ioe);
507 }
508
509 protected String errorMessage(final T data, List<InetSocketAddress> socketAddresses) {
510 StringBuilder sb = new StringBuilder("Unable to create socket for ");
511 sb.append(data.host).append(" at port ").append(data.port);
512 if (socketAddresses.size() == 1) {
513 if (!socketAddresses.get(0).getAddress().getHostAddress().equals(data.host)) {
514 sb.append(" using ip address ").append(socketAddresses.get(0).getAddress().getHostAddress());
515 sb.append(" and port ").append(socketAddresses.get(0).getPort());
516 }
517 } else {
518 sb.append(" using ip addresses and ports ");
519 for (int i = 0; i < socketAddresses.size(); ++i) {
520 if (i > 0) {
521 sb.append(", ");
522 sb.append(socketAddresses.get(i).getAddress().getHostAddress());
523 sb.append(":").append(socketAddresses.get(i).getPort());
524 }
525 }
526 }
527 return sb.toString();
528 }
529
530 }
531
532
533
534
535
536 public static void setHostResolver(HostResolver resolver) {
537 TcpSocketManagerFactory.resolver = resolver;
538 }
539
540 public static class HostResolver {
541
542 public List<InetSocketAddress> resolveHost(String host, int port) throws UnknownHostException {
543 InetAddress[] addresses = InetAddress.getAllByName(host);
544 List<InetSocketAddress> socketAddresses = new ArrayList<>(addresses.length);
545 for (InetAddress address: addresses) {
546 socketAddresses.add(new InetSocketAddress(address, port));
547 }
548 return socketAddresses;
549 }
550 }
551
552
553
554
555 public SocketOptions getSocketOptions() {
556 return socketOptions;
557 }
558
559
560
561
562 public Socket getSocket() {
563 return socket;
564 }
565
566 public int getReconnectionDelayMillis() {
567 return reconnectionDelayMillis;
568 }
569
570 @Override
571 public String toString() {
572 return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
573 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
574 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
575 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
576 + count + "]";
577 }
578
579 }