001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache license, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the license for the specific language governing permissions and 015 * limitations under the license. 016 */ 017package org.apache.logging.log4j.core.appender; 018 019import java.io.IOException; 020import java.io.OutputStream; 021import java.io.Serializable; 022import java.nio.Buffer; 023import java.nio.ByteBuffer; 024import java.util.Objects; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.logging.log4j.core.Layout; 028import org.apache.logging.log4j.core.LoggerContext; 029import org.apache.logging.log4j.core.layout.ByteBufferDestination; 030import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper; 031import org.apache.logging.log4j.core.util.Constants; 032 033/** 034 * Manages an OutputStream so that it can be shared by multiple Appenders and will 035 * allow appenders to reconfigure without requiring a new stream. 036 */ 037public class OutputStreamManager extends AbstractManager implements ByteBufferDestination { 038 protected final Layout<?> layout; 039 protected ByteBuffer byteBuffer; 040 private volatile OutputStream outputStream; 041 private boolean skipFooter; 042 043 protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout, 044 final boolean writeHeader) { 045 this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE); 046 } 047 048 protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout, 049 final boolean writeHeader, final int bufferSize) { 050 this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize])); 051 } 052 053 /** 054 * @since 2.6 055 * @deprecated 056 */ 057 @Deprecated 058 protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout, 059 final boolean writeHeader, final ByteBuffer byteBuffer) { 060 super(null, streamName); 061 this.outputStream = os; 062 this.layout = layout; 063 if (writeHeader && layout != null) { 064 final byte[] header = layout.getHeader(); 065 if (header != null) { 066 try { 067 getOutputStream().write(header, 0, header.length); 068 } catch (final IOException e) { 069 logError("Unable to write header", e); 070 } 071 } 072 } 073 this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer"); 074 } 075 076 /** 077 * @since 2.7 078 */ 079 protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName, 080 final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader, 081 final ByteBuffer byteBuffer) { 082 super(loggerContext, streamName); 083 if (createOnDemand && os != null) { 084 LOGGER.error( 085 "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.", 086 streamName); 087 } 088 this.layout = layout; 089 this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer"); 090 this.outputStream = os; 091 if (writeHeader && layout != null) { 092 final byte[] header = layout.getHeader(); 093 if (header != null) { 094 try { 095 getOutputStream().write(header, 0, header.length); 096 } catch (final IOException e) { 097 logError("Unable to write header for " + streamName, e); 098 } 099 } 100 } 101 } 102 103 /** 104 * Creates a Manager. 105 * 106 * @param name The name of the stream to manage. 107 * @param data The data to pass to the Manager. 108 * @param factory The factory to use to create the Manager. 109 * @param <T> The type of the OutputStreamManager. 110 * @return An OutputStreamManager. 111 */ 112 public static <T> OutputStreamManager getManager(final String name, final T data, 113 final ManagerFactory<? extends OutputStreamManager, T> factory) { 114 return AbstractManager.getManager(name, factory, data); 115 } 116 117 @SuppressWarnings("unused") 118 protected OutputStream createOutputStream() throws IOException { 119 throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()"); 120 } 121 122 /** 123 * Indicate whether the footer should be skipped or not. 124 * @param skipFooter true if the footer should be skipped. 125 */ 126 public void skipFooter(final boolean skipFooter) { 127 this.skipFooter = skipFooter; 128 } 129 130 /** 131 * Default hook to write footer during close. 132 */ 133 @Override 134 public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { 135 writeFooter(); 136 return closeOutputStream(); 137 } 138 139 /** 140 * Writes the footer. 141 */ 142 protected void writeFooter() { 143 if (layout == null || skipFooter) { 144 return; 145 } 146 final byte[] footer = layout.getFooter(); 147 if (footer != null) { 148 write(footer); 149 } 150 } 151 152 /** 153 * Returns the status of the stream. 154 * @return true if the stream is open, false if it is not. 155 */ 156 public boolean isOpen() { 157 return getCount() > 0; 158 } 159 160 public boolean hasOutputStream() { 161 return outputStream != null; 162 } 163 164 protected OutputStream getOutputStream() throws IOException { 165 if (outputStream == null) { 166 outputStream = createOutputStream(); 167 } 168 return outputStream; 169 } 170 171 protected void setOutputStream(final OutputStream os) { 172 final byte[] header = layout.getHeader(); 173 if (header != null) { 174 try { 175 os.write(header, 0, header.length); 176 this.outputStream = os; // only update field if os.write() succeeded 177 } catch (final IOException ioe) { 178 logError("Unable to write header", ioe); 179 } 180 } else { 181 this.outputStream = os; 182 } 183 } 184 185 /** 186 * Some output streams synchronize writes while others do not. 187 * @param bytes The serialized Log event. 188 * @throws AppenderLoggingException if an error occurs. 189 */ 190 protected void write(final byte[] bytes) { 191 write(bytes, 0, bytes.length, false); 192 } 193 194 /** 195 * Some output streams synchronize writes while others do not. 196 * @param bytes The serialized Log event. 197 * @param immediateFlush If true, flushes after writing. 198 * @throws AppenderLoggingException if an error occurs. 199 */ 200 protected void write(final byte[] bytes, final boolean immediateFlush) { 201 write(bytes, 0, bytes.length, immediateFlush); 202 } 203 204 @Override 205 public void writeBytes(final byte[] data, final int offset, final int length) { 206 write(data, offset, length, false); 207 } 208 209 /** 210 * Some output streams synchronize writes while others do not. Synchronizing here insures that 211 * log events won't be intertwined. 212 * @param bytes The serialized Log event. 213 * @param offset The offset into the byte array. 214 * @param length The number of bytes to write. 215 * @throws AppenderLoggingException if an error occurs. 216 */ 217 protected void write(final byte[] bytes, final int offset, final int length) { 218 writeBytes(bytes, offset, length); 219 } 220 221 /** 222 * Some output streams synchronize writes while others do not. Synchronizing here insures that 223 * log events won't be intertwined. 224 * @param bytes The serialized Log event. 225 * @param offset The offset into the byte array. 226 * @param length The number of bytes to write. 227 * @param immediateFlush flushes immediately after writing. 228 * @throws AppenderLoggingException if an error occurs. 229 */ 230 protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) { 231 if (immediateFlush && byteBuffer.position() == 0) { 232 writeToDestination(bytes, offset, length); 233 flushDestination(); 234 return; 235 } 236 if (length >= byteBuffer.capacity()) { 237 // if request length exceeds buffer capacity, flush the buffer and write the data directly 238 flush(); 239 writeToDestination(bytes, offset, length); 240 } else { 241 if (length > byteBuffer.remaining()) { 242 flush(); 243 } 244 byteBuffer.put(bytes, offset, length); 245 } 246 if (immediateFlush) { 247 flush(); 248 } 249 } 250 251 /** 252 * Writes the specified section of the specified byte array to the stream. 253 * 254 * @param bytes the array containing data 255 * @param offset from where to write 256 * @param length how many bytes to write 257 * @since 2.6 258 */ 259 protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) { 260 try { 261 getOutputStream().write(bytes, offset, length); 262 } catch (final IOException ex) { 263 throw new AppenderLoggingException("Error writing to stream " + getName(), ex); 264 } 265 } 266 267 /** 268 * Calls {@code flush()} on the underlying output stream. 269 * @since 2.6 270 */ 271 protected synchronized void flushDestination() { 272 final OutputStream stream = outputStream; // access volatile field only once per method 273 if (stream != null) { 274 try { 275 stream.flush(); 276 } catch (final IOException ex) { 277 throw new AppenderLoggingException("Error flushing stream " + getName(), ex); 278 } 279 } 280 } 281 282 /** 283 * Drains the ByteBufferDestination's buffer into the destination. By default this calls 284 * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents. 285 * The underlying stream is not {@linkplain OutputStream#flush() flushed}. 286 * 287 * @see #flushDestination() 288 * @since 2.6 289 */ 290 protected synchronized void flushBuffer(final ByteBuffer buf) { 291 ((Buffer) buf).flip(); 292 if (buf.remaining() > 0) { 293 writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); 294 } 295 buf.clear(); 296 } 297 298 /** 299 * Flushes any buffers. 300 */ 301 public synchronized void flush() { 302 flushBuffer(byteBuffer); 303 flushDestination(); 304 } 305 306 protected synchronized boolean closeOutputStream() { 307 flush(); 308 final OutputStream stream = outputStream; // access volatile field only once per method 309 if (stream == null || stream == System.out || stream == System.err) { 310 return true; 311 } 312 try { 313 stream.close(); 314 } catch (final IOException ex) { 315 logError("Unable to close stream", ex); 316 return false; 317 } 318 return true; 319 } 320 321 /** 322 * Returns this {@code ByteBufferDestination}'s buffer. 323 * @return the buffer 324 * @since 2.6 325 */ 326 @Override 327 public ByteBuffer getByteBuffer() { 328 return byteBuffer; 329 } 330 331 /** 332 * Drains the ByteBufferDestination's buffer into the destination. By default this calls 333 * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override. 334 * <p> 335 * Do not call this method lightly! For some subclasses this is a very expensive operation. For example, 336 * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region 337 * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its 338 * buffer. 339 * </p><p> 340 * To just flush the buffered contents to the underlying stream, call 341 * {@link #flushBuffer(ByteBuffer)} directly instead. 342 * </p> 343 * 344 * @param buf the buffer whose contents to write the the destination 345 * @return the specified buffer 346 * @since 2.6 347 */ 348 @Override 349 public ByteBuffer drain(final ByteBuffer buf) { 350 flushBuffer(buf); 351 return buf; 352 } 353 354 @Override 355 public void writeBytes(final ByteBuffer data) { 356 if (data.remaining() == 0) { 357 return; 358 } 359 synchronized (this) { 360 ByteBufferDestinationHelper.writeToUnsynchronized(data, this); 361 } 362 } 363}