001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache license, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the license for the specific language governing permissions and
015 * limitations under the license.
016 */
017package org.apache.logging.log4j.core.appender;
018
019import java.io.IOException;
020import java.io.OutputStream;
021import java.io.Serializable;
022import java.nio.Buffer;
023import java.nio.ByteBuffer;
024import java.util.Objects;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.logging.log4j.core.Layout;
028import org.apache.logging.log4j.core.LoggerContext;
029import org.apache.logging.log4j.core.layout.ByteBufferDestination;
030import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
031import org.apache.logging.log4j.core.util.Constants;
032
033/**
034 * Manages an OutputStream so that it can be shared by multiple Appenders and will
035 * allow appenders to reconfigure without requiring a new stream.
036 */
037public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
038    protected final Layout<?> layout;
039    protected ByteBuffer byteBuffer;
040    private volatile OutputStream outputStream;
041    private boolean skipFooter;
042
043    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
044            final boolean writeHeader) {
045        // Can't use new ctor because it throws an exception
046        this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
047    }
048
049    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
050            final boolean writeHeader, final int bufferSize) {
051        // Can't use new ctor because it throws an exception
052        this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
053    }
054
055    /**
056     * @since 2.6
057     * @deprecated
058     */
059    @Deprecated
060    protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
061            final boolean writeHeader, final ByteBuffer byteBuffer) {
062        super(null, streamName);
063        this.outputStream = os;
064        this.layout = layout;
065        if (writeHeader && layout != null) {
066            final byte[] header = layout.getHeader();
067            if (header != null) {
068                try {
069                    getOutputStream().write(header, 0, header.length);
070                } catch (final IOException e) {
071                    logError("Unable to write header", e);
072                }
073            }
074        }
075        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
076    }
077
078    /**
079     * @since 2.7
080     */
081    protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
082            final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
083            final ByteBuffer byteBuffer) {
084        super(loggerContext, streamName);
085        if (createOnDemand && os != null) {
086            LOGGER.error(
087                    "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
088                    streamName);
089        }
090        this.layout = layout;
091        this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
092        this.outputStream = os;
093        if (writeHeader && layout != null) {
094            final byte[] header = layout.getHeader();
095            if (header != null) {
096                try {
097                    getOutputStream().write(header, 0, header.length);
098                } catch (final IOException e) {
099                    logError("Unable to write header for " + streamName, e);
100                }
101            }
102        }
103    }
104
105    /**
106     * Creates a Manager.
107     *
108     * @param name The name of the stream to manage.
109     * @param data The data to pass to the Manager.
110     * @param factory The factory to use to create the Manager.
111     * @param <T> The type of the OutputStreamManager.
112     * @return An OutputStreamManager.
113     */
114    public static <T> OutputStreamManager getManager(final String name, final T data,
115                                                 final ManagerFactory<? extends OutputStreamManager, T> factory) {
116        return AbstractManager.getManager(name, factory, data);
117    }
118
119    @SuppressWarnings("unused")
120    protected OutputStream createOutputStream() throws IOException {
121        throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
122    }
123
124    /**
125     * Indicate whether the footer should be skipped or not.
126     * @param skipFooter true if the footer should be skipped.
127     */
128    public void skipFooter(final boolean skipFooter) {
129        this.skipFooter = skipFooter;
130    }
131
132    /**
133     * Default hook to write footer during close.
134     */
135    @Override
136    public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
137        writeFooter();
138        return closeOutputStream();
139    }
140
141    /**
142     * Writes the footer.
143     */
144    protected void writeFooter() {
145        if (layout == null || skipFooter) {
146            return;
147        }
148        final byte[] footer = layout.getFooter();
149        if (footer != null) {
150            write(footer);
151        }
152    }
153
154    /**
155     * Returns the status of the stream.
156     * @return true if the stream is open, false if it is not.
157     */
158    public boolean isOpen() {
159        return getCount() > 0;
160    }
161
162    public boolean hasOutputStream() {
163        return outputStream != null;
164    }
165
166    protected OutputStream getOutputStream() throws IOException {
167        if (outputStream == null) {
168            outputStream = createOutputStream();
169        }
170        return outputStream;
171    }
172
173    protected void setOutputStream(final OutputStream os) {
174        final byte[] header = layout.getHeader();
175        if (header != null) {
176            try {
177                os.write(header, 0, header.length);
178                this.outputStream = os; // only update field if os.write() succeeded
179            } catch (final IOException ioe) {
180                logError("Unable to write header", ioe);
181            }
182        } else {
183            this.outputStream = os;
184        }
185    }
186
187    /**
188     * Some output streams synchronize writes while others do not.
189     * @param bytes The serialized Log event.
190     * @throws AppenderLoggingException if an error occurs.
191     */
192    protected void write(final byte[] bytes)  {
193        write(bytes, 0, bytes.length, false);
194    }
195
196    /**
197     * Some output streams synchronize writes while others do not.
198     * @param bytes The serialized Log event.
199     * @param immediateFlush If true, flushes after writing.
200     * @throws AppenderLoggingException if an error occurs.
201     */
202    protected void write(final byte[] bytes, final boolean immediateFlush)  {
203        write(bytes, 0, bytes.length, immediateFlush);
204    }
205
206    @Override
207    public void writeBytes(final byte[] data, final int offset, final int length) {
208        write(data, offset, length, false);
209    }
210
211    /**
212     * Some output streams synchronize writes while others do not. Synchronizing here insures that
213     * log events won't be intertwined.
214     * @param bytes The serialized Log event.
215     * @param offset The offset into the byte array.
216     * @param length The number of bytes to write.
217     * @throws AppenderLoggingException if an error occurs.
218     */
219    protected void write(final byte[] bytes, final int offset, final int length) {
220        writeBytes(bytes, offset, length);
221    }
222
223    /**
224     * Some output streams synchronize writes while others do not. Synchronizing here insures that
225     * log events won't be intertwined.
226     * @param bytes The serialized Log event.
227     * @param offset The offset into the byte array.
228     * @param length The number of bytes to write.
229     * @param immediateFlush flushes immediately after writing.
230     * @throws AppenderLoggingException if an error occurs.
231     */
232    protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
233        if (immediateFlush && byteBuffer.position() == 0) {
234            writeToDestination(bytes, offset, length);
235            flushDestination();
236            return;
237        }
238        if (length >= byteBuffer.capacity()) {
239            // if request length exceeds buffer capacity, flush the buffer and write the data directly
240            flush();
241            writeToDestination(bytes, offset, length);
242        } else {
243            if (length > byteBuffer.remaining()) {
244                flush();
245            }
246            byteBuffer.put(bytes, offset, length);
247        }
248        if (immediateFlush) {
249            flush();
250        }
251    }
252
253    /**
254     * Writes the specified section of the specified byte array to the stream.
255     *
256     * @param bytes the array containing data
257     * @param offset from where to write
258     * @param length how many bytes to write
259     * @since 2.6
260     */
261    protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
262        try {
263            getOutputStream().write(bytes, offset, length);
264        } catch (final IOException ex) {
265            throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
266        }
267    }
268
269    /**
270     * Calls {@code flush()} on the underlying output stream.
271     * @since 2.6
272     */
273    protected synchronized void flushDestination() {
274        final OutputStream stream = outputStream; // access volatile field only once per method
275        if (stream != null) {
276            try {
277                stream.flush();
278            } catch (final IOException ex) {
279                throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
280            }
281        }
282    }
283
284    /**
285     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
286     * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
287     * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
288     *
289     * @see #flushDestination()
290     * @since 2.6
291     */
292    protected synchronized void flushBuffer(final ByteBuffer buf) {
293        ((Buffer) buf).flip();
294        if (buf.remaining() > 0) {
295            writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
296        }
297        buf.clear();
298    }
299
300    /**
301     * Flushes any buffers.
302     */
303    public synchronized void flush() {
304        flushBuffer(byteBuffer);
305        flushDestination();
306    }
307
308    protected synchronized boolean closeOutputStream() {
309        flush();
310        final OutputStream stream = outputStream; // access volatile field only once per method
311        if (stream == null || stream == System.out || stream == System.err) {
312            return true;
313        }
314        try {
315            stream.close();
316        } catch (final IOException ex) {
317            logError("Unable to close stream", ex);
318            return false;
319        }
320        return true;
321    }
322
323    /**
324     * Returns this {@code ByteBufferDestination}'s buffer.
325     * @return the buffer
326     * @since 2.6
327     */
328    @Override
329    public ByteBuffer getByteBuffer() {
330        return byteBuffer;
331    }
332
333    /**
334     * Drains the ByteBufferDestination's buffer into the destination. By default this calls
335     * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
336     * <p>
337     * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
338     * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
339     * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
340     * buffer.
341     * </p><p>
342     * To just flush the buffered contents to the underlying stream, call
343     * {@link #flushBuffer(ByteBuffer)} directly instead.
344     * </p>
345     *
346     * @param buf the buffer whose contents to write the the destination
347     * @return the specified buffer
348     * @since 2.6
349     */
350    @Override
351    public ByteBuffer drain(final ByteBuffer buf) {
352        flushBuffer(buf);
353        return buf;
354    }
355
356    @Override
357    public void writeBytes(final ByteBuffer data) {
358        if (data.remaining() == 0) {
359          return;
360        }
361        synchronized (this) {
362          ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
363        }
364    }
365}