aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main
diff options
context:
space:
mode:
authorGravatar Klaus Aehlig <aehlig@google.com>2017-05-11 08:00:24 -0400
committerGravatar Kristina Chodorow <kchodorow@google.com>2017-05-11 10:49:36 -0400
commit8bd798d69a20d16f6017d5ec68fab7400c66143a (patch)
tree25cbe11b6e6a4979faf2b023b45e571790a27ffb /src/main
parent686dd375601aa0633d882664fc1542d0a2832352 (diff)
BEP: send stdout/stderr in smaller chunks
While sending chunks of stdout/stderr when a progress event is sent anyway is a good idea, we cannot entirely rely on this, as the amount of information buffered might grow too big. So set a fixed limit after which we flush out stdout and stderr; nevertheless, we still make sure we send buffers that were given to us in a single call to write in one go. Change-Id: Ie27bcf7d50671e003babd13cdb1d3f7fc1cb232f PiperOrigin-RevId: 155736641
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java92
2 files changed, 92 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
index e55ca36cb3..3b077f61c9 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -359,6 +359,25 @@ public class BuildEventStreamer implements EventHandler {
}
}
+ void flush() {
+ BuildEvent updateEvent;
+ synchronized (this) {
+ String out = null;
+ String err = null;
+ if (outErrProvider != null) {
+ out = outErrProvider.getOut();
+ err = outErrProvider.getErr();
+ }
+ updateEvent = ProgressEvent.progressUpdate(progressCount, out, err);
+ progressCount++;
+ announcedEvents.addAll(updateEvent.getChildrenEvents());
+ postedEvents.add(updateEvent.getEventId());
+ }
+ for (BuildEventTransport transport : transports) {
+ transport.sendBuildEvent(updateEvent, artifactGroupNamer);
+ }
+ }
+
@VisibleForTesting
ImmutableSet<BuildEventTransport> getTransports() {
return ImmutableSet.copyOf(transports);
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java
index 9fa92186fb..0d983639e8 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamerModule.java
@@ -72,54 +72,102 @@ public class BuildEventStreamerModule extends BlazeModule {
*/
private static class SynchronizedOutputStream extends OutputStream {
+ // The maximal amount of bytes we intend to store in the buffer. However,
+ // the requirement that a single write be written in one go is more important,
+ // so the actual size we store in this buffer can be the maximum (not the sum)
+ // of this value and the amount of bytes written in a single call to the
+ // {@link write(byte[] buffer, int offset, int count)} method.
+ private static final long MAX_BUFFERED_LENGTH = 10 * 1024;
+
private byte[] buf;
- private int count;
+ private long count;
private boolean discardAll;
+ // The event streamer that is supposed to flush stdout/stderr.
+ private BuildEventStreamer streamer;
+
SynchronizedOutputStream() {
buf = new byte[64];
count = 0;
discardAll = false;
}
+ void registerStreamer(BuildEventStreamer streamer) {
+ this.streamer = streamer;
+ }
+
public synchronized void setDiscardAll() {
discardAll = true;
count = 0;
buf = null;
}
- @Override
- public synchronized void write(int oneByte) throws IOException {
- if (discardAll) {
- return;
- }
- if (count == buf.length) {
- byte[] newbuf = new byte[count * 2 + 1];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
- buf[count++] = (byte) oneByte;
- }
-
/**
* Read the contents of the stream and simultaneously clear them. Also, reset the amount of
* memory retained to a constant amount.
*/
synchronized String readAndReset() {
- String content = new String(buf, 0, count, UTF_8);
+ String content = new String(buf, 0, (int) count, UTF_8);
buf = new byte[64];
count = 0;
return content;
}
- // While technically not needed, it is still a better user experience to have a write
- // enter the stream in one go.
@Override
- public synchronized void write(byte[] buffer, int offset, int count) throws IOException {
+ public void write(int oneByte) throws IOException {
if (discardAll) {
return;
}
- super.write(buffer, offset, count);
+ // We change the dependency with respect to that of the super class: write(int)
+ // now calls write(int[], int, int) which is implemented without any dependencies.
+ write(new byte[] {(byte) oneByte}, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] buffer, int offset, int count) throws IOException {
+ // As we base the less common write(int) on this method, we may not depend not call write(int)
+ // directly or indirectly (e.g., by calling super.write(int[], int, int)).
+ synchronized (this) {
+ if (discardAll) {
+ return;
+ }
+ }
+ boolean shouldFlush = false;
+ // As we have to do the flushing outside the synchronized block, we have to expect
+ // other writes to come immediately after flushing, so we have to do the check inside
+ // a while loop.
+ boolean didWrite = false;
+ while (!didWrite) {
+ synchronized (this) {
+ if (this.count + (long) count < MAX_BUFFERED_LENGTH || this.count == 0) {
+ if (this.count + (long) count >= (long) buf.length) {
+ // We need to increase the buffer; if within the permissible range range for array
+ // sizes, we at least double it, otherwise we only increase as far as needed.
+ long newsize;
+ if (2 * (long) buf.length + count < (long) Integer.MAX_VALUE) {
+ newsize = 2 * (long) buf.length + count;
+ } else {
+ newsize = this.count + count;
+ }
+ byte[] newbuf = new byte[(int) newsize];
+ System.arraycopy(buf, 0, newbuf, 0, (int) this.count);
+ this.buf = newbuf;
+ }
+ System.arraycopy(buffer, offset, buf, (int) this.count, count);
+ this.count += (long) count;
+ didWrite = true;
+ } else {
+ shouldFlush = true;
+ }
+ if (this.count >= MAX_BUFFERED_LENGTH) {
+ shouldFlush = true;
+ }
+ }
+ if (shouldFlush && streamer != null) {
+ streamer.flush();
+ shouldFlush = false;
+ }
+ }
}
}
@@ -180,6 +228,12 @@ public class BuildEventStreamerModule extends BlazeModule {
return theErr.readAndReset();
}
});
+ if (theErr != null) {
+ theErr.registerStreamer(streamer);
+ }
+ if (theOut != null) {
+ theOut.registerStreamer(streamer);
+ }
} else {
// If there is no streamer to consume the output, we should not try to accumulate it.
this.out.setDiscardAll();