001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.io.Serializable;
022import java.nio.Buffer;
023import java.nio.ByteBuffer;
024import java.util.Objects;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.logging.log4j.core.Layout;
028import org.apache.logging.log4j.core.LoggerContext;
029import org.apache.logging.log4j.core.layout.ByteBufferDestination;
030import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
031import org.apache.logging.log4j.core.util.Constants;
032
033/**
034 * Manages an OutputStream so that it can be shared by multiple Appenders and will
035 * allow appenders to reconfigure without requiring a new stream.
036 */
037public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
038    protected final Layout<?> layout;
039    protected ByteBuffer byteBuffer;
040    private volatile OutputStream outputStream;
041    private boolean skipFooter;
042
043    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
044            final boolean writeHeader) {
045        this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
046    }
047
048    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
049            final boolean writeHeader, final int bufferSize) {
050        this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
051    }
052
053    /**
054     * @since 2.6
055     * @deprecated
056     */
057    @Deprecated
058    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
059            final boolean writeHeader, final ByteBuffer byteBuffer) {
060        super(null, streamName);
061        this.outputStream = os;
062        this.layout = layout;
063        if (writeHeader && layout != null) {
064            final byte[] header = layout.getHeader();
065            if (header != null) {
066                try {
067                    getOutputStream().write(header, 0, header.length);
068                } catch (final IOException e) {
069                    logError("Unable to write header", e);
070                }
071            }
072        }
073        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
074    }
075
076    /**
077     * @since 2.7
078     */
079    protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
080            final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
081            final ByteBuffer byteBuffer) {
082        super(loggerContext, streamName);
083        if (createOnDemand && os != null) {
084            LOGGER.error(
085                    "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
086                    streamName);
087        }
088        this.layout = layout;
089        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
090        this.outputStream = os;
091        if (writeHeader && layout != null) {
092            final byte[] header = layout.getHeader();
093            if (header != null) {
094                try {
095                    getOutputStream().write(header, 0, header.length);
096                } catch (final IOException e) {
097                    logError("Unable to write header for " + streamName, e);
098                }
099            }
100        }
101    }
102
103    /**
104     * Creates a Manager.
105     *
106     * @param name The name of the stream to manage.
107     * @param data The data to pass to the Manager.
108     * @param factory The factory to use to create the Manager.
109     * @param <T> The type of the OutputStreamManager.
110     * @return An OutputStreamManager.
111     */
112    public static <T> OutputStreamManager getManager(final String name, final T data,
113                                                 final ManagerFactory<? extends OutputStreamManager, T> factory) {
114        return AbstractManager.getManager(name, factory, data);
115    }
116
117    @SuppressWarnings("unused")
118    protected OutputStream createOutputStream() throws IOException {
119        throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
120    }
121
122    /**
123     * Indicate whether the footer should be skipped or not.
124     * @param skipFooter true if the footer should be skipped.
125     */
126    public void skipFooter(final boolean skipFooter) {
127        this.skipFooter = skipFooter;
128    }
129
130    /**
131     * Default hook to write footer during close.
132     */
133    @Override
134    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
135        writeFooter();
136        return closeOutputStream();
137    }
138
139    /**
140     * Writes the footer.
141     */
142    protected void writeFooter() {
143        if (layout == null || skipFooter) {
144            return;
145        }
146        final byte[] footer = layout.getFooter();
147        if (footer != null) {
148            write(footer);
149        }
150    }
151
152    /**
153     * Returns the status of the stream.
154     * @return true if the stream is open, false if it is not.
155     */
156    public boolean isOpen() {
157        return getCount() > 0;
158    }
159
160    public boolean hasOutputStream() {
161        return outputStream != null;
162    }
163
164    protected OutputStream getOutputStream() throws IOException {
165        if (outputStream == null) {
166            outputStream = createOutputStream();
167        }
168        return outputStream;
169    }
170
171    protected void setOutputStream(final OutputStream os) {
172        final byte[] header = layout.getHeader();
173        if (header != null) {
174            try {
175                os.write(header, 0, header.length);
176                this.outputStream = os; // only update field if os.write() succeeded
177            } catch (final IOException ioe) {
178                logError("Unable to write header", ioe);
179            }
180        } else {
181            this.outputStream = os;
182        }
183    }
184
185    /**
186     * Some output streams synchronize writes while others do not.
187     * @param bytes The serialized Log event.
188     * @throws AppenderLoggingException if an error occurs.
189     */
190    protected void write(final byte[] bytes)  {
191        write(bytes, 0, bytes.length, false);
192    }
193
194    /**
195     * Some output streams synchronize writes while others do not.
196     * @param bytes The serialized Log event.
197     * @param immediateFlush If true, flushes after writing.
198     * @throws AppenderLoggingException if an error occurs.
199     */
200    protected void write(final byte[] bytes, final boolean immediateFlush)  {
201        write(bytes, 0, bytes.length, immediateFlush);
202    }
203
204    @Override
205    public void writeBytes(final byte[] data, final int offset, final int length) {
206        write(data, offset, length, false);
207    }
208
209    /**
210     * Some output streams synchronize writes while others do not. Synchronizing here insures that
211     * log events won't be intertwined.
212     * @param bytes The serialized Log event.
213     * @param offset The offset into the byte array.
214     * @param length The number of bytes to write.
215     * @throws AppenderLoggingException if an error occurs.
216     */
217    protected void write(final byte[] bytes, final int offset, final int length) {
218        writeBytes(bytes, offset, length);
219    }
220
221    /**
222     * Some output streams synchronize writes while others do not. Synchronizing here insures that
223     * log events won't be intertwined.
224     * @param bytes The serialized Log event.
225     * @param offset The offset into the byte array.
226     * @param length The number of bytes to write.
227     * @param immediateFlush flushes immediately after writing.
228     * @throws AppenderLoggingException if an error occurs.
229     */
230    protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
231        if (immediateFlush && byteBuffer.position() == 0) {
232            writeToDestination(bytes, offset, length);
233            flushDestination();
234            return;
235        }
236        if (length >= byteBuffer.capacity()) {
237            // if request length exceeds buffer capacity, flush the buffer and write the data directly
238            flush();
239            writeToDestination(bytes, offset, length);
240        } else {
241            if (length > byteBuffer.remaining()) {
242                flush();
243            }
244            byteBuffer.put(bytes, offset, length);
245        }
246        if (immediateFlush) {
247            flush();
248        }
249    }
250
251    /**
252     * Writes the specified section of the specified byte array to the stream.
253     *
254     * @param bytes the array containing data
255     * @param offset from where to write
256     * @param length how many bytes to write
257     * @since 2.6
258     */
259    protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
260        try {
261            getOutputStream().write(bytes, offset, length);
262        } catch (final IOException ex) {
263            throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
264        }
265    }
266
267    /**
268     * Calls {@code flush()} on the underlying output stream.
269     * @since 2.6
270     */
271    protected synchronized void flushDestination() {
272        final OutputStream stream = outputStream; // access volatile field only once per method
273        if (stream != null) {
274            try {
275                stream.flush();
276            } catch (final IOException ex) {
277                throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
278            }
279        }
280    }
281
282    /**
283     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
284     * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
285     * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
286     *
287     * @see #flushDestination()
288     * @since 2.6
289     */
290    protected synchronized void flushBuffer(final ByteBuffer buf) {
291        ((Buffer) buf).flip();
292        if (buf.remaining() > 0) {
293            writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
294        }
295        buf.clear();
296    }
297
298    /**
299     * Flushes any buffers.
300     */
301    public synchronized void flush() {
302        flushBuffer(byteBuffer);
303        flushDestination();
304    }
305
306    protected synchronized boolean closeOutputStream() {
307        flush();
308        final OutputStream stream = outputStream; // access volatile field only once per method
309        if (stream == null || stream == System.out || stream == System.err) {
310            return true;
311        }
312        try {
313            stream.close();
314        } catch (final IOException ex) {
315            logError("Unable to close stream", ex);
316            return false;
317        }
318        return true;
319    }
320
321    /**
322     * Returns this {@code ByteBufferDestination}'s buffer.
323     * @return the buffer
324     * @since 2.6
325     */
326    @Override
327    public ByteBuffer getByteBuffer() {
328        return byteBuffer;
329    }
330
331    /**
332     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
333     * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
334     * <p>
335     * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
336     * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
337     * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
338     * buffer.
339     * </p><p>
340     * To just flush the buffered contents to the underlying stream, call
341     * {@link #flushBuffer(ByteBuffer)} directly instead.
342     * </p>
343     *
344     * @param buf the buffer whose contents to write the the destination
345     * @return the specified buffer
346     * @since 2.6
347     */
348    @Override
349    public ByteBuffer drain(final ByteBuffer buf) {
350        flushBuffer(buf);
351        return buf;
352    }
353
354    @Override
355    public void writeBytes(final ByteBuffer data) {
356        if (data.remaining() == 0) {
357          return;
358        }
359        synchronized (this) {
360          ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
361        }
362    }
363}