aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-03-22 08:02:18 +0000
committerGravatar Lukacs Berki <lberki@google.com>2016-03-22 08:09:59 +0000
commit4dd60f9cef8a206bac8f83af8153bb8e335479bc (patch)
tree4d737bef42bfb20f707bb5362b6bbdb41d30f2f4 /src
parentb4c00b6eead53ba9381bab12765b2c4ed98a61d1 (diff)
Revamp the client/server communication protocol so that it is portable to Windows.
Progress towards #930. -- MOS_MIGRATED_REVID=117799006
Diffstat (limited to 'src')
-rw-r--r--src/main/cpp/blaze.cc167
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCServer.java20
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/ServerResponse.java116
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java99
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java55
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java11
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java21
-rw-r--r--src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java65
-rw-r--r--src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java3
-rw-r--r--src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java61
10 files changed, 253 insertions, 365 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 0bd75b5aa4..0fa9b75535 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -1107,22 +1107,45 @@ static void handler(int signum) {
}
+static ATTRIBUTE_NORETURN void server_eof() {
+ // e.g. external SIGKILL of server, misplaced System.exit() in the server,
+ // or a JVM crash. Print out the jvm.out file in case there's something
+ // useful.
+ fprintf(stderr, "Error: unexpected EOF from %s server.\n"
+ "Contents of '%s':\n", globals->options.GetProductName().c_str(),
+ globals->jvm_log_file.c_str());
+ WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str());
+ exit(GetExitCodeForAbruptExit(*globals));
+}
+
// Reads a single char from the specified stream.
-static char read_server_char(FILE *fp) {
- int c = getc(fp);
- if (c == EOF) {
- // e.g. external SIGKILL of server, misplaced System.exit() in the server,
- // or a JVM crash. Print out the jvm.out file in case there's something
- // useful.
- fprintf(stderr, "Error: unexpected EOF from %s server.\n"
- "Contents of '%s':\n", globals->options.GetProductName().c_str(),
- globals->jvm_log_file.c_str());
- WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str());
- exit(GetExitCodeForAbruptExit(*globals));
- }
- return static_cast<char>(c);
+static unsigned char read_server_char(int fd) {
+ unsigned char result;
+ if (read(fd, &result, 1) != 1) {
+ server_eof();
+ }
+ return result;
+}
+
+static unsigned int read_server_int(int fd) {
+ unsigned char buffer[4];
+ unsigned char *p = buffer;
+ int remaining = 4;
+
+ while (remaining > 0) {
+ int bytes_read = read(fd, p, remaining);
+ if (bytes_read <= 0) {
+ server_eof();
+ }
+
+ remaining -= bytes_read;
+ p += bytes_read;
+ }
+
+ return (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) + buffer[3];
}
+
// Constructs the command line for a server request,
static string BuildServerRequest() {
vector<string> arg_vector;
@@ -1143,6 +1166,23 @@ static string BuildServerRequest() {
return request;
}
+static char server_output_buffer[8192];
+
+static void forward_server_output(int socket, int output) {
+ unsigned int remaining = read_server_int(socket);
+ while (remaining > 0) {
+ int bytes = remaining > 8192 ? 8192 : remaining;
+ bytes = read(socket, server_output_buffer, bytes);
+ if (bytes <= 0) {
+ server_eof();
+ }
+
+ remaining -= bytes;
+ // Not much we can do if this doesn't work
+ write(output, server_output_buffer, bytes);
+ }
+}
+
// Performs all I/O for a single client request to the server, and
// shuts down the client (by exit or signal).
static ATTRIBUTE_NORETURN void SendServerRequest() {
@@ -1185,8 +1225,6 @@ static ATTRIBUTE_NORETURN void SendServerRequest() {
}
}
- FILE *fp = fdopen(socket, "r"); // use buffering for reads--it's faster
-
if (VerboseLogging()) {
fprintf(stderr, "Connected (server pid=%d).\n", globals->server_pid);
}
@@ -1205,15 +1243,18 @@ static ATTRIBUTE_NORETURN void SendServerRequest() {
signal(SIGPIPE, handler);
signal(SIGQUIT, handler);
- // Send request and shutdown the write half of the connection:
- // (Request is written in a single chunk.)
- if (write(socket, request.data(), request.size()) != request.size()) {
+ // Send request (Request is written in a single chunk.)
+ char request_size[4];
+ request_size[0] = (request.size() >> 24) & 0xff;
+ request_size[1] = (request.size() >> 16) & 0xff;
+ request_size[2] = (request.size() >> 8) & 0xff;
+ request_size[3] = (request.size()) & 0xff;
+ if (write(socket, request_size, 4) != 4) {
pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed");
}
- // In this (totally bizarre) protocol, this is the
- // client's way of saying "um, that's the end of the request".
- if (shutdown(socket, SHUT_WR) == -1) {
- pdie(blaze_exit_code::INTERNAL_ERROR, "shutdown(WR) failed");
+
+ if (write(socket, request.data(), request.size()) != request.size()) {
+ pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed");
}
// Wait until we receive some response from the server.
@@ -1247,58 +1288,48 @@ static ATTRIBUTE_NORETURN void SendServerRequest() {
}
}
- // Read and demux the response. This protocol is awful.
+ // Read and demux the response.
+ const int TAG_STDOUT = 1;
+ const int TAG_STDERR = 2;
+ const int TAG_CONTROL = 3;
for (;;) {
- // Read one line:
- char at = read_server_char(fp);
- assert(at == '@');
- (void) at; // avoid warning about unused variable
- char tag = read_server_char(fp);
- assert(tag == '1' || tag == '2' || tag == '3');
- char at_or_newline = read_server_char(fp);
- bool second_at = at_or_newline == '@';
- if (second_at) {
- at_or_newline = read_server_char(fp);
- }
- assert(at_or_newline == '\n');
-
- if (tag == '3') {
- // In this (totally bizarre) protocol, this is the
- // server's way of saying "um, that's the end of the response".
- break;
- }
- FILE *stream = tag == '1' ? stdout : stderr;
- for (;;) {
- char c = read_server_char(fp);
- if (c == '\n') {
- if (!second_at) fputc(c, stream);
- fflush(stream);
+ // Read the tag
+ char tag = read_server_char(socket);
+ switch (tag) {
+ // stdout
+ case TAG_STDOUT:
+ forward_server_output(socket, 1);
break;
- } else {
- fputc(c, stream);
- }
- }
- }
- char line[255];
- if (fgets(line, sizeof line, fp) == NULL ||
- !isdigit(line[0])) {
- die(blaze_exit_code::INTERNAL_ERROR,
- "Error: can't read exit code from server.");
- }
- int exit_code;
- blaze_util::safe_strto32(line, &exit_code);
+ // stderr
+ case TAG_STDERR:
+ forward_server_output(socket, 2);
+ break;
- close(socket); // might fail EINTR, just ignore.
+ // Control stream. Currently only used for reporting the exit code.
+ case TAG_CONTROL:
+ if (globals->received_signal) {
+ // Kill ourselves with the same signal, so that callers see the
+ // right WTERMSIG value.
+ signal(globals->received_signal, SIG_DFL);
+ raise(globals->received_signal);
+ exit(1); // (in case raise didn't kill us for some reason)
+ } else {
+ unsigned int length = read_server_int(socket);
+ if (length != 4) {
+ server_eof();
+ }
+ unsigned int exit_code = read_server_int(socket);
+ exit(exit_code);
+ }
+ break; // Control never gets here, only for code beauty
- if (globals->received_signal) { // Kill ourselves with the same signal, so
- // that callers see the right WTERMSIG value.
- signal(globals->received_signal, SIG_DFL);
- raise(globals->received_signal);
- exit(1); // (in case raise didn't kill us for some reason)
+ default:
+ fprintf(stderr, "bad tag %d\n", tag);
+ server_eof();
+ break; // Control never gets here, only for code beauty
+ }
}
-
- exit(exit_code);
}
// Parse the options, storing parsed values in globals.
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
index 4302f3c88c..206030b45d 100644
--- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -14,8 +14,6 @@
package com.google.devtools.build.lib.server;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@@ -427,10 +425,15 @@ public final class RPCServer {
* blaze.cc) to interface with Unix APIs.
*/
private static List<String> readRequest(InputStream input) throws IOException {
- byte[] inputBytes = ByteStreams.toByteArray(input);
- if (inputBytes.length == 0) {
- return null;
- }
+ byte[] sizeBuffer = new byte[4];
+ ByteStreams.readFully(input, sizeBuffer);
+ int size = ((sizeBuffer[0] & 0xff) << 24)
+ + ((sizeBuffer[1] & 0xff) << 16)
+ + ((sizeBuffer[2] & 0xff) << 8)
+ + (sizeBuffer[3] & 0xff);
+ byte[] inputBytes = new byte[size];
+ ByteStreams.readFully(input, inputBytes);
+
String s = new String(inputBytes, Charset.defaultCharset());
return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s));
}
@@ -522,7 +525,10 @@ public final class RPCServer {
// exit code.
flushOutErr();
try {
- controlChannel.write(("" + exitStatus + "\n").getBytes(UTF_8));
+ controlChannel.write((exitStatus >> 24) & 0xff);
+ controlChannel.write((exitStatus >> 16) & 0xff);
+ controlChannel.write((exitStatus >> 8) & 0xff);
+ controlChannel.write(exitStatus & 0xff);
controlChannel.flush();
LOG.info("" + exitStatus);
} catch (IOException ignored) {
diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java
deleted file mode 100644
index 3589bb2fb0..0000000000
--- a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java
+++ /dev/null
@@ -1,116 +0,0 @@
-// Copyright 2014 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.server;
-
-import com.google.devtools.build.lib.util.Preconditions;
-
-import java.io.ByteArrayOutputStream;
-import java.io.UnsupportedEncodingException;
-
-/**
- * This class models a response from the {@link RPCServer}. This is a
- * tuple of an error message and the exit status. The encoding of the response
- * is extremely simple {@link #toString()}:
- *
- * <ul><li>Iff a message is present, the wire format is
- * <pre>message + '\n' + exit code as string + '\n'</pre>
- * </li>
- * <li>Otherwise it's just the exit code as string + '\n'</li>
- * </ul>
- */
-final class ServerResponse {
-
- /**
- * Parses an input string into a {@link ServerResponse} object.
- */
- public static ServerResponse parseFrom(String input) {
- if (input.charAt(input.length() - 1) != '\n') {
- String msg = "Response must end with newline (" + input + ")";
- throw new IllegalArgumentException(msg);
- }
- int newlineAt = input.lastIndexOf('\n', input.length() - 2);
-
- final String exitStatusString;
- final String errorMessage;
- if (newlineAt == -1) {
- errorMessage = "";
- exitStatusString = input.substring(0, input.length() - 1);
- } else {
- errorMessage = input.substring(0, newlineAt);
- exitStatusString = input.substring(newlineAt + 1, input.length() - 1);
- }
-
- return new ServerResponse(errorMessage, Integer.parseInt(exitStatusString));
- }
-
- /**
- * Parses {@code bytes} into a {@link ServerResponse} instance, assuming
- * Latin 1 encoding.
- */
- public static ServerResponse parseFrom(byte[] bytes) {
- try {
- return parseFrom(new String(bytes, "ISO-8859-1"));
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e); // Latin 1 is everywhere.
- }
- }
-
- /**
- * Parses {@code bytes} into a {@link ServerResponse} instance, assuming
- * Latin 1 encoding.
- */
- public static ServerResponse parseFrom(ByteArrayOutputStream bytes) {
- return parseFrom(bytes.toByteArray());
- }
-
- private final String errorMessage;
- private final int exitStatus;
-
- /**
- * Construct a new instance given an error message and an exit status.
- */
- public ServerResponse(String errorMessage, int exitStatus) {
- Preconditions.checkNotNull(errorMessage);
- this.errorMessage = errorMessage;
- this.exitStatus = exitStatus;
- }
-
- /**
- * The wire representation of this response object.
- */
- @Override
- public String toString() {
- if (errorMessage.length() == 0) {
- return Integer.toString(exitStatus) + '\n';
- }
- return errorMessage + '\n' + exitStatus + '\n';
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof ServerResponse)) {
- return false;
- }
- ServerResponse otherResponse = (ServerResponse) other;
- return exitStatus == otherResponse.exitStatus
- && errorMessage.equals(otherResponse.errorMessage);
- }
-
- @Override
- public int hashCode() {
- return exitStatus * 31 ^ errorMessage.hashCode();
- }
-
-}
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
index 051f218515..ca6d2b5e09 100644
--- a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
+++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
@@ -48,9 +48,6 @@ public final class StreamDemultiplexer extends OutputStream {
}
}
- private static final byte AT = '@';
- private static final byte NEWLINE = '\n';
-
/**
* The output streams, conveniently in an array indexed by the marker byte.
* Some of these will be null, most likely.
@@ -64,22 +61,23 @@ public final class StreamDemultiplexer extends OutputStream {
* parse things.
*/
private enum State {
- EXPECT_CONTROL_STARTING_AT,
EXPECT_MARKER_BYTE,
- EXPECT_AT_OR_NEWLINE,
- EXPECT_PAYLOAD_OR_NEWLINE
+ EXPECT_SIZE,
+ EXPECT_PAYLOAD,
}
- private State state = State.EXPECT_CONTROL_STARTING_AT;
- private boolean addNewlineToPayload;
+ private final int[] sizeBuffer = new int[4];
+ private State state = State.EXPECT_MARKER_BYTE;
private OutputStream selectedStream;
+ private int currentSizeByte = 0;
+ private int payloadBytesLeft = 0;
/**
* Construct a new demultiplexer. The {@code smallestMarkerByte} indicates
* the marker byte we would expect for {@code outputStreams[0]} to be used.
* So, if this first stream is your stdout and you're using the
* {@link StreamMultiplexer}, then you will need to set this to
- * {@code '1'}. Because {@link StreamDemultiplexer} extends
+ * {@code 1}. Because {@link StreamDemultiplexer} extends
* {@link OutputStream}, this constructor effectively creates an
* {@link OutputStream} instance which demultiplexes the tagged data client
* code writes to it into {@code outputStreams}.
@@ -95,70 +93,35 @@ public final class StreamDemultiplexer extends OutputStream {
public void write(int b) throws IOException {
// This dispatch traverses the finite state machine / grammar.
switch (state) {
- case EXPECT_CONTROL_STARTING_AT:
- parseControlStartingAt((byte) b);
- resetFields();
- break;
case EXPECT_MARKER_BYTE:
- parseMarkerByte((byte) b);
+ parseMarkerByte(b);
break;
- case EXPECT_AT_OR_NEWLINE:
- parseAtOrNewline((byte) b);
+ case EXPECT_SIZE:
+ parseSize(b);
break;
- case EXPECT_PAYLOAD_OR_NEWLINE:
- parsePayloadOrNewline((byte) b);
+ case EXPECT_PAYLOAD:
+ parsePayload(b);
break;
}
}
- /**
- * Handles {@link State#EXPECT_PAYLOAD_OR_NEWLINE}, which is the payload
- * we are actually transporting over the wire. At this point we can rely
- * on a stream having been preselected into {@link #selectedStream}, and
- * also we will add a newline if {@link #addNewlineToPayload} is set.
- * Flushes at the end of every payload segment.
- */
- private void parsePayloadOrNewline(byte b) throws IOException {
- if (b == NEWLINE) {
- if (addNewlineToPayload) {
- selectedStream.write(NEWLINE);
- }
- selectedStream.flush();
- state = State.EXPECT_CONTROL_STARTING_AT;
- } else {
- selectedStream.write(b);
- selectedStream.flush(); // slow?
+ private void parseSize(int b) {
+ sizeBuffer[currentSizeByte] = b;
+ currentSizeByte += 1;
+ if (currentSizeByte == 4) {
+ state = State.EXPECT_PAYLOAD;
+ payloadBytesLeft = (sizeBuffer[0] << 24)
+ + (sizeBuffer[1] << 16)
+ + (sizeBuffer[2] << 8)
+ + sizeBuffer[3];
}
}
/**
- * Handles {@link State#EXPECT_AT_OR_NEWLINE}, which is either the
- * suppress newline indicator (at) at the end of a control line, or the end
- * of a control line.
- */
- private void parseAtOrNewline(byte b) throws IOException {
- if (b == NEWLINE) {
- state = State.EXPECT_PAYLOAD_OR_NEWLINE;
- } else if (b == AT) {
- addNewlineToPayload = false;
- } else {
- throw new IOException("Expected @ or \\n. (" + b + ")");
- }
- }
-
- /**
- * Reset the fields that are affected by our state.
- */
- private void resetFields() {
- selectedStream = null;
- addNewlineToPayload = true;
- }
-
- /**
* Handles {@link State#EXPECT_MARKER_BYTE}. The byte determines which stream
* we will be using, and will set {@link #selectedStream}.
*/
- private void parseMarkerByte(byte markerByte) throws IOException {
+ private void parseMarkerByte(int markerByte) throws IOException {
if (markerByte < 0 || markerByte > Byte.MAX_VALUE) {
String msg = "Illegal marker byte (" + markerByte + ")";
throw new IllegalArgumentException(msg);
@@ -168,19 +131,15 @@ public final class StreamDemultiplexer extends OutputStream {
throw new IOException("stream " + markerByte + " not registered.");
}
selectedStream = outputStreams[markerByte];
- state = State.EXPECT_AT_OR_NEWLINE;
+ state = State.EXPECT_SIZE;
+ currentSizeByte = 0;
}
- /**
- * Handles {@link State#EXPECT_CONTROL_STARTING_AT}, the very first '@' with
- * which each message starts.
- */
- private void parseControlStartingAt(byte b) throws IOException {
- if (b != AT) {
- throw new IOException("Expected control starting @. (" + b + ", "
- + (char) b + ")");
+ private void parsePayload(int b) throws IOException {
+ selectedStream.write(b);
+ payloadBytesLeft -= 1;
+ if (payloadBytesLeft == 0) {
+ state = State.EXPECT_MARKER_BYTE;
}
- state = State.EXPECT_MARKER_BYTE;
}
-
}
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java
index 263b98a274..209dd302bc 100644
--- a/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java
+++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamMultiplexer.java
@@ -28,37 +28,28 @@ import java.io.OutputStream;
* end of a networking connection can simply read the tagged lines and then act
* on them within a sigle thread.
*
- * The format of the tagged output stream is as follows:
+ * The format of the tagged output stream is reasonably simple:
+ * <ol>
+ * <li>
+ * Marker byte indicating whether that chunk is for stdout (1), stderr (2) or the control
+ * stream (3).
+ * </li>
+ * <li>
+ * 4 bytes indicating the length of the chunk in high-endian format.
+ * </li>
+ * <li>
+ * The payload (as many bytes as the length field before)
+ * </li>
+ * </ol>>
*
- * <pre>
- * combined :: = [ control_line payload ... ]+
- * control_line :: = '@' marker '@'? '\n'
- * payload :: = r'^[^\n]*\n'
- * </pre>
*
- * So basically:
- * <ul>
- * <li>Control lines alternate with payload lines</li>
- * <li>Both types of lines end with a newline, and never have a newline in
- * them.</li>
- * <li>The marker indicates which stream we mean.
- * For now, '1'=stdout, '2'=stderr.</li>
- * <li>The optional second '@' indicates that the following line is
- * incomplete.</li>
- * </ul>
- *
- * This format is optimized for easy interpretation by a Python client, but it's
- * also a compromise in that it's still easy to interpret by a human (let's say
- * you have to read the traffic over a wire for some reason).
*/
@ThreadSafe
public final class StreamMultiplexer {
- public static final byte STDOUT_MARKER = '1';
- public static final byte STDERR_MARKER = '2';
- public static final byte CONTROL_MARKER = '3';
-
- private static final byte AT = '@';
+ public static final byte STDOUT_MARKER = 1;
+ public static final byte STDERR_MARKER = 2;
+ public static final byte CONTROL_MARKER = 3;
private final Object mutex = new Object();
private final OutputStream multiplexed;
@@ -82,19 +73,13 @@ public final class StreamMultiplexer {
multiplexed.flush();
return;
}
- byte lastByte = buffer[len - 1];
- boolean lineIsIncomplete = lastByte != NEWLINE;
- multiplexed.write(AT);
multiplexed.write(markerByte);
- if (lineIsIncomplete) {
- multiplexed.write(AT);
- }
- multiplexed.write(NEWLINE);
+ multiplexed.write((len >> 24) & 0xff);
+ multiplexed.write((len >> 16) & 0xff);
+ multiplexed.write((len >> 8) & 0xff);
+ multiplexed.write(len & 0xff);
multiplexed.write(buffer, 0, len);
- if (lineIsIncomplete) {
- multiplexed.write(NEWLINE);
- }
multiplexed.flush();
}
len = 0;
diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java b/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java
index 5d57544062..f81a0384f3 100644
--- a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java
@@ -102,27 +102,26 @@ public class RPCServerTest {
FileSystemUtils.deleteTree(serverDir);
}
- private void runTestRequest(String request, int ret, String out, String err,
- String control) throws Exception {
- assertEquals(new ServerResponse(control, ret), client.sendRequest(request));
+ private void runTestRequest(String request, int ret, String out, String err) throws Exception {
+ assertEquals(ret, client.sendRequest(request));
assertEquals(out, outErr.outAsLatin1());
assertThat(outErr.errAsLatin1()).contains(err);
}
@Test
public void testUnknownCommand() throws Exception {
- runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n", "");
+ runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n");
}
@Test
public void testEmptyBlazeCommand() throws Exception {
- runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n", "");
+ runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n");
}
@Test
public void testWorkspaceDies() throws Exception {
assertTrue(serverThread.isAlive());
- runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR, "");
+ runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR);
Thread.sleep(HEALTH_CHECK_MILLIS * 2);
assertTrue(serverThread.isAlive());
diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java
index e24e0bb3ac..4a659a14f8 100644
--- a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java
+++ b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java
@@ -43,7 +43,7 @@ public class RPCTestingClient {
this.outErr = outErr;
}
- public ServerResponse sendRequest(String command, String... params)
+ public int sendRequest(String command, String... params)
throws Exception {
String request = command;
for (String param : params) {
@@ -52,24 +52,35 @@ public class RPCTestingClient {
return sendRequest(request);
}
- public ServerResponse sendRequest(String request) throws Exception {
+ public int sendRequest(String request) throws Exception {
LocalClientSocket connection = new LocalClientSocket();
connection.connect(new LocalSocketAddress(socketFile.getPathFile()));
try {
OutputStream out = connection.getOutputStream();
- out.write(request.getBytes(UTF_8));
+ byte[] requestBytes = request.getBytes(UTF_8);
+ byte[] requestLength = new byte[4];
+ requestLength[0] = (byte) (requestBytes.length << 24);
+ requestLength[1] = (byte) ((requestBytes.length << 16) & 0xff);
+ requestLength[2] = (byte) ((requestBytes.length << 8) & 0xff);
+ requestLength[3] = (byte) (requestBytes.length & 0xff);
+ out.write(requestLength);
+ out.write(requestBytes);
out.flush();
connection.shutdownOutput();
OutputStream stdout = outErr.getOutputStream();
OutputStream stderr = outErr.getErrorStream();
ByteArrayOutputStream control = new ByteArrayOutputStream();
- StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1',
+ StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1,
stdout, stderr, control);
ByteStreams.copy(connection.getInputStream(), demux);
demux.flush();
- return ServerResponse.parseFrom(control);
+ byte[] controlBytes = control.toByteArray();
+ return (((int) controlBytes[0]) << 24)
+ + (((int) controlBytes[1]) << 16)
+ + (((int) controlBytes[2]) << 8)
+ + ((int) controlBytes[3]);
} finally {
connection.close();
}
diff --git a/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java b/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java
index 47d693116a..786eb6c42b 100644
--- a/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/util/io/StreamDemultiplexerTest.java
@@ -16,8 +16,6 @@ package com.google.devtools.build.lib.util.io;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import com.google.devtools.build.lib.util.StringUtilities;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -25,6 +23,7 @@ import org.junit.runners.JUnit4;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
import java.util.Random;
/**
@@ -37,14 +36,6 @@ public class StreamDemultiplexerTest {
private ByteArrayOutputStream err = new ByteArrayOutputStream();
private ByteArrayOutputStream ctl = new ByteArrayOutputStream();
- private byte[] lines(String... lines) {
- try {
- return StringUtilities.joinLines(lines).getBytes("ISO-8859-1");
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- }
-
private String toAnsi(ByteArrayOutputStream stream) {
try {
return new String(stream.toByteArray(), "ISO-8859-1");
@@ -63,8 +54,8 @@ public class StreamDemultiplexerTest {
@Test
public void testHelloWorldOnStandardOut() throws Exception {
- byte[] multiplexed = lines("@1@", "Hello, world.");
- try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out)) {
+ byte[] multiplexed = chunk(1, "Hello, world.");
+ try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out)) {
demux.write(multiplexed);
}
assertEquals("Hello, world.", out.toString("ISO-8859-1"));
@@ -72,8 +63,8 @@ public class StreamDemultiplexerTest {
@Test
public void testOutErrCtl() throws Exception {
- byte[] multiplexed = lines("@1@", "out", "@2@", "err", "@3@", "ctl", "");
- try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out, err, ctl)) {
+ byte[] multiplexed = concat(chunk(1, "out"), chunk(2, "err"), chunk(3, "ctl"));
+ try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out, err, ctl)) {
demux.write(multiplexed);
}
assertEquals("out", toAnsi(out));
@@ -83,26 +74,16 @@ public class StreamDemultiplexerTest {
@Test
public void testWithoutLineBreaks() throws Exception {
- byte[] multiplexed = lines("@1@", "just ", "@1@", "one ", "@1@", "line", "");
- try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out)) {
+ byte[] multiplexed = concat(chunk(1, "just "), chunk(1, "one "), chunk(1, "line"));
+ try (final StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out)) {
demux.write(multiplexed);
}
assertEquals("just one line", out.toString("ISO-8859-1"));
}
@Test
- public void testLineBreaks() throws Exception {
- byte[] multiplexed = lines("@1", "two", "@1", "lines", "");
- try (StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out)) {
- demux.write(multiplexed);
- demux.flush();
- assertEquals("two\nlines\n", out.toString("ISO-8859-1"));
- }
- }
-
- @Test
public void testMultiplexAndBackWithHelloWorld() throws Exception {
- StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out);
+ StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out);
StreamMultiplexer mux = new StreamMultiplexer(demux);
OutputStream out = mux.createStdout();
out.write(inAnsi("Hello, world."));
@@ -112,7 +93,7 @@ public class StreamDemultiplexerTest {
@Test
public void testMultiplexDemultiplexBinaryStress() throws Exception {
- StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', out, err, ctl);
+ StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, out, err, ctl);
StreamMultiplexer mux = new StreamMultiplexer(demux);
OutputStream[] muxOuts = {mux.createStdout(), mux.createStderr(), mux.createControl()};
ByteArrayOutputStream[] expectedOuts =
@@ -132,4 +113,32 @@ public class StreamDemultiplexerTest {
assertArrayEquals(expectedOuts[1].toByteArray(), err.toByteArray());
assertArrayEquals(expectedOuts[2].toByteArray(), ctl.toByteArray());
}
+
+ private static byte[] chunk(int stream, String payload) {
+ byte[] payloadBytes = payload.getBytes(Charset.defaultCharset());
+ byte[] result = new byte[payloadBytes.length + 5];
+
+ System.arraycopy(payloadBytes, 0, result, 5, payloadBytes.length);
+ result[0] = (byte) stream;
+ result[1] = (byte) (payloadBytes.length >> 24);
+ result[2] = (byte) ((payloadBytes.length >> 16) & 0xff);
+ result[3] = (byte) ((payloadBytes.length >> 8) & 0xff);
+ result[4] = (byte) (payloadBytes.length & 0xff);
+ return result;
+ }
+
+ private static byte[] concat(byte[]... chunks) {
+ int length = 0;
+ for (byte[] chunk : chunks) {
+ length += chunk.length;
+ }
+
+ byte[] result = new byte[length];
+ int previousChunks = 0;
+ for (byte[] chunk : chunks) {
+ System.arraycopy(chunk, 0, result, previousChunks, chunk.length);
+ previousChunks += chunk.length;
+ }
+ return result;
+ }
}
diff --git a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java
index 932850fb0c..0c99a16d08 100644
--- a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java
+++ b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerParallelStressTest.java
@@ -49,8 +49,7 @@ public class StreamMultiplexerParallelStressTest {
*/
OutputStream devNull = ByteStreams.nullOutputStream();
- StreamDemultiplexer demux = new StreamDemultiplexer((byte)'1',
- devNull, devNull, devNull);
+ StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, devNull, devNull, devNull);
/**
* The multiplexer under test.
diff --git a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java
index 26664a155d..6d3f2c3292 100644
--- a/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/util/io/StreamMultiplexerTest.java
@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util.io;
-import static com.google.devtools.build.lib.util.StringUtilities.joinLines;
+
+import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -27,6 +28,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
/**
* Test for {@link StreamMultiplexer}.
@@ -61,42 +63,41 @@ public class StreamMultiplexerTest {
return string.getBytes("ISO-8859-1");
}
- private static String getLatin(byte[] bytes)
- throws UnsupportedEncodingException {
- return new String(bytes, "ISO-8859-1");
- }
-
@Test
- public void testHelloWorldOnStdOut() throws IOException {
+ public void testHelloWorldOnStdOut() throws Exception {
out.write(getLatin("Hello, world."));
out.flush();
- assertEquals(joinLines("@1@", "Hello, world.", ""),
- getLatin(multiplexed.toByteArray()));
+ assertMessage(multiplexed.toByteArray(), 0, "Hello, world.");
}
@Test
public void testInterleavedStdoutStderrControl() throws Exception {
+ int start = 0;
out.write(getLatin("Hello, stdout."));
out.flush();
+ assertMessage(multiplexed.toByteArray(), start, "Hello, stdout.");
+ start = multiplexed.toByteArray().length;
+
err.write(getLatin("Hello, stderr."));
err.flush();
+ assertMessage(multiplexed.toByteArray(), start, "Hello, stderr.");
+ start = multiplexed.toByteArray().length;
+
ctl.write(getLatin("Hello, control."));
ctl.flush();
+ assertMessage(multiplexed.toByteArray(), start, "Hello, control.");
+ start = multiplexed.toByteArray().length;
+
out.write(getLatin("... and back!"));
out.flush();
- assertEquals(joinLines("@1@", "Hello, stdout.",
- "@2@", "Hello, stderr.",
- "@3@", "Hello, control.",
- "@1@", "... and back!",
- ""),
- getLatin(multiplexed.toByteArray()));
+ assertMessage(multiplexed.toByteArray(), start, "... and back!");
}
@Test
public void testWillNotCommitToUnderlyingStreamUnlessFlushOrNewline()
throws Exception {
out.write(getLatin("There are no newline characters in here, so it won't" +
- " get written just yet."));
+ " get written just yet."));
assertArrayEquals(multiplexed.toByteArray(), new byte[0]);
}
@@ -105,16 +106,11 @@ public class StreamMultiplexerTest {
out.write(getLatin("No newline just yet, so no flushing. "));
assertArrayEquals(multiplexed.toByteArray(), new byte[0]);
out.write(getLatin("OK, here we go:\nAnd more to come."));
-
- String expected = joinLines("@1",
- "No newline just yet, so no flushing. OK, here we go:", "");
-
- assertEquals(expected, getLatin(multiplexed.toByteArray()));
-
+ assertMessage(
+ multiplexed.toByteArray(), 0, "No newline just yet, so no flushing. OK, here we go:\n");
+ int firstMessageLength = multiplexed.toByteArray().length;
out.write((byte) '\n');
- expected += joinLines("@1", "And more to come.", "");
-
- assertEquals(expected, getLatin(multiplexed.toByteArray()));
+ assertMessage(multiplexed.toByteArray(), firstMessageLength, "And more to come.\n");
}
@Test
@@ -122,14 +118,14 @@ public class StreamMultiplexerTest {
out.write(getLatin("Don't forget to flush!"));
assertArrayEquals(new byte[0], multiplexed.toByteArray());
out.flush(); // now the output will appear in multiplexed.
- assertEquals(joinLines("@1@", "Don't forget to flush!", ""),
- getLatin(multiplexed.toByteArray()));
+ assertStartsWith(multiplexed.toByteArray(), 1, 0, 0, 0);
+ assertMessage(multiplexed.toByteArray(), 0, "Don't forget to flush!");
}
@Test
public void testByteEncoding() throws IOException {
OutputStream devNull = ByteStreams.nullOutputStream();
- StreamDemultiplexer demux = new StreamDemultiplexer((byte) '1', devNull);
+ StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, devNull);
StreamMultiplexer mux = new StreamMultiplexer(demux);
OutputStream out = mux.createStdout();
@@ -145,4 +141,13 @@ public class StreamMultiplexerTest {
out.write(10);
}
+ private static void assertStartsWith(byte[] actual, int... expectedPrefix){
+ for (int i = 0; i < expectedPrefix.length; i++) {
+ assertThat(actual[i]).isEqualTo(expectedPrefix[i]);
+ }
+ }
+
+ private static void assertMessage(byte[] actual, int start, String expected) throws Exception {
+ assertThat(Arrays.copyOfRange(actual, start + 5, actual.length)).isEqualTo(getLatin(expected));
+ }
}