// Copyright 2017 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.runtime; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; import java.io.OutputStream; /** * {@link OutputStream} suitably synchronized for producer-consumer use cases. * The method {@link #readAndReset()} allows to read the bytes accumulated so far * and simultaneously truncate precisely the bytes read. Moreover, upon such a reset * the amount of memory retained is reset to a small constant. This is a difference * with resecpt to the behaviour of the standard classes {@link ByteArrayOutputStream} * which only resets the index but keeps the array. This difference matters, as we need * to support output peeks without retaining this ammount of memory for the rest of the * build. * *

This class is expected to be used with the {@link BuildEventStreamer}. */ public class SynchronizedOutputStream extends OutputStream { // The maximal amount of bytes we intend to store in the buffer. However, // the requirement that a single write be written in one go is more important, // so the actual size we store in this buffer can be the maximum (not the sum) // of this value and the amount of bytes written in a single call to the // {@link write(byte[] buffer, int offset, int count)} method. private final long maxBufferedLength; private byte[] buf; private long count; private boolean discardAll; // The event streamer that is supposed to flush stdout/stderr. private BuildEventStreamer streamer; public SynchronizedOutputStream(long maxBufferedLength) { buf = new byte[64]; count = 0; discardAll = false; this.maxBufferedLength = maxBufferedLength; } public void registerStreamer(BuildEventStreamer streamer) { this.streamer = streamer; } public synchronized void setDiscardAll() { discardAll = true; count = 0; buf = null; } /** * Read the contents of the stream and simultaneously clear them. Also, reset the amount of * memory retained to a constant amount. */ public synchronized String readAndReset() { String content = new String(buf, 0, (int) count, UTF_8); buf = new byte[64]; count = 0; return content; } @Override public void write(int oneByte) throws IOException { if (discardAll) { return; } // We change the dependency with respect to that of the super class: write(int) // now calls write(int[], int, int) which is implemented without any dependencies. write(new byte[] {(byte) oneByte}, 0, 1); } @Override public void write(byte[] buffer, int offset, int count) throws IOException { // As we base the less common write(int) on this method, we may not depend not call write(int) // directly or indirectly (e.g., by calling super.write(int[], int, int)). synchronized (this) { if (discardAll) { return; } } boolean shouldFlush = false; // As we have to do the flushing outside the synchronized block, we have to expect // other writes to come immediately after flushing, so we have to do the check inside // a while loop. boolean didWrite = false; while (!didWrite) { synchronized (this) { if (this.count + (long) count < maxBufferedLength || this.count == 0) { if (this.count + (long) count >= (long) buf.length) { // We need to increase the buffer; if within the permissible range range for array // sizes, we at least double it, otherwise we only increase as far as needed. long newsize; if (2 * (long) buf.length + count < (long) Integer.MAX_VALUE) { newsize = 2 * (long) buf.length + count; } else { newsize = this.count + count; } byte[] newbuf = new byte[(int) newsize]; System.arraycopy(buf, 0, newbuf, 0, (int) this.count); this.buf = newbuf; } System.arraycopy(buffer, offset, buf, (int) this.count, count); this.count += (long) count; didWrite = true; } else { shouldFlush = true; } if (this.count >= maxBufferedLength) { shouldFlush = true; } } if (shouldFlush && streamer != null) { streamer.flush(); shouldFlush = false; } } } }