diff options
author | 2016-03-22 08:02:18 +0000 | |
---|---|---|
committer | 2016-03-22 08:09:59 +0000 | |
commit | 4dd60f9cef8a206bac8f83af8153bb8e335479bc (patch) | |
tree | 4d737bef42bfb20f707bb5362b6bbdb41d30f2f4 /src/main/java/com/google/devtools/build/lib | |
parent | b4c00b6eead53ba9381bab12765b2c4ed98a61d1 (diff) |
Revamp the client/server communication protocol so that it is portable to Windows.
Progress towards #930.
--
MOS_MIGRATED_REVID=117799006
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
4 files changed, 62 insertions, 228 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java index 4302f3c88c..206030b45d 100644 --- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -14,8 +14,6 @@ package com.google.devtools.build.lib.server; -import static java.nio.charset.StandardCharsets.UTF_8; - import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -427,10 +425,15 @@ public final class RPCServer { * blaze.cc) to interface with Unix APIs. */ private static List<String> readRequest(InputStream input) throws IOException { - byte[] inputBytes = ByteStreams.toByteArray(input); - if (inputBytes.length == 0) { - return null; - } + byte[] sizeBuffer = new byte[4]; + ByteStreams.readFully(input, sizeBuffer); + int size = ((sizeBuffer[0] & 0xff) << 24) + + ((sizeBuffer[1] & 0xff) << 16) + + ((sizeBuffer[2] & 0xff) << 8) + + (sizeBuffer[3] & 0xff); + byte[] inputBytes = new byte[size]; + ByteStreams.readFully(input, inputBytes); + String s = new String(inputBytes, Charset.defaultCharset()); return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s)); } @@ -522,7 +525,10 @@ public final class RPCServer { // exit code. flushOutErr(); try { - controlChannel.write(("" + exitStatus + "\n").getBytes(UTF_8)); + controlChannel.write((exitStatus >> 24) & 0xff); + controlChannel.write((exitStatus >> 16) & 0xff); + controlChannel.write((exitStatus >> 8) & 0xff); + controlChannel.write(exitStatus & 0xff); controlChannel.flush(); LOG.info("" + exitStatus); } catch (IOException ignored) { diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java deleted file mode 100644 index 3589bb2fb0..0000000000 --- a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.server; - -import com.google.devtools.build.lib.util.Preconditions; - -import java.io.ByteArrayOutputStream; -import java.io.UnsupportedEncodingException; - -/** - * This class models a response from the {@link RPCServer}. This is a - * tuple of an error message and the exit status. The encoding of the response - * is extremely simple {@link #toString()}: - * - * <ul><li>Iff a message is present, the wire format is - * <pre>message + '\n' + exit code as string + '\n'</pre> - * </li> - * <li>Otherwise it's just the exit code as string + '\n'</li> - * </ul> - */ -final class ServerResponse { - - /** - * Parses an input string into a {@link ServerResponse} object. - */ - public static ServerResponse parseFrom(String input) { - if (input.charAt(input.length() - 1) != '\n') { - String msg = "Response must end with newline (" + input + ")"; - throw new IllegalArgumentException(msg); - } - int newlineAt = input.lastIndexOf('\n', input.length() - 2); - - final String exitStatusString; - final String errorMessage; - if (newlineAt == -1) { - errorMessage = ""; - exitStatusString = input.substring(0, input.length() - 1); - } else { - errorMessage = input.substring(0, newlineAt); - exitStatusString = input.substring(newlineAt + 1, input.length() - 1); - } - - return new ServerResponse(errorMessage, Integer.parseInt(exitStatusString)); - } - - /** - * Parses {@code bytes} into a {@link ServerResponse} instance, assuming - * Latin 1 encoding. - */ - public static ServerResponse parseFrom(byte[] bytes) { - try { - return parseFrom(new String(bytes, "ISO-8859-1")); - } catch (UnsupportedEncodingException e) { - throw new AssertionError(e); // Latin 1 is everywhere. - } - } - - /** - * Parses {@code bytes} into a {@link ServerResponse} instance, assuming - * Latin 1 encoding. - */ - public static ServerResponse parseFrom(ByteArrayOutputStream bytes) { - return parseFrom(bytes.toByteArray()); - } - - private final String errorMessage; - private final int exitStatus; - - /** - * Construct a new instance given an error message and an exit status. - */ - public ServerResponse(String errorMessage, int exitStatus) { - Preconditions.checkNotNull(errorMessage); - this.errorMessage = errorMessage; - this.exitStatus = exitStatus; - } - - /** - * The wire representation of this response object. - */ - @Override - public String toString() { - if (errorMessage.length() == 0) { - return Integer.toString(exitStatus) + '\n'; - } - return errorMessage + '\n' + exitStatus + '\n'; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ServerResponse)) { - return false; - } - ServerResponse otherResponse = (ServerResponse) other; - return exitStatus == otherResponse.exitStatus - && errorMessage.equals(otherResponse.errorMessage); - } - - @Override - public int hashCode() { - return exitStatus * 31 ^ errorMessage.hashCode(); - } - -} diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java index 051f218515..ca6d2b5e09 100644 --- a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java @@ -48,9 +48,6 @@ public final class StreamDemultiplexer extends OutputStream { } } - private static final byte AT = '@'; - private static final byte NEWLINE = '\n'; - /** * The output streams, conveniently in an array indexed by the marker byte. * Some of these will be null, most likely. @@ -64,22 +61,23 @@ public final class StreamDemultiplexer extends OutputStream { * parse things. */ private enum State { - EXPECT_CONTROL_STARTING_AT, EXPECT_MARKER_BYTE, - EXPECT_AT_OR_NEWLINE, - EXPECT_PAYLOAD_OR_NEWLINE + EXPECT_SIZE, + EXPECT_PAYLOAD, } - private State state = State.EXPECT_CONTROL_STARTING_AT; - private boolean addNewlineToPayload; + private final int[] sizeBuffer = new int[4]; + private State state = State.EXPECT_MARKER_BYTE; private OutputStream selectedStream; + private int currentSizeByte = 0; + private int payloadBytesLeft = 0; /** * Construct a new demultiplexer. The {@code smallestMarkerByte} indicates * the marker byte we would expect for {@code outputStreams[0]} to be used. * So, if this first stream is your stdout and you're using the * {@link StreamMultiplexer}, then you will need to set this to - * {@code '1'}. Because {@link StreamDemultiplexer} extends + * {@code 1}. Because {@link StreamDemultiplexer} extends * {@link OutputStream}, this constructor effectively creates an * {@link OutputStream} instance which demultiplexes the tagged data client * code writes to it into {@code outputStreams}. @@ -95,70 +93,35 @@ public final class StreamDemultiplexer extends OutputStream { public void write(int b) throws IOException { // This dispatch traverses the finite state machine / grammar. switch (state) { - case EXPECT_CONTROL_STARTING_AT: - parseControlStartingAt((byte) b); - resetFields(); - break; case EXPECT_MARKER_BYTE: - parseMarkerByte((byte) b); + parseMarkerByte(b); break; - case EXPECT_AT_OR_NEWLINE: - parseAtOrNewline((byte) b); + case EXPECT_SIZE: + parseSize(b); break; - case EXPECT_PAYLOAD_OR_NEWLINE: - parsePayloadOrNewline((byte) b); + case EXPECT_PAYLOAD: + parsePayload(b); break; } } - /** - * Handles {@link State#EXPECT_PAYLOAD_OR_NEWLINE}, which is the payload - * we are actually transporting over the wire. At this point we can rely - * on a stream having been preselected into {@link #selectedStream}, and - * also we will add a newline if {@link #addNewlineToPayload} is set. - * Flushes at the end of every payload segment. - */ - private void parsePayloadOrNewline(byte b) throws IOException { - if (b == NEWLINE) { - if (addNewlineToPayload) { - selectedStream.write(NEWLINE); - } - selectedStream.flush(); - state = State.EXPECT_CONTROL_STARTING_AT; - } else { - selectedStream.write(b); - selectedStream.flush(); // slow? + private void parseSize(int b) { + sizeBuffer[currentSizeByte] = b; + currentSizeByte += 1; + if (currentSizeByte == 4) { + state = State.EXPECT_PAYLOAD; + payloadBytesLeft = (sizeBuffer[0] << 24) + + (sizeBuffer[1] << 16) + + (sizeBuffer[2] << 8) + + sizeBuffer[3]; } } /** - * Handles {@link State#EXPECT_AT_OR_NEWLINE}, which is either the - * suppress newline indicator (at) at the end of a control line, or the end - * of a control line. - */ - private void parseAtOrNewline(byte b) throws IOException { - if (b == NEWLINE) { - state = State.EXPECT_PAYLOAD_OR_NEWLINE; - } else if (b == AT) { - addNewlineToPayload = false; - } else { - throw new IOException("Expected @ or \\n. (" + b + ")"); - } - } - - /** - * Reset the fields that are affected by our state. - */ - private void resetFields() { - selectedStream = null; - addNewlineToPayload = true; - } - - /** * Handles {@link State#EXPECT_MARKER_BYTE}. The byte determines which stream * we will be using, and will set {@link #selectedStream}. */ - private void parseMarkerByte(byte markerByte) throws IOException { + private void parseMarkerByte(int markerByte) throws IOException { if (markerByte < 0 || markerByte > Byte.MAX_VALUE) { String msg = "Illegal marker byte (" + markerByte + ")"; throw new IllegalArgumentException(msg); @@ -168,19 +131,15 @@ public final class StreamDemultiplexer extends OutputStream { throw new IOException("stream " + markerByte + " not registered."); } selectedStream = outputStreams[markerByte]; - state = State.EXPECT_AT_OR_NEWLINE; + state = State.EXPECT_SIZE; + currentSizeByte = 0; } - /** - * Handles {@link State#EXPECT_CONTROL_STARTING_AT}, the very first '@' with - * which each message starts. - */ - private void parseControlStartingAt(byte b) throws IOException { - if (b != AT) { - throw new IOException("Expected control starting @. (" + b + ", " - + (char) b + ")"); + private void parsePayload(int b) throws IOException { + selectedStream.write(b); + payloadBytesLeft -= 1; + if (payloadBytesLeft == 0) { + state = State.EXPECT_MARKER_BYTE; } - state = State.EXPECT_MARKER_BYTE; } - } diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java index 263b98a274..209dd302bc 100644 --- a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java @@ -28,37 +28,28 @@ import java.io.OutputStream; * end of a networking connection can simply read the tagged lines and then act * on them within a sigle thread. * - * The format of the tagged output stream is as follows: + * The format of the tagged output stream is reasonably simple: + * <ol> + * <li> + * Marker byte indicating whether that chunk is for stdout (1), stderr (2) or the control + * stream (3). + * </li> + * <li> + * 4 bytes indicating the length of the chunk in high-endian format. + * </li> + * <li> + * The payload (as many bytes as the length field before) + * </li> + * </ol>> * - * <pre> - * combined :: = [ control_line payload ... ]+ - * control_line :: = '@' marker '@'? '\n' - * payload :: = r'^[^\n]*\n' - * </pre> * - * So basically: - * <ul> - * <li>Control lines alternate with payload lines</li> - * <li>Both types of lines end with a newline, and never have a newline in - * them.</li> - * <li>The marker indicates which stream we mean. - * For now, '1'=stdout, '2'=stderr.</li> - * <li>The optional second '@' indicates that the following line is - * incomplete.</li> - * </ul> - * - * This format is optimized for easy interpretation by a Python client, but it's - * also a compromise in that it's still easy to interpret by a human (let's say - * you have to read the traffic over a wire for some reason). */ @ThreadSafe public final class StreamMultiplexer { - public static final byte STDOUT_MARKER = '1'; - public static final byte STDERR_MARKER = '2'; - public static final byte CONTROL_MARKER = '3'; - - private static final byte AT = '@'; + public static final byte STDOUT_MARKER = 1; + public static final byte STDERR_MARKER = 2; + public static final byte CONTROL_MARKER = 3; private final Object mutex = new Object(); private final OutputStream multiplexed; @@ -82,19 +73,13 @@ public final class StreamMultiplexer { multiplexed.flush(); return; } - byte lastByte = buffer[len - 1]; - boolean lineIsIncomplete = lastByte != NEWLINE; - multiplexed.write(AT); multiplexed.write(markerByte); - if (lineIsIncomplete) { - multiplexed.write(AT); - } - multiplexed.write(NEWLINE); + multiplexed.write((len >> 24) & 0xff); + multiplexed.write((len >> 16) & 0xff); + multiplexed.write((len >> 8) & 0xff); + multiplexed.write(len & 0xff); multiplexed.write(buffer, 0, len); - if (lineIsIncomplete) { - multiplexed.write(NEWLINE); - } multiplexed.flush(); } len = 0; |