aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/server
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-04-15 13:11:21 +0000
committerGravatar Dmitry Lomov <dslomov@google.com>2016-04-15 14:09:06 +0000
commit1b25ce2028fe36d614a08478c703754e3b5ebd69 (patch)
tree86c4bc1decde394003d02f1a4418e7c547d1cb5b /src/main/java/com/google/devtools/build/lib/server
parentbf8d910e4bc993e33d747ce4e9dd3f8498734c53 (diff)
Initial version of client-server communication over gRPC.
This still has a number of issues, including, but not limited to: - When switching between gRPC and AF_UNIX mode, you need to do a manual shutdown - The console is spammed with "connection refused" messages on server startup - When in gRPC mode, server also starts up an AF_UNIX server even though it's not necessary and concurrent requests probably make Bazel crash and burn - I have no idea how concurrent gRPC requests are handled and now many threads gRPC creates - Not tested except under Linux - The request/response cookies are written in an odd format (negative bytes are not handled correctly). This is only a cosmetic issue, since the data content of the string is the same either way. Can be tested with the --grpc_port=0 (or a valid port number) startup option. -- MOS_MIGRATED_REVID=119948959
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java152
1 files changed, 129 insertions, 23 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 7a03c16eba..cd387362b9 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -14,22 +14,31 @@
package com.google.devtools.build.lib.server;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.runtime.CommandExecutor;
+import com.google.devtools.build.lib.server.CommandProtos.PingRequest;
+import com.google.devtools.build.lib.server.CommandProtos.PingResponse;
+import com.google.devtools.build.lib.server.CommandProtos.RunRequest;
+import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.PrintWriter;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
/**
* gRPC server class.
@@ -38,14 +47,6 @@ import java.nio.channels.ServerSocketChannel;
* bootstrapping.
*/
public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer {
- private static final String PORT_FILE = "server/grpc_port"; // relative to the output base
- private final CommandExecutor commandExecutor;
- private final Clock clock;
- private final File portFile;
- private Server server;
- private int port; // mutable so that we can overwrite it if port 0 is passed in
- boolean serving;
-
/**
* Factory class. Instantiated by reflection.
*/
@@ -57,13 +58,81 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
}
}
- private GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
+ private enum StreamType {
+ STDOUT,
+ STDERR,
+ }
+
+ // TODO(lberki): Maybe we should implement line buffering?
+ private class RpcOutputStream extends OutputStream {
+ private final StreamObserver<RunResponse> observer;
+ private final StreamType type;
+
+ private RpcOutputStream(StreamObserver<RunResponse> observer, StreamType type) {
+ this.observer = observer;
+ this.type = type;
+ }
+
+ @Override
+ public synchronized void write(byte[] b, int off, int inlen) {
+ ByteString input = ByteString.copyFrom(b, off, inlen);
+ RunResponse.Builder response = RunResponse
+ .newBuilder()
+ .setCookie(responseCookie);
+
+ switch (type) {
+ case STDOUT: response.setStdout(input); break;
+ case STDERR: response.setStderr(input); break;
+ default: throw new IllegalStateException();
+ }
+
+ observer.onNext(response.build());
+ }
+
+ @Override
+ public void write(int byteAsInt) throws IOException {
+ byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons
+ write(new byte[] {b}, 0, 1);
+ }
+ }
+
+ // These paths are all relative to the output base
+ private static final String PORT_FILE = "server/grpc_port";
+ private static final String REQUEST_COOKIE_FILE = "server/request_cookie";
+ private static final String RESPONSE_COOKIE_FILE = "server/response_cookie";
+
+ private final CommandExecutor commandExecutor;
+ private final Clock clock;
+ private final String outputBase;
+ private final String requestCookie;
+ private final String responseCookie;
+
+ private Server server;
+ private int port; // mutable so that we can overwrite it if port 0 is passed in
+ boolean serving;
+
+ public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
String outputBase) {
this.commandExecutor = commandExecutor;
this.clock = clock;
+ this.outputBase = outputBase;
this.port = port;
- this.portFile = new File(outputBase + "/" + PORT_FILE);
this.serving = false;
+
+ SecureRandom random = new SecureRandom();
+ requestCookie = generateCookie(random, 16);
+ responseCookie = generateCookie(random, 16);
+ }
+
+ private static String generateCookie(SecureRandom random, int byteCount) {
+ byte[] bytes = new byte[byteCount];
+ random.nextBytes(bytes);
+ StringBuilder result = new StringBuilder();
+ for (byte b : bytes) {
+ result.append(Integer.toHexString(((int) b) + 128));
+ }
+
+ return result.toString();
}
public void serve() throws IOException {
@@ -73,14 +142,23 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
.build();
server.start();
+ serving = true;
+
if (port == 0) {
port = getActualServerPort();
}
- PrintWriter portWriter = new PrintWriter(portFile);
- portWriter.print(port);
- portWriter.close();
- serving = true;
+ writeFile(PORT_FILE, Integer.toString(port));
+ writeFile(REQUEST_COOKIE_FILE, requestCookie);
+ writeFile(RESPONSE_COOKIE_FILE, responseCookie);
+
+ }
+
+ private void writeFile(String path, String contents) throws IOException {
+ OutputStreamWriter writer = new OutputStreamWriter(
+ new FileOutputStream(new File(outputBase + "/" + path)), StandardCharsets.UTF_8);
+ writer.write(contents);
+ writer.close();
}
/**
@@ -120,7 +198,6 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
}
public void terminate() {
- Preconditions.checkState(serving);
server.shutdownNow();
// This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same
// amount of code as implementing it ourselves.
@@ -143,14 +220,43 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
}
@Override
- public void run(CommandProtos.Request request, StreamObserver<CommandProtos.Response> observer) {
- Preconditions.checkState(serving);
- commandExecutor.exec(
- ImmutableList.of("version"), OutErr.SYSTEM_OUT_ERR, clock.currentTimeMillis());
- CommandProtos.Response response = CommandProtos.Response.newBuilder()
- .setNumber(request.getNumber() + 1)
+ public void run(
+ RunRequest request, StreamObserver<RunResponse> observer) {
+ if (!request.getCookie().equals(requestCookie)) {
+ observer.onNext(RunResponse.newBuilder()
+ .setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode())
+ .build());
+ observer.onCompleted();
+ return;
+ }
+
+ OutErr rpcOutErr = OutErr.create(
+ new RpcOutputStream(observer, StreamType.STDOUT),
+ new RpcOutputStream(observer, StreamType.STDERR));
+
+ int exitCode = commandExecutor.exec(
+ request.getArgList(), rpcOutErr, clock.currentTimeMillis());
+
+ RunResponse response = RunResponse.newBuilder()
+ .setCookie(responseCookie)
+ .setFinished(true)
+ .setExitCode(exitCode)
.build();
+
observer.onNext(response);
observer.onCompleted();
}
+
+ @Override
+ public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObserver) {
+ Preconditions.checkState(serving);
+
+ CommandProtos.PingResponse.Builder response = CommandProtos.PingResponse.newBuilder();
+ if (pingRequest.getCookie().equals(requestCookie)) {
+ response.setCookie(responseCookie);
+ }
+
+ streamObserver.onNext(response.build());
+ streamObserver.onCompleted();
+ }
}