diff options
author | 2017-05-11 08:00:24 -0400 | |
---|---|---|
committer | 2017-05-11 10:49:36 -0400 | |
commit | 8bd798d69a20d16f6017d5ec68fab7400c66143a (patch) | |
tree | 25cbe11b6e6a4979faf2b023b45e571790a27ffb /src/main | |
parent | 686dd375601aa0633d882664fc1542d0a2832352 (diff) |
BEP: send stdout/stderr in smaller chunks
While sending chunks of stdout/stderr when a progress event is sent anyway
is a good idea, we cannot entirely rely on this, as the amount of information
buffered might grow too big. So set a fixed limit after which we flush out
stdout and stderr; nevertheless, we still make sure we send buffers that were
given to us in a single call to write in one go.
Change-Id: Ie27bcf7d50671e003babd13cdb1d3f7fc1cb232f
PiperOrigin-RevId: 155736641
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java | 19 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java | 92 |
2 files changed, 92 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java index e55ca36cb3..3b077f61c9 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java @@ -359,6 +359,25 @@ public class BuildEventStreamer implements EventHandler { } } + void flush() { + BuildEvent updateEvent; + synchronized (this) { + String out = null; + String err = null; + if (outErrProvider != null) { + out = outErrProvider.getOut(); + err = outErrProvider.getErr(); + } + updateEvent = ProgressEvent.progressUpdate(progressCount, out, err); + progressCount++; + announcedEvents.addAll(updateEvent.getChildrenEvents()); + postedEvents.add(updateEvent.getEventId()); + } + for (BuildEventTransport transport : transports) { + transport.sendBuildEvent(updateEvent, artifactGroupNamer); + } + } + @VisibleForTesting ImmutableSet<BuildEventTransport> getTransports() { return ImmutableSet.copyOf(transports); diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java index 9fa92186fb..0d983639e8 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java @@ -72,54 +72,102 @@ public class BuildEventStreamerModule extends BlazeModule { */ private static class SynchronizedOutputStream extends OutputStream { + // The maximal amount of bytes we intend to store in the buffer. However, + // the requirement that a single write be written in one go is more important, + // so the actual size we store in this buffer can be the maximum (not the sum) + // of this value and the amount of bytes written in a single call to the + // {@link write(byte[] buffer, int offset, int count)} method. + private static final long MAX_BUFFERED_LENGTH = 10 * 1024; + private byte[] buf; - private int count; + private long count; private boolean discardAll; + // The event streamer that is supposed to flush stdout/stderr. + private BuildEventStreamer streamer; + SynchronizedOutputStream() { buf = new byte[64]; count = 0; discardAll = false; } + void registerStreamer(BuildEventStreamer streamer) { + this.streamer = streamer; + } + public synchronized void setDiscardAll() { discardAll = true; count = 0; buf = null; } - @Override - public synchronized void write(int oneByte) throws IOException { - if (discardAll) { - return; - } - if (count == buf.length) { - byte[] newbuf = new byte[count * 2 + 1]; - System.arraycopy(buf, 0, newbuf, 0, count); - buf = newbuf; - } - buf[count++] = (byte) oneByte; - } - /** * Read the contents of the stream and simultaneously clear them. Also, reset the amount of * memory retained to a constant amount. */ synchronized String readAndReset() { - String content = new String(buf, 0, count, UTF_8); + String content = new String(buf, 0, (int) count, UTF_8); buf = new byte[64]; count = 0; return content; } - // While technically not needed, it is still a better user experience to have a write - // enter the stream in one go. @Override - public synchronized void write(byte[] buffer, int offset, int count) throws IOException { + public void write(int oneByte) throws IOException { if (discardAll) { return; } - super.write(buffer, offset, count); + // We change the dependency with respect to that of the super class: write(int) + // now calls write(int[], int, int) which is implemented without any dependencies. + write(new byte[] {(byte) oneByte}, 0, 1); + } + + @Override + public void write(byte[] buffer, int offset, int count) throws IOException { + // As we base the less common write(int) on this method, we may not depend not call write(int) + // directly or indirectly (e.g., by calling super.write(int[], int, int)). + synchronized (this) { + if (discardAll) { + return; + } + } + boolean shouldFlush = false; + // As we have to do the flushing outside the synchronized block, we have to expect + // other writes to come immediately after flushing, so we have to do the check inside + // a while loop. + boolean didWrite = false; + while (!didWrite) { + synchronized (this) { + if (this.count + (long) count < MAX_BUFFERED_LENGTH || this.count == 0) { + if (this.count + (long) count >= (long) buf.length) { + // We need to increase the buffer; if within the permissible range range for array + // sizes, we at least double it, otherwise we only increase as far as needed. + long newsize; + if (2 * (long) buf.length + count < (long) Integer.MAX_VALUE) { + newsize = 2 * (long) buf.length + count; + } else { + newsize = this.count + count; + } + byte[] newbuf = new byte[(int) newsize]; + System.arraycopy(buf, 0, newbuf, 0, (int) this.count); + this.buf = newbuf; + } + System.arraycopy(buffer, offset, buf, (int) this.count, count); + this.count += (long) count; + didWrite = true; + } else { + shouldFlush = true; + } + if (this.count >= MAX_BUFFERED_LENGTH) { + shouldFlush = true; + } + } + if (shouldFlush && streamer != null) { + streamer.flush(); + shouldFlush = false; + } + } } } @@ -180,6 +228,12 @@ public class BuildEventStreamerModule extends BlazeModule { return theErr.readAndReset(); } }); + if (theErr != null) { + theErr.registerStreamer(streamer); + } + if (theOut != null) { + theOut.registerStreamer(streamer); + } } else { // If there is no streamer to consume the output, we should not try to accumulate it. this.out.setDiscardAll(); |