aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-03-22 08:02:18 +0000
committerGravatar Lukacs Berki <lberki@google.com>2016-03-22 08:09:59 +0000
commit4dd60f9cef8a206bac8f83af8153bb8e335479bc (patch)
tree4d737bef42bfb20f707bb5362b6bbdb41d30f2f4 /src/main/java/com/google/devtools/build/lib
parentb4c00b6eead53ba9381bab12765b2c4ed98a61d1 (diff)
Revamp the client/server communication protocol so that it is portable to Windows.
Progress towards #930. -- MOS_MIGRATED_REVID=117799006
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCServer.java20
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/ServerResponse.java116
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java99
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java55
4 files changed, 62 insertions, 228 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
index 4302f3c88c..206030b45d 100644
--- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -14,8 +14,6 @@
package com.google.devtools.build.lib.server;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@@ -427,10 +425,15 @@ public final class RPCServer {
* blaze.cc) to interface with Unix APIs.
*/
private static List<String> readRequest(InputStream input) throws IOException {
- byte[] inputBytes = ByteStreams.toByteArray(input);
- if (inputBytes.length == 0) {
- return null;
- }
+ byte[] sizeBuffer = new byte[4];
+ ByteStreams.readFully(input, sizeBuffer);
+ int size = ((sizeBuffer[0] & 0xff) << 24)
+ + ((sizeBuffer[1] & 0xff) << 16)
+ + ((sizeBuffer[2] & 0xff) << 8)
+ + (sizeBuffer[3] & 0xff);
+ byte[] inputBytes = new byte[size];
+ ByteStreams.readFully(input, inputBytes);
+
String s = new String(inputBytes, Charset.defaultCharset());
return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s));
}
@@ -522,7 +525,10 @@ public final class RPCServer {
// exit code.
flushOutErr();
try {
- controlChannel.write(("" + exitStatus + "\n").getBytes(UTF_8));
+ controlChannel.write((exitStatus >> 24) & 0xff);
+ controlChannel.write((exitStatus >> 16) & 0xff);
+ controlChannel.write((exitStatus >> 8) & 0xff);
+ controlChannel.write(exitStatus & 0xff);
controlChannel.flush();
LOG.info("" + exitStatus);
} catch (IOException ignored) {
diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java
deleted file mode 100644
index 3589bb2fb0..0000000000
--- a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java
+++ /dev/null
@@ -1,116 +0,0 @@
-// Copyright 2014 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.server;
-
-import com.google.devtools.build.lib.util.Preconditions;
-
-import java.io.ByteArrayOutputStream;
-import java.io.UnsupportedEncodingException;
-
-/**
- * This class models a response from the {@link RPCServer}. This is a
- * tuple of an error message and the exit status. The encoding of the response
- * is extremely simple {@link #toString()}:
- *
- * <ul><li>Iff a message is present, the wire format is
- * <pre>message + '\n' + exit code as string + '\n'</pre>
- * </li>
- * <li>Otherwise it's just the exit code as string + '\n'</li>
- * </ul>
- */
-final class ServerResponse {
-
- /**
- * Parses an input string into a {@link ServerResponse} object.
- */
- public static ServerResponse parseFrom(String input) {
- if (input.charAt(input.length() - 1) != '\n') {
- String msg = "Response must end with newline (" + input + ")";
- throw new IllegalArgumentException(msg);
- }
- int newlineAt = input.lastIndexOf('\n', input.length() - 2);
-
- final String exitStatusString;
- final String errorMessage;
- if (newlineAt == -1) {
- errorMessage = "";
- exitStatusString = input.substring(0, input.length() - 1);
- } else {
- errorMessage = input.substring(0, newlineAt);
- exitStatusString = input.substring(newlineAt + 1, input.length() - 1);
- }
-
- return new ServerResponse(errorMessage, Integer.parseInt(exitStatusString));
- }
-
- /**
- * Parses {@code bytes} into a {@link ServerResponse} instance, assuming
- * Latin 1 encoding.
- */
- public static ServerResponse parseFrom(byte[] bytes) {
- try {
- return parseFrom(new String(bytes, "ISO-8859-1"));
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e); // Latin 1 is everywhere.
- }
- }
-
- /**
- * Parses {@code bytes} into a {@link ServerResponse} instance, assuming
- * Latin 1 encoding.
- */
- public static ServerResponse parseFrom(ByteArrayOutputStream bytes) {
- return parseFrom(bytes.toByteArray());
- }
-
- private final String errorMessage;
- private final int exitStatus;
-
- /**
- * Construct a new instance given an error message and an exit status.
- */
- public ServerResponse(String errorMessage, int exitStatus) {
- Preconditions.checkNotNull(errorMessage);
- this.errorMessage = errorMessage;
- this.exitStatus = exitStatus;
- }
-
- /**
- * The wire representation of this response object.
- */
- @Override
- public String toString() {
- if (errorMessage.length() == 0) {
- return Integer.toString(exitStatus) + '\n';
- }
- return errorMessage + '\n' + exitStatus + '\n';
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof ServerResponse)) {
- return false;
- }
- ServerResponse otherResponse = (ServerResponse) other;
- return exitStatus == otherResponse.exitStatus
- && errorMessage.equals(otherResponse.errorMessage);
- }
-
- @Override
- public int hashCode() {
- return exitStatus * 31 ^ errorMessage.hashCode();
- }
-
-}
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
index 051f218515..ca6d2b5e09 100644
--- a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
+++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
@@ -48,9 +48,6 @@ public final class StreamDemultiplexer extends OutputStream {
}
}
- private static final byte AT = '@';
- private static final byte NEWLINE = '\n';
-
/**
* The output streams, conveniently in an array indexed by the marker byte.
* Some of these will be null, most likely.
@@ -64,22 +61,23 @@ public final class StreamDemultiplexer extends OutputStream {
* parse things.
*/
private enum State {
- EXPECT_CONTROL_STARTING_AT,
EXPECT_MARKER_BYTE,
- EXPECT_AT_OR_NEWLINE,
- EXPECT_PAYLOAD_OR_NEWLINE
+ EXPECT_SIZE,
+ EXPECT_PAYLOAD,
}
- private State state = State.EXPECT_CONTROL_STARTING_AT;
- private boolean addNewlineToPayload;
+ private final int[] sizeBuffer = new int[4];
+ private State state = State.EXPECT_MARKER_BYTE;
private OutputStream selectedStream;
+ private int currentSizeByte = 0;
+ private int payloadBytesLeft = 0;
/**
* Construct a new demultiplexer. The {@code smallestMarkerByte} indicates
* the marker byte we would expect for {@code outputStreams[0]} to be used.
* So, if this first stream is your stdout and you're using the
* {@link StreamMultiplexer}, then you will need to set this to
- * {@code '1'}. Because {@link StreamDemultiplexer} extends
+ * {@code 1}. Because {@link StreamDemultiplexer} extends
* {@link OutputStream}, this constructor effectively creates an
* {@link OutputStream} instance which demultiplexes the tagged data client
* code writes to it into {@code outputStreams}.
@@ -95,70 +93,35 @@ public final class StreamDemultiplexer extends OutputStream {
public void write(int b) throws IOException {
// This dispatch traverses the finite state machine / grammar.
switch (state) {
- case EXPECT_CONTROL_STARTING_AT:
- parseControlStartingAt((byte) b);
- resetFields();
- break;
case EXPECT_MARKER_BYTE:
- parseMarkerByte((byte) b);
+ parseMarkerByte(b);
break;
- case EXPECT_AT_OR_NEWLINE:
- parseAtOrNewline((byte) b);
+ case EXPECT_SIZE:
+ parseSize(b);
break;
- case EXPECT_PAYLOAD_OR_NEWLINE:
- parsePayloadOrNewline((byte) b);
+ case EXPECT_PAYLOAD:
+ parsePayload(b);
break;
}
}
- /**
- * Handles {@link State#EXPECT_PAYLOAD_OR_NEWLINE}, which is the payload
- * we are actually transporting over the wire. At this point we can rely
- * on a stream having been preselected into {@link #selectedStream}, and
- * also we will add a newline if {@link #addNewlineToPayload} is set.
- * Flushes at the end of every payload segment.
- */
- private void parsePayloadOrNewline(byte b) throws IOException {
- if (b == NEWLINE) {
- if (addNewlineToPayload) {
- selectedStream.write(NEWLINE);
- }
- selectedStream.flush();
- state = State.EXPECT_CONTROL_STARTING_AT;
- } else {
- selectedStream.write(b);
- selectedStream.flush(); // slow?
+ private void parseSize(int b) {
+ sizeBuffer[currentSizeByte] = b;
+ currentSizeByte += 1;
+ if (currentSizeByte == 4) {
+ state = State.EXPECT_PAYLOAD;
+ payloadBytesLeft = (sizeBuffer[0] << 24)
+ + (sizeBuffer[1] << 16)
+ + (sizeBuffer[2] << 8)
+ + sizeBuffer[3];
}
}
/**
- * Handles {@link State#EXPECT_AT_OR_NEWLINE}, which is either the
- * suppress newline indicator (at) at the end of a control line, or the end
- * of a control line.
- */
- private void parseAtOrNewline(byte b) throws IOException {
- if (b == NEWLINE) {
- state = State.EXPECT_PAYLOAD_OR_NEWLINE;
- } else if (b == AT) {
- addNewlineToPayload = false;
- } else {
- throw new IOException("Expected @ or \\n. (" + b + ")");
- }
- }
-
- /**
- * Reset the fields that are affected by our state.
- */
- private void resetFields() {
- selectedStream = null;
- addNewlineToPayload = true;
- }
-
- /**
* Handles {@link State#EXPECT_MARKER_BYTE}. The byte determines which stream
* we will be using, and will set {@link #selectedStream}.
*/
- private void parseMarkerByte(byte markerByte) throws IOException {
+ private void parseMarkerByte(int markerByte) throws IOException {
if (markerByte < 0 || markerByte > Byte.MAX_VALUE) {
String msg = "Illegal marker byte (" + markerByte + ")";
throw new IllegalArgumentException(msg);
@@ -168,19 +131,15 @@ public final class StreamDemultiplexer extends OutputStream {
throw new IOException("stream " + markerByte + " not registered.");
}
selectedStream = outputStreams[markerByte];
- state = State.EXPECT_AT_OR_NEWLINE;
+ state = State.EXPECT_SIZE;
+ currentSizeByte = 0;
}
- /**
- * Handles {@link State#EXPECT_CONTROL_STARTING_AT}, the very first '@' with
- * which each message starts.
- */
- private void parseControlStartingAt(byte b) throws IOException {
- if (b != AT) {
- throw new IOException("Expected control starting @. (" + b + ", "
- + (char) b + ")");
+ private void parsePayload(int b) throws IOException {
+ selectedStream.write(b);
+ payloadBytesLeft -= 1;
+ if (payloadBytesLeft == 0) {
+ state = State.EXPECT_MARKER_BYTE;
}
- state = State.EXPECT_MARKER_BYTE;
}
-
}
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java
index 263b98a274..209dd302bc 100644
--- a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java
+++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java
@@ -28,37 +28,28 @@ import java.io.OutputStream;
* end of a networking connection can simply read the tagged lines and then act
* on them within a sigle thread.
*
- * The format of the tagged output stream is as follows:
+ * The format of the tagged output stream is reasonably simple:
+ * <ol>
+ * <li>
+ * Marker byte indicating whether that chunk is for stdout (1), stderr (2) or the control
+ * stream (3).
+ * </li>
+ * <li>
+ * 4 bytes indicating the length of the chunk in high-endian format.
+ * </li>
+ * <li>
+ * The payload (as many bytes as the length field before)
+ * </li>
+ * </ol>>
*
- * <pre>
- * combined :: = [ control_line payload ... ]+
- * control_line :: = '@' marker '@'? '\n'
- * payload :: = r'^[^\n]*\n'
- * </pre>
*
- * So basically:
- * <ul>
- * <li>Control lines alternate with payload lines</li>
- * <li>Both types of lines end with a newline, and never have a newline in
- * them.</li>
- * <li>The marker indicates which stream we mean.
- * For now, '1'=stdout, '2'=stderr.</li>
- * <li>The optional second '@' indicates that the following line is
- * incomplete.</li>
- * </ul>
- *
- * This format is optimized for easy interpretation by a Python client, but it's
- * also a compromise in that it's still easy to interpret by a human (let's say
- * you have to read the traffic over a wire for some reason).
*/
@ThreadSafe
public final class StreamMultiplexer {
- public static final byte STDOUT_MARKER = '1';
- public static final byte STDERR_MARKER = '2';
- public static final byte CONTROL_MARKER = '3';
-
- private static final byte AT = '@';
+ public static final byte STDOUT_MARKER = 1;
+ public static final byte STDERR_MARKER = 2;
+ public static final byte CONTROL_MARKER = 3;
private final Object mutex = new Object();
private final OutputStream multiplexed;
@@ -82,19 +73,13 @@ public final class StreamMultiplexer {
multiplexed.flush();
return;
}
- byte lastByte = buffer[len - 1];
- boolean lineIsIncomplete = lastByte != NEWLINE;
- multiplexed.write(AT);
multiplexed.write(markerByte);
- if (lineIsIncomplete) {
- multiplexed.write(AT);
- }
- multiplexed.write(NEWLINE);
+ multiplexed.write((len >> 24) & 0xff);
+ multiplexed.write((len >> 16) & 0xff);
+ multiplexed.write((len >> 8) & 0xff);
+ multiplexed.write(len & 0xff);
multiplexed.write(buffer, 0, len);
- if (lineIsIncomplete) {
- multiplexed.write(NEWLINE);
- }
multiplexed.flush();
}
len = 0;