From 1b25ce2028fe36d614a08478c703754e3b5ebd69 Mon Sep 17 00:00:00 2001 From: Lukacs Berki Date: Fri, 15 Apr 2016 13:11:21 +0000 Subject: Initial version of client-server communication over gRPC. This still has a number of issues, including, but not limited to: - When switching between gRPC and AF_UNIX mode, you need to do a manual shutdown - The console is spammed with "connection refused" messages on server startup - When in gRPC mode, server also starts up an AF_UNIX server even though it's not necessary and concurrent requests probably make Bazel crash and burn - I have no idea how concurrent gRPC requests are handled and now many threads gRPC creates - Not tested except under Linux - The request/response cookies are written in an odd format (negative bytes are not handled correctly). This is only a cosmetic issue, since the data content of the string is the same either way. Can be tested with the --grpc_port=0 (or a valid port number) startup option. -- MOS_MIGRATED_REVID=119948959 --- src/main/cpp/blaze.cc | 157 +++++++++++++++++---- .../devtools/build/lib/server/GrpcServerImpl.java | 152 +++++++++++++++++--- src/main/protobuf/command_server.proto | 31 +++- 3 files changed, 284 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index 256abb632e..20f4ae57ee 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc @@ -56,6 +56,7 @@ #include #include +#include // NOLINT (gRPC requires this) #include #include #include @@ -504,14 +505,41 @@ static int StartServer() { } class BlazeServer { + public: + virtual ~BlazeServer() {} + + virtual bool Connect() = 0; + virtual void Disconnect() = 0; + virtual void Communicate() = 0; +}; + +class AfUnixBlazeServer : public BlazeServer { private: int server_socket; public: - BlazeServer(); - bool Connect(); - void Disconnect(); - void ATTRIBUTE_NORETURN Communicate(); + AfUnixBlazeServer(); + virtual ~AfUnixBlazeServer() {} + + bool Connect() override; + void Disconnect() override; + void Communicate() override; +}; + +class GrpcBlazeServer : public BlazeServer { + private: + std::unique_ptr client; + + std::string request_cookie; + std::string response_cookie; + + public: + GrpcBlazeServer(); + virtual ~GrpcBlazeServer() {} + + bool Connect() override; + void Disconnect() override; + void Communicate() override; }; static bool KillRunningServerIfAny(BlazeServer *server); @@ -563,11 +591,11 @@ static void StartStandalone(BlazeServer* server) { pdie(blaze_exit_code::INTERNAL_ERROR, "execv of '%s' failed", exe.c_str()); } -BlazeServer::BlazeServer() { +AfUnixBlazeServer::AfUnixBlazeServer() { server_socket = -1; } -bool BlazeServer::Connect() { +bool AfUnixBlazeServer::Connect() { if (server_socket == -1) { server_socket = socket(PF_UNIX, SOCK_STREAM, 0); if (server_socket == -1) { @@ -603,7 +631,7 @@ bool BlazeServer::Connect() { } } -void BlazeServer::Disconnect() { +void AfUnixBlazeServer::Disconnect() { close(server_socket); server_socket = -1; } @@ -663,7 +691,7 @@ static void forward_server_output(int socket, int output) { } } -void ATTRIBUTE_NORETURN BlazeServer::Communicate() { +void AfUnixBlazeServer::Communicate() { const string request = BuildServerRequest(); // Send request (Request is written in a single chunk.) @@ -832,7 +860,7 @@ static bool ConnectToServer(BlazeServer *server, bool start) { } fputc('.', stderr); fflush(stderr); - poll(NULL, 0, 100); // sleep 100ms. (usleep(3) is obsolete.) + poll(NULL, 0, 1000); // sleep 100ms. (usleep(3) is obsolete.) char c; if (read(fd, &c, 1) != -1 || errno != EAGAIN) { fprintf(stderr, "\nunexpected pipe read status: %s\n" @@ -1374,6 +1402,8 @@ static ATTRIBUTE_NORETURN void SendServerRequest(BlazeServer* server) { signal(SIGQUIT, handler); server->Communicate(); + fprintf(stderr, "Communicate() did not exit"); + exit(blaze_exit_code::INTERNAL_ERROR); } // Parse the options, storing parsed values in globals. @@ -1695,39 +1725,112 @@ int main(int argc, const char *argv[]) { WarnFilesystemType(globals->options.output_base); EnsureFiniteStackLimit(); - BlazeServer server; + std::unique_ptr server( + globals->options.grpc_port >= 0 + ? static_cast(new GrpcBlazeServer()) + : static_cast(new AfUnixBlazeServer())); ExtractData(self_path); - EnsureCorrectRunningVersion(&server); - KillRunningServerIfDifferentStartupOptions(&server); + EnsureCorrectRunningVersion(server.get()); + KillRunningServerIfDifferentStartupOptions(server.get()); if (globals->options.batch) { SetScheduling(globals->options.batch_cpu_scheduling, globals->options.io_nice_level); - StartStandalone(&server); + StartStandalone(server.get()); } else { - SendServerRequest(&server); + SendServerRequest(server.get()); } return 0; } -} // namespace blaze - -int main(int argc, const char *argv[]) { - return blaze::main(argc, argv); +GrpcBlazeServer::GrpcBlazeServer() { } -// Unused method just to make sure that we can compile and link with gRPC -void InvokeServer(const std::string& address) { - std::shared_ptr channel( - grpc::CreateChannel(address, grpc::InsecureChannelCredentials())); +bool GrpcBlazeServer::Connect() { + std::string server_dir = globals->options.output_base + "/server"; + std::string port; + + if (!ReadFile(server_dir + "/grpc_port", &port)) { + return false; + } + + if (!ReadFile(server_dir + "/request_cookie", &request_cookie)) { + return false; + } + + if (!ReadFile(server_dir + "/response_cookie", &response_cookie)) { + return false; + } + + std::shared_ptr channel(grpc::CreateChannel( + "localhost:" + port, grpc::InsecureChannelCredentials())); std::unique_ptr client( command_server::CommandServer::NewStub(channel)); - command_server::Request request; - command_server::Response response; + + grpc::ClientContext context; + context.set_deadline( + std::chrono::system_clock::now() + std::chrono::milliseconds(50)); + + command_server::PingRequest request; + command_server::PingResponse response; + request.set_cookie(request_cookie); + grpc::Status status = client->Ping(&context, request, &response); + + if (!status.ok()) { + return false; + } + + this->client = std::move(client); + return true; +} + +void GrpcBlazeServer::Communicate() { + vector arg_vector; + string command = globals->option_processor.GetCommand(); + if (command != "") { + arg_vector.push_back(command); + AddLoggingArgs(&arg_vector); + } + + globals->option_processor.GetCommandArguments(&arg_vector); + + command_server::RunRequest request; + request.set_cookie(request_cookie); + for (const string& arg : arg_vector) { + request.add_arg(arg); + } + grpc::ClientContext context; + command_server::RunResponse response; + std::unique_ptr> reader( + client->Run(&context, request)); + while (reader->Read(&response)) { + if (response.stdout().size() > 0) { + write(1, response.stdout().c_str(), response.stdout().size()); + } + + if (response.stderr().size() > 0) { + write(2, response.stderr().c_str(), response.stderr().size()); + } + } + + if (!response.finished()) { + fprintf(stderr, "\nServer finished RPC without an explicit exit code\n"); + exit(blaze_exit_code::INTERNAL_ERROR); + } - request.set_number(42); - grpc::Status status = client->Run(&context, request, &response); - fprintf(stderr, "The response is %d\n", response.number()); + exit(response.exit_code()); +} + +void GrpcBlazeServer::Disconnect() { + client.reset(); + request_cookie = ""; + response_cookie = ""; +} + +} // namespace blaze + +int main(int argc, const char *argv[]) { + return blaze::main(argc, argv); } diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 7a03c16eba..cd387362b9 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -14,22 +14,31 @@ package com.google.devtools.build.lib.server; -import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.runtime.CommandExecutor; +import com.google.devtools.build.lib.server.CommandProtos.PingRequest; +import com.google.devtools.build.lib.server.CommandProtos.PingResponse; +import com.google.devtools.build.lib.server.CommandProtos.RunRequest; +import com.google.devtools.build.lib.server.CommandProtos.RunResponse; import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.io.OutErr; +import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.PrintWriter; +import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; /** * gRPC server class. @@ -38,14 +47,6 @@ import java.nio.channels.ServerSocketChannel; * bootstrapping. */ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer { - private static final String PORT_FILE = "server/grpc_port"; // relative to the output base - private final CommandExecutor commandExecutor; - private final Clock clock; - private final File portFile; - private Server server; - private int port; // mutable so that we can overwrite it if port 0 is passed in - boolean serving; - /** * Factory class. Instantiated by reflection. */ @@ -57,13 +58,81 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ } } - private GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, + private enum StreamType { + STDOUT, + STDERR, + } + + // TODO(lberki): Maybe we should implement line buffering? + private class RpcOutputStream extends OutputStream { + private final StreamObserver observer; + private final StreamType type; + + private RpcOutputStream(StreamObserver observer, StreamType type) { + this.observer = observer; + this.type = type; + } + + @Override + public synchronized void write(byte[] b, int off, int inlen) { + ByteString input = ByteString.copyFrom(b, off, inlen); + RunResponse.Builder response = RunResponse + .newBuilder() + .setCookie(responseCookie); + + switch (type) { + case STDOUT: response.setStdout(input); break; + case STDERR: response.setStderr(input); break; + default: throw new IllegalStateException(); + } + + observer.onNext(response.build()); + } + + @Override + public void write(int byteAsInt) throws IOException { + byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons + write(new byte[] {b}, 0, 1); + } + } + + // These paths are all relative to the output base + private static final String PORT_FILE = "server/grpc_port"; + private static final String REQUEST_COOKIE_FILE = "server/request_cookie"; + private static final String RESPONSE_COOKIE_FILE = "server/response_cookie"; + + private final CommandExecutor commandExecutor; + private final Clock clock; + private final String outputBase; + private final String requestCookie; + private final String responseCookie; + + private Server server; + private int port; // mutable so that we can overwrite it if port 0 is passed in + boolean serving; + + public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, String outputBase) { this.commandExecutor = commandExecutor; this.clock = clock; + this.outputBase = outputBase; this.port = port; - this.portFile = new File(outputBase + "/" + PORT_FILE); this.serving = false; + + SecureRandom random = new SecureRandom(); + requestCookie = generateCookie(random, 16); + responseCookie = generateCookie(random, 16); + } + + private static String generateCookie(SecureRandom random, int byteCount) { + byte[] bytes = new byte[byteCount]; + random.nextBytes(bytes); + StringBuilder result = new StringBuilder(); + for (byte b : bytes) { + result.append(Integer.toHexString(((int) b) + 128)); + } + + return result.toString(); } public void serve() throws IOException { @@ -73,14 +142,23 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ .build(); server.start(); + serving = true; + if (port == 0) { port = getActualServerPort(); } - PrintWriter portWriter = new PrintWriter(portFile); - portWriter.print(port); - portWriter.close(); - serving = true; + writeFile(PORT_FILE, Integer.toString(port)); + writeFile(REQUEST_COOKIE_FILE, requestCookie); + writeFile(RESPONSE_COOKIE_FILE, responseCookie); + + } + + private void writeFile(String path, String contents) throws IOException { + OutputStreamWriter writer = new OutputStreamWriter( + new FileOutputStream(new File(outputBase + "/" + path)), StandardCharsets.UTF_8); + writer.write(contents); + writer.close(); } /** @@ -120,7 +198,6 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ } public void terminate() { - Preconditions.checkState(serving); server.shutdownNow(); // This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same // amount of code as implementing it ourselves. @@ -143,14 +220,43 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ } @Override - public void run(CommandProtos.Request request, StreamObserver observer) { - Preconditions.checkState(serving); - commandExecutor.exec( - ImmutableList.of("version"), OutErr.SYSTEM_OUT_ERR, clock.currentTimeMillis()); - CommandProtos.Response response = CommandProtos.Response.newBuilder() - .setNumber(request.getNumber() + 1) + public void run( + RunRequest request, StreamObserver observer) { + if (!request.getCookie().equals(requestCookie)) { + observer.onNext(RunResponse.newBuilder() + .setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode()) + .build()); + observer.onCompleted(); + return; + } + + OutErr rpcOutErr = OutErr.create( + new RpcOutputStream(observer, StreamType.STDOUT), + new RpcOutputStream(observer, StreamType.STDERR)); + + int exitCode = commandExecutor.exec( + request.getArgList(), rpcOutErr, clock.currentTimeMillis()); + + RunResponse response = RunResponse.newBuilder() + .setCookie(responseCookie) + .setFinished(true) + .setExitCode(exitCode) .build(); + observer.onNext(response); observer.onCompleted(); } + + @Override + public void ping(PingRequest pingRequest, StreamObserver streamObserver) { + Preconditions.checkState(serving); + + CommandProtos.PingResponse.Builder response = CommandProtos.PingResponse.newBuilder(); + if (pingRequest.getCookie().equals(requestCookie)) { + response.setCookie(responseCookie); + } + + streamObserver.onNext(response.build()); + streamObserver.onCompleted(); + } } diff --git a/src/main/protobuf/command_server.proto b/src/main/protobuf/command_server.proto index d2a9b69b3c..8055610c08 100644 --- a/src/main/protobuf/command_server.proto +++ b/src/main/protobuf/command_server.proto @@ -22,15 +22,34 @@ package command_server; option java_package = "com.google.devtools.build.lib.server"; option java_outer_classname = "CommandProtos"; -message Request { - int32 number = 1234; // Test field, will be removed +message RunRequest { + // This must be the request cookie from the output base. A rudimentary form of authentication. + string cookie = 1; + repeated string arg = 2; } -message Response { - int32 number = 1234; // Test field, will be removed +message RunResponse { + // The server always sets this to the response cookie from the output base. A rudimentary form + // of authentication. + string cookie = 1; + bytes stdout = 2; + bytes stderr = 3; + bool finished = 4; // Whether this is the last message of the stream + int32 exit_code = 5; // Only valid for the last message in the stream +} + +message PingRequest { + string cookie = 1; +} + +message PingResponse { + string cookie = 1; } service CommandServer { - // Dummy method to test if compiling things works - rpc Run (Request) returns (Response) {} + // Run a Bazel command. + rpc Run (RunRequest) returns (stream RunResponse) {} + + // Does not do anything. Used for liveness check. + rpc Ping (PingRequest) returns (PingResponse) {} } -- cgit v1.2.3