1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.io.File;
20 import java.io.IOException;
21 import java.io.OutputStream;
22 import java.io.RandomAccessFile;
23 import java.io.Serializable;
24 import java.lang.reflect.Method;
25 import java.nio.ByteBuffer;
26 import java.nio.ByteOrder;
27 import java.nio.MappedByteBuffer;
28 import java.nio.channels.FileChannel;
29 import java.security.AccessController;
30 import java.security.PrivilegedActionException;
31 import java.security.PrivilegedExceptionAction;
32 import java.util.HashMap;
33 import java.util.Map;
34 import java.util.Objects;
35
36 import org.apache.logging.log4j.core.Layout;
37 import org.apache.logging.log4j.core.util.Closer;
38 import org.apache.logging.log4j.core.util.FileUtils;
39 import org.apache.logging.log4j.core.util.NullOutputStream;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public class MemoryMappedFileManager extends OutputStreamManager {
60
61
62
63 static final int DEFAULT_REGION_LENGTH = 32 * 1024 * 1024;
64 private static final int MAX_REMAP_COUNT = 10;
65 private static final MemoryMappedFileManagerFactory FACTORY = new MemoryMappedFileManagerFactory();
66 private static final double NANOS_PER_MILLISEC = 1000.0 * 1000.0;
67
68 private final boolean immediateFlush;
69 private final int regionLength;
70 private final String advertiseURI;
71 private final RandomAccessFile randomAccessFile;
72 private final ThreadLocal<Boolean> isEndOfBatch = new ThreadLocal<>();
73 private MappedByteBuffer mappedBuffer;
74 private long mappingOffset;
75
76 protected MemoryMappedFileManager(final RandomAccessFile file, final String fileName, final OutputStream os,
77 final boolean immediateFlush, final long position, final int regionLength, final String advertiseURI,
78 final Layout<? extends Serializable> layout, final boolean writeHeader) throws IOException {
79 super(os, fileName, layout, writeHeader, ByteBuffer.wrap(new byte[0]));
80 this.immediateFlush = immediateFlush;
81 this.randomAccessFile = Objects.requireNonNull(file, "RandomAccessFile");
82 this.regionLength = regionLength;
83 this.advertiseURI = advertiseURI;
84 this.isEndOfBatch.set(Boolean.FALSE);
85 this.mappedBuffer = mmap(randomAccessFile.getChannel(), getFileName(), position, regionLength);
86 this.byteBuffer = mappedBuffer;
87 this.mappingOffset = position;
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101 public static MemoryMappedFileManager getFileManager(final String fileName, final boolean append,
102 final boolean immediateFlush, final int regionLength, final String advertiseURI,
103 final Layout<? extends Serializable> layout) {
104 return narrow(MemoryMappedFileManager.class, getManager(fileName, new FactoryData(append, immediateFlush,
105 regionLength, advertiseURI, layout), FACTORY));
106 }
107
108 public Boolean isEndOfBatch() {
109 return isEndOfBatch.get();
110 }
111
112 public void setEndOfBatch(final boolean endOfBatch) {
113 this.isEndOfBatch.set(Boolean.valueOf(endOfBatch));
114 }
115
116 @Override
117 protected synchronized void write(final byte[] bytes, int offset, int length, final boolean immediateFlush) {
118 while (length > mappedBuffer.remaining()) {
119 final int chunk = mappedBuffer.remaining();
120 mappedBuffer.put(bytes, offset, chunk);
121 offset += chunk;
122 length -= chunk;
123 remap();
124 }
125 mappedBuffer.put(bytes, offset, length);
126
127
128
129 }
130
131 private synchronized void remap() {
132 final long offset = this.mappingOffset + mappedBuffer.position();
133 final int length = mappedBuffer.remaining() + regionLength;
134 try {
135 unsafeUnmap(mappedBuffer);
136 final long fileLength = randomAccessFile.length() + regionLength;
137 LOGGER.debug("{} {} extending {} by {} bytes to {}", getClass().getSimpleName(), getName(), getFileName(),
138 regionLength, fileLength);
139
140 final long startNanos = System.nanoTime();
141 randomAccessFile.setLength(fileLength);
142 final float millis = (float) ((System.nanoTime() - startNanos) / NANOS_PER_MILLISEC);
143 LOGGER.debug("{} {} extended {} OK in {} millis", getClass().getSimpleName(), getName(), getFileName(),
144 millis);
145
146 mappedBuffer = mmap(randomAccessFile.getChannel(), getFileName(), offset, length);
147 this.byteBuffer = mappedBuffer;
148 mappingOffset = offset;
149 } catch (final Exception ex) {
150 logError("Unable to remap", ex);
151 }
152 }
153
154 @Override
155 public synchronized void flush() {
156 mappedBuffer.force();
157 }
158
159 @Override
160 public synchronized boolean closeOutputStream() {
161 final long position = mappedBuffer.position();
162 final long length = mappingOffset + position;
163 try {
164 unsafeUnmap(mappedBuffer);
165 } catch (final Exception ex) {
166 logError("Unable to unmap MappedBuffer", ex);
167 }
168 try {
169 LOGGER.debug("MMapAppender closing. Setting {} length to {} (offset {} + position {})", getFileName(),
170 length, mappingOffset, position);
171 randomAccessFile.setLength(length);
172 randomAccessFile.close();
173 return true;
174 } catch (final IOException ex) {
175 logError("Unable to close MemoryMappedFile", ex);
176 return false;
177 }
178 }
179
180 public static MappedByteBuffer mmap(final FileChannel fileChannel, final String fileName, final long start,
181 final int size) throws IOException {
182 for (int i = 1;; i++) {
183 try {
184 LOGGER.debug("MMapAppender remapping {} start={}, size={}", fileName, start, size);
185
186 final long startNanos = System.nanoTime();
187 final MappedByteBuffer map = fileChannel.map(FileChannel.MapMode.READ_WRITE, start, size);
188 map.order(ByteOrder.nativeOrder());
189
190 final float millis = (float) ((System.nanoTime() - startNanos) / NANOS_PER_MILLISEC);
191 LOGGER.debug("MMapAppender remapped {} OK in {} millis", fileName, millis);
192
193 return map;
194 } catch (final IOException e) {
195 if (e.getMessage() == null || !e.getMessage().endsWith("user-mapped section open")) {
196 throw e;
197 }
198 LOGGER.debug("Remap attempt {}/{} failed. Retrying...", i, MAX_REMAP_COUNT, e);
199 if (i < MAX_REMAP_COUNT) {
200 Thread.yield();
201 } else {
202 try {
203 Thread.sleep(1);
204 } catch (final InterruptedException ignored) {
205 Thread.currentThread().interrupt();
206 throw e;
207 }
208 }
209 }
210 }
211 }
212
213 private static void unsafeUnmap(final MappedByteBuffer mbb) throws PrivilegedActionException {
214 LOGGER.debug("MMapAppender unmapping old buffer...");
215 final long startNanos = System.nanoTime();
216 AccessController.doPrivileged(new PrivilegedExceptionAction<Object>() {
217 @Override
218 public Object run() throws Exception {
219 final Method getCleanerMethod = mbb.getClass().getMethod("cleaner");
220 getCleanerMethod.setAccessible(true);
221 final Object cleaner = getCleanerMethod.invoke(mbb);
222 final Method cleanMethod = cleaner.getClass().getMethod("clean");
223 cleanMethod.invoke(cleaner);
224 return null;
225 }
226 });
227 final float millis = (float) ((System.nanoTime() - startNanos) / NANOS_PER_MILLISEC);
228 LOGGER.debug("MMapAppender unmapped buffer OK in {} millis", millis);
229 }
230
231
232
233
234
235
236 public String getFileName() {
237 return getName();
238 }
239
240
241
242
243
244
245 public int getRegionLength() {
246 return regionLength;
247 }
248
249
250
251
252
253
254
255 public boolean isImmediateFlush() {
256 return immediateFlush;
257 }
258
259
260
261
262
263
264
265
266
267 @Override
268 public Map<String, String> getContentFormat() {
269 final Map<String, String> result = new HashMap<>(super.getContentFormat());
270 result.put("fileURI", advertiseURI);
271 return result;
272 }
273
274 @Override
275 protected void flushBuffer(final ByteBuffer buffer) {
276
277 }
278
279 @Override
280 public ByteBuffer getByteBuffer() {
281 return mappedBuffer;
282 }
283
284 @Override
285 public ByteBuffer drain(final ByteBuffer buf) {
286 remap();
287 return mappedBuffer;
288 }
289
290
291
292
293 private static class FactoryData {
294 private final boolean append;
295 private final boolean immediateFlush;
296 private final int regionLength;
297 private final String advertiseURI;
298 private final Layout<? extends Serializable> layout;
299
300
301
302
303
304
305
306
307
308
309 public FactoryData(final boolean append, final boolean immediateFlush, final int regionLength,
310 final String advertiseURI, final Layout<? extends Serializable> layout) {
311 this.append = append;
312 this.immediateFlush = immediateFlush;
313 this.regionLength = regionLength;
314 this.advertiseURI = advertiseURI;
315 this.layout = layout;
316 }
317 }
318
319
320
321
322 private static class MemoryMappedFileManagerFactory
323 implements ManagerFactory<MemoryMappedFileManager, FactoryData> {
324
325
326
327
328
329
330
331
332 @SuppressWarnings("resource")
333 @Override
334 public MemoryMappedFileManager createManager(final String name, final FactoryData data) {
335 final File file = new File(name);
336 if (!data.append) {
337 file.delete();
338 }
339
340 final boolean writeHeader = !data.append || !file.exists();
341 final OutputStream os = NullOutputStream.getInstance();
342 RandomAccessFile raf = null;
343 try {
344 FileUtils.makeParentDirs(file);
345 raf = new RandomAccessFile(name, "rw");
346 final long position = (data.append) ? raf.length() : 0;
347 raf.setLength(position + data.regionLength);
348 return new MemoryMappedFileManager(raf, name, os, data.immediateFlush, position, data.regionLength,
349 data.advertiseURI, data.layout, writeHeader);
350 } catch (final Exception ex) {
351 LOGGER.error("MemoryMappedFileManager (" + name + ") " + ex, ex);
352 Closer.closeSilently(raf);
353 }
354 return null;
355 }
356 }
357 }