diff options
author | 2016-04-15 13:11:21 +0000 | |
---|---|---|
committer | 2016-04-15 14:09:06 +0000 | |
commit | 1b25ce2028fe36d614a08478c703754e3b5ebd69 (patch) | |
tree | 86c4bc1decde394003d02f1a4418e7c547d1cb5b /src/main/java/com/google/devtools/build/lib/server | |
parent | bf8d910e4bc993e33d747ce4e9dd3f8498734c53 (diff) |
Initial version of client-server communication over gRPC.
This still has a number of issues, including, but not limited to:
- When switching between gRPC and AF_UNIX mode, you need to do a manual shutdown
- The console is spammed with "connection refused" messages on server startup
- When in gRPC mode, server also starts up an AF_UNIX server even though it's not necessary and concurrent requests probably make Bazel crash and burn
- I have no idea how concurrent gRPC requests are handled and now many threads gRPC creates
- Not tested except under Linux
- The request/response cookies are written in an odd format (negative bytes are not handled correctly). This is only a cosmetic issue, since the data content of the string is the same either way.
Can be tested with the --grpc_port=0 (or a valid port number) startup option.
--
MOS_MIGRATED_REVID=119948959
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 152 |
1 files changed, 129 insertions, 23 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 7a03c16eba..cd387362b9 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -14,22 +14,31 @@ package com.google.devtools.build.lib.server; -import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.runtime.CommandExecutor; +import com.google.devtools.build.lib.server.CommandProtos.PingRequest; +import com.google.devtools.build.lib.server.CommandProtos.PingResponse; +import com.google.devtools.build.lib.server.CommandProtos.RunRequest; +import com.google.devtools.build.lib.server.CommandProtos.RunResponse; import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.io.OutErr; +import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.PrintWriter; +import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; /** * gRPC server class. @@ -38,14 +47,6 @@ import java.nio.channels.ServerSocketChannel; * bootstrapping. */ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer { - private static final String PORT_FILE = "server/grpc_port"; // relative to the output base - private final CommandExecutor commandExecutor; - private final Clock clock; - private final File portFile; - private Server server; - private int port; // mutable so that we can overwrite it if port 0 is passed in - boolean serving; - /** * Factory class. Instantiated by reflection. */ @@ -57,13 +58,81 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ } } - private GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, + private enum StreamType { + STDOUT, + STDERR, + } + + // TODO(lberki): Maybe we should implement line buffering? + private class RpcOutputStream extends OutputStream { + private final StreamObserver<RunResponse> observer; + private final StreamType type; + + private RpcOutputStream(StreamObserver<RunResponse> observer, StreamType type) { + this.observer = observer; + this.type = type; + } + + @Override + public synchronized void write(byte[] b, int off, int inlen) { + ByteString input = ByteString.copyFrom(b, off, inlen); + RunResponse.Builder response = RunResponse + .newBuilder() + .setCookie(responseCookie); + + switch (type) { + case STDOUT: response.setStdout(input); break; + case STDERR: response.setStderr(input); break; + default: throw new IllegalStateException(); + } + + observer.onNext(response.build()); + } + + @Override + public void write(int byteAsInt) throws IOException { + byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons + write(new byte[] {b}, 0, 1); + } + } + + // These paths are all relative to the output base + private static final String PORT_FILE = "server/grpc_port"; + private static final String REQUEST_COOKIE_FILE = "server/request_cookie"; + private static final String RESPONSE_COOKIE_FILE = "server/response_cookie"; + + private final CommandExecutor commandExecutor; + private final Clock clock; + private final String outputBase; + private final String requestCookie; + private final String responseCookie; + + private Server server; + private int port; // mutable so that we can overwrite it if port 0 is passed in + boolean serving; + + public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, String outputBase) { this.commandExecutor = commandExecutor; this.clock = clock; + this.outputBase = outputBase; this.port = port; - this.portFile = new File(outputBase + "/" + PORT_FILE); this.serving = false; + + SecureRandom random = new SecureRandom(); + requestCookie = generateCookie(random, 16); + responseCookie = generateCookie(random, 16); + } + + private static String generateCookie(SecureRandom random, int byteCount) { + byte[] bytes = new byte[byteCount]; + random.nextBytes(bytes); + StringBuilder result = new StringBuilder(); + for (byte b : bytes) { + result.append(Integer.toHexString(((int) b) + 128)); + } + + return result.toString(); } public void serve() throws IOException { @@ -73,14 +142,23 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ .build(); server.start(); + serving = true; + if (port == 0) { port = getActualServerPort(); } - PrintWriter portWriter = new PrintWriter(portFile); - portWriter.print(port); - portWriter.close(); - serving = true; + writeFile(PORT_FILE, Integer.toString(port)); + writeFile(REQUEST_COOKIE_FILE, requestCookie); + writeFile(RESPONSE_COOKIE_FILE, responseCookie); + + } + + private void writeFile(String path, String contents) throws IOException { + OutputStreamWriter writer = new OutputStreamWriter( + new FileOutputStream(new File(outputBase + "/" + path)), StandardCharsets.UTF_8); + writer.write(contents); + writer.close(); } /** @@ -120,7 +198,6 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ } public void terminate() { - Preconditions.checkState(serving); server.shutdownNow(); // This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same // amount of code as implementing it ourselves. @@ -143,14 +220,43 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ } @Override - public void run(CommandProtos.Request request, StreamObserver<CommandProtos.Response> observer) { - Preconditions.checkState(serving); - commandExecutor.exec( - ImmutableList.of("version"), OutErr.SYSTEM_OUT_ERR, clock.currentTimeMillis()); - CommandProtos.Response response = CommandProtos.Response.newBuilder() - .setNumber(request.getNumber() + 1) + public void run( + RunRequest request, StreamObserver<RunResponse> observer) { + if (!request.getCookie().equals(requestCookie)) { + observer.onNext(RunResponse.newBuilder() + .setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode()) + .build()); + observer.onCompleted(); + return; + } + + OutErr rpcOutErr = OutErr.create( + new RpcOutputStream(observer, StreamType.STDOUT), + new RpcOutputStream(observer, StreamType.STDERR)); + + int exitCode = commandExecutor.exec( + request.getArgList(), rpcOutErr, clock.currentTimeMillis()); + + RunResponse response = RunResponse.newBuilder() + .setCookie(responseCookie) + .setFinished(true) + .setExitCode(exitCode) .build(); + observer.onNext(response); observer.onCompleted(); } + + @Override + public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObserver) { + Preconditions.checkState(serving); + + CommandProtos.PingResponse.Builder response = CommandProtos.PingResponse.newBuilder(); + if (pingRequest.getCookie().equals(requestCookie)) { + response.setCookie(responseCookie); + } + + streamObserver.onNext(response.build()); + streamObserver.onCompleted(); + } } |