From 4dd60f9cef8a206bac8f83af8153bb8e335479bc Mon Sep 17 00:00:00 2001 From: Lukacs Berki Date: Tue, 22 Mar 2016 08:02:18 +0000 Subject: Revamp the client/server communication protocol so that it is portable to Windows. Progress towards #930. -- MOS_MIGRATED_REVID=117799006 --- src/main/cpp/blaze.cc | 167 ++++++++++++--------- .../devtools/build/lib/server/RPCServer.java | 20 ++- .../devtools/build/lib/server/ServerResponse.java | 116 -------------- .../build/lib/util/io/StreamDemultiplexer.java | 99 ++++-------- .../build/lib/util/io/StreamMultiplexer.java | 55 +++---- .../devtools/build/lib/server/RPCServerTest.java | 11 +- .../build/lib/server/RPCTestingClient.java | 21 ++- .../build/lib/util/io/StreamDemultiplexerTest.java | 65 ++++---- .../io/StreamMultiplexerParallelStressTest.java | 3 +- .../build/lib/util/io/StreamMultiplexerTest.java | 61 ++++---- 10 files changed, 253 insertions(+), 365 deletions(-) delete mode 100644 src/main/java/com/google/devtools/build/lib/server/ServerResponse.java (limited to 'src') diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index 0bd75b5aa4..0fa9b75535 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc @@ -1107,22 +1107,45 @@ static void handler(int signum) { } +static ATTRIBUTE_NORETURN void server_eof() { + // e.g. external SIGKILL of server, misplaced System.exit() in the server, + // or a JVM crash. Print out the jvm.out file in case there's something + // useful. + fprintf(stderr, "Error: unexpected EOF from %s server.\n" + "Contents of '%s':\n", globals->options.GetProductName().c_str(), + globals->jvm_log_file.c_str()); + WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str()); + exit(GetExitCodeForAbruptExit(*globals)); +} + // Reads a single char from the specified stream. -static char read_server_char(FILE *fp) { - int c = getc(fp); - if (c == EOF) { - // e.g. external SIGKILL of server, misplaced System.exit() in the server, - // or a JVM crash. Print out the jvm.out file in case there's something - // useful. - fprintf(stderr, "Error: unexpected EOF from %s server.\n" - "Contents of '%s':\n", globals->options.GetProductName().c_str(), - globals->jvm_log_file.c_str()); - WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str()); - exit(GetExitCodeForAbruptExit(*globals)); - } - return static_cast(c); +static unsigned char read_server_char(int fd) { + unsigned char result; + if (read(fd, &result, 1) != 1) { + server_eof(); + } + return result; +} + +static unsigned int read_server_int(int fd) { + unsigned char buffer[4]; + unsigned char *p = buffer; + int remaining = 4; + + while (remaining > 0) { + int bytes_read = read(fd, p, remaining); + if (bytes_read <= 0) { + server_eof(); + } + + remaining -= bytes_read; + p += bytes_read; + } + + return (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) + buffer[3]; } + // Constructs the command line for a server request, static string BuildServerRequest() { vector arg_vector; @@ -1143,6 +1166,23 @@ static string BuildServerRequest() { return request; } +static char server_output_buffer[8192]; + +static void forward_server_output(int socket, int output) { + unsigned int remaining = read_server_int(socket); + while (remaining > 0) { + int bytes = remaining > 8192 ? 8192 : remaining; + bytes = read(socket, server_output_buffer, bytes); + if (bytes <= 0) { + server_eof(); + } + + remaining -= bytes; + // Not much we can do if this doesn't work + write(output, server_output_buffer, bytes); + } +} + // Performs all I/O for a single client request to the server, and // shuts down the client (by exit or signal). static ATTRIBUTE_NORETURN void SendServerRequest() { @@ -1185,8 +1225,6 @@ static ATTRIBUTE_NORETURN void SendServerRequest() { } } - FILE *fp = fdopen(socket, "r"); // use buffering for reads--it's faster - if (VerboseLogging()) { fprintf(stderr, "Connected (server pid=%d).\n", globals->server_pid); } @@ -1205,15 +1243,18 @@ static ATTRIBUTE_NORETURN void SendServerRequest() { signal(SIGPIPE, handler); signal(SIGQUIT, handler); - // Send request and shutdown the write half of the connection: - // (Request is written in a single chunk.) - if (write(socket, request.data(), request.size()) != request.size()) { + // Send request (Request is written in a single chunk.) + char request_size[4]; + request_size[0] = (request.size() >> 24) & 0xff; + request_size[1] = (request.size() >> 16) & 0xff; + request_size[2] = (request.size() >> 8) & 0xff; + request_size[3] = (request.size()) & 0xff; + if (write(socket, request_size, 4) != 4) { pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed"); } - // In this (totally bizarre) protocol, this is the - // client's way of saying "um, that's the end of the request". - if (shutdown(socket, SHUT_WR) == -1) { - pdie(blaze_exit_code::INTERNAL_ERROR, "shutdown(WR) failed"); + + if (write(socket, request.data(), request.size()) != request.size()) { + pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed"); } // Wait until we receive some response from the server. @@ -1247,58 +1288,48 @@ static ATTRIBUTE_NORETURN void SendServerRequest() { } } - // Read and demux the response. This protocol is awful. + // Read and demux the response. + const int TAG_STDOUT = 1; + const int TAG_STDERR = 2; + const int TAG_CONTROL = 3; for (;;) { - // Read one line: - char at = read_server_char(fp); - assert(at == '@'); - (void) at; // avoid warning about unused variable - char tag = read_server_char(fp); - assert(tag == '1' || tag == '2' || tag == '3'); - char at_or_newline = read_server_char(fp); - bool second_at = at_or_newline == '@'; - if (second_at) { - at_or_newline = read_server_char(fp); - } - assert(at_or_newline == '\n'); - - if (tag == '3') { - // In this (totally bizarre) protocol, this is the - // server's way of saying "um, that's the end of the response". - break; - } - FILE *stream = tag == '1' ? stdout : stderr; - for (;;) { - char c = read_server_char(fp); - if (c == '\n') { - if (!second_at) fputc(c, stream); - fflush(stream); + // Read the tag + char tag = read_server_char(socket); + switch (tag) { + // stdout + case TAG_STDOUT: + forward_server_output(socket, 1); break; - } else { - fputc(c, stream); - } - } - } - char line[255]; - if (fgets(line, sizeof line, fp) == NULL || - !isdigit(line[0])) { - die(blaze_exit_code::INTERNAL_ERROR, - "Error: can't read exit code from server."); - } - int exit_code; - blaze_util::safe_strto32(line, &exit_code); + // stderr + case TAG_STDERR: + forward_server_output(socket, 2); + break; - close(socket); // might fail EINTR, just ignore. + // Control stream. Currently only used for reporting the exit code. + case TAG_CONTROL: + if (globals->received_signal) { + // Kill ourselves with the same signal, so that callers see the + // right WTERMSIG value. + signal(globals->received_signal, SIG_DFL); + raise(globals->received_signal); + exit(1); // (in case raise didn't kill us for some reason) + } else { + unsigned int length = read_server_int(socket); + if (length != 4) { + server_eof(); + } + unsigned int exit_code = read_server_int(socket); + exit(exit_code); + } + break; // Control never gets here, only for code beauty - if (globals->received_signal) { // Kill ourselves with the same signal, so - // that callers see the right WTERMSIG value. - signal(globals->received_signal, SIG_DFL); - raise(globals->received_signal); - exit(1); // (in case raise didn't kill us for some reason) + default: + fprintf(stderr, "bad tag %d\n", tag); + server_eof(); + break; // Control never gets here, only for code beauty + } } - - exit(exit_code); } // Parse the options, storing parsed values in globals. diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java index 4302f3c88c..206030b45d 100644 --- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -14,8 +14,6 @@ package com.google.devtools.build.lib.server; -import static java.nio.charset.StandardCharsets.UTF_8; - import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -427,10 +425,15 @@ public final class RPCServer { * blaze.cc) to interface with Unix APIs. */ private static List readRequest(InputStream input) throws IOException { - byte[] inputBytes = ByteStreams.toByteArray(input); - if (inputBytes.length == 0) { - return null; - } + byte[] sizeBuffer = new byte[4]; + ByteStreams.readFully(input, sizeBuffer); + int size = ((sizeBuffer[0] & 0xff) << 24) + + ((sizeBuffer[1] & 0xff) << 16) + + ((sizeBuffer[2] & 0xff) << 8) + + (sizeBuffer[3] & 0xff); + byte[] inputBytes = new byte[size]; + ByteStreams.readFully(input, inputBytes); + String s = new String(inputBytes, Charset.defaultCharset()); return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s)); } @@ -522,7 +525,10 @@ public final class RPCServer { // exit code. flushOutErr(); try { - controlChannel.write(("" + exitStatus + "\n").getBytes(UTF_8)); + controlChannel.write((exitStatus >> 24) & 0xff); + controlChannel.write((exitStatus >> 16) & 0xff); + controlChannel.write((exitStatus >> 8) & 0xff); + controlChannel.write(exitStatus & 0xff); controlChannel.flush(); LOG.info("" + exitStatus); } catch (IOException ignored) { diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java deleted file mode 100644 index 3589bb2fb0..0000000000 --- a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.server; - -import com.google.devtools.build.lib.util.Preconditions; - -import java.io.ByteArrayOutputStream; -import java.io.UnsupportedEncodingException; - -/** - * This class models a response from the {@link RPCServer}. This is a - * tuple of an error message and the exit status. The encoding of the response - * is extremely simple {@link #toString()}: - * - *
  • Iff a message is present, the wire format is - *
    message + '\n' + exit code as string + '\n'
    - *
  • - *
  • Otherwise it's just the exit code as string + '\n'
  • - *
- */ -final class ServerResponse { - - /** - * Parses an input string into a {@link ServerResponse} object. - */ - public static ServerResponse parseFrom(String input) { - if (input.charAt(input.length() - 1) != '\n') { - String msg = "Response must end with newline (" + input + ")"; - throw new IllegalArgumentException(msg); - } - int newlineAt = input.lastIndexOf('\n', input.length() - 2); - - final String exitStatusString; - final String errorMessage; - if (newlineAt == -1) { - errorMessage = ""; - exitStatusString = input.substring(0, input.length() - 1); - } else { - errorMessage = input.substring(0, newlineAt); - exitStatusString = input.substring(newlineAt + 1, input.length() - 1); - } - - return new ServerResponse(errorMessage, Integer.parseInt(exitStatusString)); - } - - /** - * Parses {@code bytes} into a {@link ServerResponse} instance, assuming - * Latin 1 encoding. - */ - public static ServerResponse parseFrom(byte[] bytes) { - try { - return parseFrom(new String(bytes, "ISO-8859-1")); - } catch (UnsupportedEncodingException e) { - throw new AssertionError(e); // Latin 1 is everywhere. - } - } - - /** - * Parses {@code bytes} into a {@link ServerResponse} instance, assuming - * Latin 1 encoding. - */ - public static ServerResponse parseFrom(ByteArrayOutputStream bytes) { - return parseFrom(bytes.toByteArray()); - } - - private final String errorMessage; - private final int exitStatus; - - /** - * Construct a new instance given an error message and an exit status. - */ - public ServerResponse(String errorMessage, int exitStatus) { - Preconditions.checkNotNull(errorMessage); - this.errorMessage = errorMessage; - this.exitStatus = exitStatus; - } - - /** - * The wire representation of this response object. - */ - @Override - public String toString() { - if (errorMessage.length() == 0) { - return Integer.toString(exitStatus) + '\n'; - } - return errorMessage + '\n' + exitStatus + '\n'; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ServerResponse)) { - return false; - } - ServerResponse otherResponse = (ServerResponse) other; - return exitStatus == otherResponse.exitStatus - && errorMessage.equals(otherResponse.errorMessage); - } - - @Override - public int hashCode() { - return exitStatus * 31 ^ errorMessage.hashCode(); - } - -} diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java index 051f218515..ca6d2b5e09 100644 --- a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java @@ -48,9 +48,6 @@ public final class StreamDemultiplexer extends OutputStream { } } - private static final byte AT = '@'; - private static final byte NEWLINE = '\n'; - /** * The output streams, conveniently in an array indexed by the marker byte. * Some of these will be null, most likely. @@ -64,22 +61,23 @@ public final class StreamDemultiplexer extends OutputStream { * parse things. */ private enum State { - EXPECT_CONTROL_STARTING_AT, EXPECT_MARKER_BYTE, - EXPECT_AT_OR_NEWLINE, - EXPECT_PAYLOAD_OR_NEWLINE + EXPECT_SIZE, + EXPECT_PAYLOAD, } - private State state = State.EXPECT_CONTROL_STARTING_AT; - private boolean addNewlineToPayload; + private final int[] sizeBuffer = new int[4]; + private State state = State.EXPECT_MARKER_BYTE; private OutputStream selectedStream; + private int currentSizeByte = 0; + private int payloadBytesLeft = 0; /** * Construct a new demultiplexer. The {@code smallestMarkerByte} indicates * the marker byte we would expect for {@code outputStreams[0]} to be used. * So, if this first stream is your stdout and you're using the * {@link StreamMultiplexer}, then you will need to set this to - * {@code '1'}. Because {@link StreamDemultiplexer} extends + * {@code 1}. Because {@link StreamDemultiplexer} extends * {@link OutputStream}, this constructor effectively creates an * {@link OutputStream} instance which demultiplexes the tagged data client * code writes to it into {@code outputStreams}. @@ -95,70 +93,35 @@ public final class StreamDemultiplexer extends OutputStream { public void write(int b) throws IOException { // This dispatch traverses the finite state machine / grammar. switch (state) { - case EXPECT_CONTROL_STARTING_AT: - parseControlStartingAt((byte) b); - resetFields(); - break; case EXPECT_MARKER_BYTE: - parseMarkerByte((byte) b); + parseMarkerByte(b); break; - case EXPECT_AT_OR_NEWLINE: - parseAtOrNewline((byte) b); + case EXPECT_SIZE: + parseSize(b); break; - case EXPECT_PAYLOAD_OR_NEWLINE: - parsePayloadOrNewline((byte) b); + case EXPECT_PAYLOAD: + parsePayload(b); break; } } - /** - * Handles {@link State#EXPECT_PAYLOAD_OR_NEWLINE}, which is the payload - * we are actually transporting over the wire. At this point we can rely - * on a stream having been preselected into {@link #selectedStream}, and - * also we will add a newline if {@link #addNewlineToPayload} is set. - * Flushes at the end of every payload segment. - */ - private void parsePayloadOrNewline(byte b) throws IOException { - if (b == NEWLINE) { - if (addNewlineToPayload) { - selectedStream.write(NEWLINE); - } - selectedStream.flush(); - state = State.EXPECT_CONTROL_STARTING_AT; - } else { - selectedStream.write(b); - selectedStream.flush(); // slow? + private void parseSize(int b) { + sizeBuffer[currentSizeByte] = b; + currentSizeByte += 1; + if (currentSizeByte == 4) { + state = State.EXPECT_PAYLOAD; + payloadBytesLeft = (sizeBuffer[0] << 24) + + (sizeBuffer[1] << 16) + + (sizeBuffer[2] << 8) + + sizeBuffer[3]; } } - /** - * Handles {@link State#EXPECT_AT_OR_NEWLINE}, which is either the - * suppress newline indicator (at) at the end of a control line, or the end - * of a control line. - */ - private void parseAtOrNewline(byte b) throws IOException { - if (b == NEWLINE) { - state = State.EXPECT_PAYLOAD_OR_NEWLINE; - } else if (b == AT) { - addNewlineToPayload = false; - } else { - throw new IOException("Expected @ or \\n. (" + b + ")"); - } - } - - /** - * Reset the fields that are affected by our state. - */ - private void resetFields() { - selectedStream = null; - addNewlineToPayload = true; - } - /** * Handles {@link State#EXPECT_MARKER_BYTE}. The byte determines which stream * we will be using, and will set {@link #selectedStream}. */ - private void parseMarkerByte(byte markerByte) throws IOException { + private void parseMarkerByte(int markerByte) throws IOException { if (markerByte < 0 || markerByte > Byte.MAX_VALUE) { String msg = "Illegal marker byte (" + markerByte + ")"; throw new IllegalArgumentException(msg); @@ -168,19 +131,15 @@ public final class StreamDemultiplexer extends OutputStream { throw new IOException("stream " + markerByte + " not registered."); } selectedStream = outputStreams[markerByte]; - state = State.EXPECT_AT_OR_NEWLINE; + state = State.EXPECT_SIZE; + currentSizeByte = 0; } - /** - * Handles {@link State#EXPECT_CONTROL_STARTING_AT}, the very first '@' with - * which each message starts. - */ - private void parseControlStartingAt(byte b) throws IOException { - if (b != AT) { - throw new IOException("Expected control starting @. (" + b + ", " - + (char) b + ")"); + private void parsePayload(int b) throws IOException { + selectedStream.write(b); + payloadBytesLeft -= 1; + if (payloadBytesLeft == 0) { + state = State.EXPECT_MARKER_BYTE; } - state = State.EXPECT_MARKER_BYTE; } - } diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java index 263b98a274..209dd302bc 100644 --- a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java @@ -28,37 +28,28 @@ import java.io.OutputStream; * end of a networking connection can simply read the tagged lines and then act * on them within a sigle thread. * - * The format of the tagged output stream is as follows: + * The format of the tagged output stream is reasonably simple: + *
    + *
  1. + * Marker byte indicating whether that chunk is for stdout (1), stderr (2) or the control + * stream (3). + *
  2. + *
  3. + * 4 bytes indicating the length of the chunk in high-endian format. + *
  4. + *
  5. + * The payload (as many bytes as the length field before) + *
  6. + *
> * - *
- * combined :: = [ control_line payload ... ]+
- * control_line :: = '@' marker '@'? '\n'
- * payload :: = r'^[^\n]*\n'
- * 
* - * So basically: - *
    - *
  • Control lines alternate with payload lines
  • - *
  • Both types of lines end with a newline, and never have a newline in - * them.
  • - *
  • The marker indicates which stream we mean. - * For now, '1'=stdout, '2'=stderr.
  • - *
  • The optional second '@' indicates that the following line is - * incomplete.
  • - *
- * - * This format is optimized for easy interpretation by a Python client, but it's - * also a compromise in that it's still easy to interpret by a human (let's say - * you have to read the traffic over a wire for some reason). */ @ThreadSafe public final class StreamMultiplexer { - public static final byte STDOUT_MARKER = '1'; - public static final byte STDERR_MARKER = '2'; - public static final byte CONTROL_MARKER = '3'; - - private static final byte AT = '@'; + public static final byte STDOUT_MARKER = 1; + public static final byte STDERR_MARKER = 2; + public static final byte CONTROL_MARKER = 3; private final Object mutex = new Object(); private final OutputStream multiplexed; @@ -82,19 +73,13 @@ public final class StreamMultiplexer { multiplexed.flush(); return; } - byte lastByte = buffer[len - 1]; - boolean lineIsIncomplete = lastByte != NEWLINE; - multiplexed.write(AT); multiplexed.write(markerByte); - if (lineIsIncomplete) { - multiplexed.write(AT); - } - multiplexed.write(NEWLINE); + multiplexed.write((len >> 24) & 0xff); + multiplexed.write((len >> 16) & 0xff); + multiplexed.write((len >> 8) & 0xff); + multiplexed.write(len & 0xff); multiplexed.write(buffer, 0, len); - if (lineIsIncomplete) { - multiplexed.write(NEWLINE); - } multiplexed.flush(); } len = 0; diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java b/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java index 5d57544062..f81a0384f3 100644 --- a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java +++ b/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java @@ -102,27 +102,26 @@ public class RPCServerTest { FileSystemUtils.deleteTree(serverDir); } - private void runTestRequest(String request, int ret, String out, String err, - String control) throws Exception { - assertEquals(new ServerResponse(control, ret), client.sendRequest(request)); + private void runTestRequest(String request, int ret, String out, String err) throws Exception { + assertEquals(ret, client.sendRequest(request)); assertEquals(out, outErr.outAsLatin1()); assertThat(outErr.errAsLatin1()).contains(err); } @Test public void testUnknownCommand() throws Exception { - runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n", ""); + runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); } @Test public void testEmptyBlazeCommand() throws Exception { - runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n", ""); + runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); } @Test public void testWorkspaceDies() throws Exception { assertTrue(serverThread.isAlive()); - runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR, ""); + runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR); Thread.sleep(HEALTH_CHECK_MILLIS * 2); assertTrue(serverThread.isAlive()); diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java index e24e0bb3ac..4a659a14f8 100644 --- a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java +++ b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java @@ -43,7 +43,7 @@ public class RPCTestingClient { this.outErr = outErr; } - public ServerResponse sendRequest(String command, String... params) + public int sendRequest(String command, String... params) throws Exception { String request = command; for (String param : params) { @@ -52,24 +52,35 @@ public class RPCTestingClient { return sendRequest(request); } - public ServerResponse sendRequest(String request) throws Exception { + public int sendRequest(String request) throws Exception { LocalClientSocket connection = new LocalClientSocket(); connection.connect(new LocalSocketAddress(socketFile.getPathFile())); try { OutputStream out = connection.getOutputStream(); - out.write(request.getBytes(UTF_8)); + byte[] requestBytes = request.getBytes(UTF_8); + byte[] requestLength = new byte[4]; + requestLength[0] = (byte) (requestBytes.length << 24); + requestLength[1] = (byte) ((requestBytes.length << 16) & 0xff); + requestLength[2] = (byte) ((requestBytes.length << 8) & 0xff); + requestLength[3] = (byte) (requestBytes.length & 0xff); + out.write(requestLength); + out.write(requestBytes); out.flush(); connection.shutdownOutput(); OutputStream stdout = outErr.getOutputStream(); OutputStream stderr = outErr.getErrorStream(); ByteArrayOutputStream control = new ByteArrayOutputStream(); - StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', + StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, stdout, stderr, control); ByteStreams.copy(connection.getInputStream(), demux); demux.flush(); - return ServerResponse.parseFrom(control); + byte[] controlBytes = control.toByteArray(); + return (((int) controlBytes[0]) << 24) + + (((int) controlBytes[1]) << 16) + + (((int) controlBytes[2]) << 8) + + ((int) controlBytes[3]); } finally { connection.close(); } diff --git a/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java b/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java index 47d693116a..786eb6c42b 100644 --- a/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java +++ b/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java @@ -16,8 +16,6 @@ package com.google.devtools.build.lib.util.io; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import com.google.devtools.build.lib.util.StringUtilities; - import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -25,6 +23,7 @@ import org.junit.runners.JUnit4; import java.io.ByteArrayOutputStream; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; import java.util.Random; /** @@ -37,14 +36,6 @@ public class StreamDemultiplexerTest { private ByteArrayOutputStream err = new ByteArrayOutputStream(); private ByteArrayOutputStream ctl = new ByteArrayOutputStream(); - private byte[] lines(String... lines) { - try { - return StringUtilities.joinLines(lines).getBytes("ISO-8859-1"); - } catch (UnsupportedEncodingException e) { - throw new AssertionError(e); - } - } - private String toAnsi(ByteArrayOutputStream stream) { try { return new String(stream.toByteArray(), "ISO-8859-1"); @@ -63,8 +54,8 @@ public class StreamDemultiplexerTest { @Test public void testHelloWorldOnStandardOut() throws Exception { - byte[] multiplexed = lines("@1@", "Hello, world."); - try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out)) { + byte[] multiplexed = chunk(1, "Hello, world."); + try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out)) { demux.write(multiplexed); } assertEquals("Hello, world.", out.toString("ISO-8859-1")); @@ -72,8 +63,8 @@ public class StreamDemultiplexerTest { @Test public void testOutErrCtl() throws Exception { - byte[] multiplexed = lines("@1@", "out", "@2@", "err", "@3@", "ctl", ""); - try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out, err, ctl)) { + byte[] multiplexed = concat(chunk(1, "out"), chunk(2, "err"), chunk(3, "ctl")); + try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out, err, ctl)) { demux.write(multiplexed); } assertEquals("out", toAnsi(out)); @@ -83,26 +74,16 @@ public class StreamDemultiplexerTest { @Test public void testWithoutLineBreaks() throws Exception { - byte[] multiplexed = lines("@1@", "just ", "@1@", "one ", "@1@", "line", ""); - try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out)) { + byte[] multiplexed = concat(chunk(1, "just "), chunk(1, "one "), chunk(1, "line")); + try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out)) { demux.write(multiplexed); } assertEquals("just one line", out.toString("ISO-8859-1")); } - @Test - public void testLineBreaks() throws Exception { - byte[] multiplexed = lines("@1", "two", "@1", "lines", ""); - try (StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out)) { - demux.write(multiplexed); - demux.flush(); - assertEquals("two\nlines\n", out.toString("ISO-8859-1")); - } - } - @Test public void testMultiplexAndBackWithHelloWorld() throws Exception { - StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out); + StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out); StreamMultiplexer mux = new StreamMultiplexer(demux); OutputStream out = mux.createStdout(); out.write(inAnsi("Hello, world.")); @@ -112,7 +93,7 @@ public class StreamDemultiplexerTest { @Test public void testMultiplexDemultiplexBinaryStress() throws Exception { - StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out, err, ctl); + StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out, err, ctl); StreamMultiplexer mux = new StreamMultiplexer(demux); OutputStream[] muxOuts = {mux.createStdout(), mux.createStderr(), mux.createControl()}; ByteArrayOutputStream[] expectedOuts = @@ -132,4 +113,32 @@ public class StreamDemultiplexerTest { assertArrayEquals(expectedOuts[1].toByteArray(), err.toByteArray()); assertArrayEquals(expectedOuts[2].toByteArray(), ctl.toByteArray()); } + + private static byte[] chunk(int stream, String payload) { + byte[] payloadBytes = payload.getBytes(Charset.defaultCharset()); + byte[] result = new byte[payloadBytes.length + 5]; + + System.arraycopy(payloadBytes, 0, result, 5, payloadBytes.length); + result[0] = (byte) stream; + result[1] = (byte) (payloadBytes.length >> 24); + result[2] = (byte) ((payloadBytes.length >> 16) & 0xff); + result[3] = (byte) ((payloadBytes.length >> 8) & 0xff); + result[4] = (byte) (payloadBytes.length & 0xff); + return result; + } + + private static byte[] concat(byte[]... chunks) { + int length = 0; + for (byte[] chunk : chunks) { + length += chunk.length; + } + + byte[] result = new byte[length]; + int previousChunks = 0; + for (byte[] chunk : chunks) { + System.arraycopy(chunk, 0, result, previousChunks, chunk.length); + previousChunks += chunk.length; + } + return result; + } } diff --git a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java index 932850fb0c..0c99a16d08 100644 --- a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java +++ b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java @@ -49,8 +49,7 @@ public class StreamMultiplexerParallelStressTest { */ OutputStream devNull = ByteStreams.nullOutputStream(); - StreamDemultiplexer demux = new StreamDemultiplexer((byte)'1', - devNull, devNull, devNull); + StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, devNull, devNull, devNull); /** * The multiplexer under test. diff --git a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java index 26664a155d..6d3f2c3292 100644 --- a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java +++ b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.util.io; -import static com.google.devtools.build.lib.util.StringUtilities.joinLines; + +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -27,6 +28,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.util.Arrays; /** * Test for {@link StreamMultiplexer}. @@ -61,42 +63,41 @@ public class StreamMultiplexerTest { return string.getBytes("ISO-8859-1"); } - private static String getLatin(byte[] bytes) - throws UnsupportedEncodingException { - return new String(bytes, "ISO-8859-1"); - } - @Test - public void testHelloWorldOnStdOut() throws IOException { + public void testHelloWorldOnStdOut() throws Exception { out.write(getLatin("Hello, world.")); out.flush(); - assertEquals(joinLines("@1@", "Hello, world.", ""), - getLatin(multiplexed.toByteArray())); + assertMessage(multiplexed.toByteArray(), 0, "Hello, world."); } @Test public void testInterleavedStdoutStderrControl() throws Exception { + int start = 0; out.write(getLatin("Hello, stdout.")); out.flush(); + assertMessage(multiplexed.toByteArray(), start, "Hello, stdout."); + start = multiplexed.toByteArray().length; + err.write(getLatin("Hello, stderr.")); err.flush(); + assertMessage(multiplexed.toByteArray(), start, "Hello, stderr."); + start = multiplexed.toByteArray().length; + ctl.write(getLatin("Hello, control.")); ctl.flush(); + assertMessage(multiplexed.toByteArray(), start, "Hello, control."); + start = multiplexed.toByteArray().length; + out.write(getLatin("... and back!")); out.flush(); - assertEquals(joinLines("@1@", "Hello, stdout.", - "@2@", "Hello, stderr.", - "@3@", "Hello, control.", - "@1@", "... and back!", - ""), - getLatin(multiplexed.toByteArray())); + assertMessage(multiplexed.toByteArray(), start, "... and back!"); } @Test public void testWillNotCommitToUnderlyingStreamUnlessFlushOrNewline() throws Exception { out.write(getLatin("There are no newline characters in here, so it won't" + - " get written just yet.")); + " get written just yet.")); assertArrayEquals(multiplexed.toByteArray(), new byte[0]); } @@ -105,16 +106,11 @@ public class StreamMultiplexerTest { out.write(getLatin("No newline just yet, so no flushing. ")); assertArrayEquals(multiplexed.toByteArray(), new byte[0]); out.write(getLatin("OK, here we go:\nAnd more to come.")); - - String expected = joinLines("@1", - "No newline just yet, so no flushing. OK, here we go:", ""); - - assertEquals(expected, getLatin(multiplexed.toByteArray())); - + assertMessage( + multiplexed.toByteArray(), 0, "No newline just yet, so no flushing. OK, here we go:\n"); + int firstMessageLength = multiplexed.toByteArray().length; out.write((byte) '\n'); - expected += joinLines("@1", "And more to come.", ""); - - assertEquals(expected, getLatin(multiplexed.toByteArray())); + assertMessage(multiplexed.toByteArray(), firstMessageLength, "And more to come.\n"); } @Test @@ -122,14 +118,14 @@ public class StreamMultiplexerTest { out.write(getLatin("Don't forget to flush!")); assertArrayEquals(new byte[0], multiplexed.toByteArray()); out.flush(); // now the output will appear in multiplexed. - assertEquals(joinLines("@1@", "Don't forget to flush!", ""), - getLatin(multiplexed.toByteArray())); + assertStartsWith(multiplexed.toByteArray(), 1, 0, 0, 0); + assertMessage(multiplexed.toByteArray(), 0, "Don't forget to flush!"); } @Test public void testByteEncoding() throws IOException { OutputStream devNull = ByteStreams.nullOutputStream(); - StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', devNull); + StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, devNull); StreamMultiplexer mux = new StreamMultiplexer(demux); OutputStream out = mux.createStdout(); @@ -145,4 +141,13 @@ public class StreamMultiplexerTest { out.write(10); } + private static void assertStartsWith(byte[] actual, int... expectedPrefix){ + for (int i = 0; i < expectedPrefix.length; i++) { + assertThat(actual[i]).isEqualTo(expectedPrefix[i]); + } + } + + private static void assertMessage(byte[] actual, int start, String expected) throws Exception { + assertThat(Arrays.copyOfRange(actual, start + 5, actual.length)).isEqualTo(getLatin(expected)); + } } -- cgit v1.2.3