aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-04-15 13:11:21 +0000
committerGravatar Dmitry Lomov <dslomov@google.com>2016-04-15 14:09:06 +0000
commit1b25ce2028fe36d614a08478c703754e3b5ebd69 (patch)
tree86c4bc1decde394003d02f1a4418e7c547d1cb5b /src
parentbf8d910e4bc993e33d747ce4e9dd3f8498734c53 (diff)
Initial version of client-server communication over gRPC.
This still has a number of issues, including, but not limited to: - When switching between gRPC and AF_UNIX mode, you need to do a manual shutdown - The console is spammed with "connection refused" messages on server startup - When in gRPC mode, server also starts up an AF_UNIX server even though it's not necessary and concurrent requests probably make Bazel crash and burn - I have no idea how concurrent gRPC requests are handled and now many threads gRPC creates - Not tested except under Linux - The request/response cookies are written in an odd format (negative bytes are not handled correctly). This is only a cosmetic issue, since the data content of the string is the same either way. Can be tested with the --grpc_port=0 (or a valid port number) startup option. -- MOS_MIGRATED_REVID=119948959
Diffstat (limited to 'src')
-rw-r--r--src/main/cpp/blaze.cc157
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java152
-rw-r--r--src/main/protobuf/command_server.proto31
3 files changed, 284 insertions, 56 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 256abb632e..20f4ae57ee 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -56,6 +56,7 @@
#include <grpc++/security/credentials.h>
#include <algorithm>
+#include <chrono> // NOLINT (gRPC requires this)
#include <set>
#include <string>
#include <utility>
@@ -504,14 +505,41 @@ static int StartServer() {
}
class BlazeServer {
+ public:
+ virtual ~BlazeServer() {}
+
+ virtual bool Connect() = 0;
+ virtual void Disconnect() = 0;
+ virtual void Communicate() = 0;
+};
+
+class AfUnixBlazeServer : public BlazeServer {
private:
int server_socket;
public:
- BlazeServer();
- bool Connect();
- void Disconnect();
- void ATTRIBUTE_NORETURN Communicate();
+ AfUnixBlazeServer();
+ virtual ~AfUnixBlazeServer() {}
+
+ bool Connect() override;
+ void Disconnect() override;
+ void Communicate() override;
+};
+
+class GrpcBlazeServer : public BlazeServer {
+ private:
+ std::unique_ptr<command_server::CommandServer::Stub> client;
+
+ std::string request_cookie;
+ std::string response_cookie;
+
+ public:
+ GrpcBlazeServer();
+ virtual ~GrpcBlazeServer() {}
+
+ bool Connect() override;
+ void Disconnect() override;
+ void Communicate() override;
};
static bool KillRunningServerIfAny(BlazeServer *server);
@@ -563,11 +591,11 @@ static void StartStandalone(BlazeServer* server) {
pdie(blaze_exit_code::INTERNAL_ERROR, "execv of '%s' failed", exe.c_str());
}
-BlazeServer::BlazeServer() {
+AfUnixBlazeServer::AfUnixBlazeServer() {
server_socket = -1;
}
-bool BlazeServer::Connect() {
+bool AfUnixBlazeServer::Connect() {
if (server_socket == -1) {
server_socket = socket(PF_UNIX, SOCK_STREAM, 0);
if (server_socket == -1) {
@@ -603,7 +631,7 @@ bool BlazeServer::Connect() {
}
}
-void BlazeServer::Disconnect() {
+void AfUnixBlazeServer::Disconnect() {
close(server_socket);
server_socket = -1;
}
@@ -663,7 +691,7 @@ static void forward_server_output(int socket, int output) {
}
}
-void ATTRIBUTE_NORETURN BlazeServer::Communicate() {
+void AfUnixBlazeServer::Communicate() {
const string request = BuildServerRequest();
// Send request (Request is written in a single chunk.)
@@ -832,7 +860,7 @@ static bool ConnectToServer(BlazeServer *server, bool start) {
}
fputc('.', stderr);
fflush(stderr);
- poll(NULL, 0, 100); // sleep 100ms. (usleep(3) is obsolete.)
+ poll(NULL, 0, 1000); // sleep 100ms. (usleep(3) is obsolete.)
char c;
if (read(fd, &c, 1) != -1 || errno != EAGAIN) {
fprintf(stderr, "\nunexpected pipe read status: %s\n"
@@ -1374,6 +1402,8 @@ static ATTRIBUTE_NORETURN void SendServerRequest(BlazeServer* server) {
signal(SIGQUIT, handler);
server->Communicate();
+ fprintf(stderr, "Communicate() did not exit");
+ exit(blaze_exit_code::INTERNAL_ERROR);
}
// Parse the options, storing parsed values in globals.
@@ -1695,39 +1725,112 @@ int main(int argc, const char *argv[]) {
WarnFilesystemType(globals->options.output_base);
EnsureFiniteStackLimit();
- BlazeServer server;
+ std::unique_ptr<BlazeServer> server(
+ globals->options.grpc_port >= 0
+ ? static_cast<BlazeServer *>(new GrpcBlazeServer())
+ : static_cast<BlazeServer *>(new AfUnixBlazeServer()));
ExtractData(self_path);
- EnsureCorrectRunningVersion(&server);
- KillRunningServerIfDifferentStartupOptions(&server);
+ EnsureCorrectRunningVersion(server.get());
+ KillRunningServerIfDifferentStartupOptions(server.get());
if (globals->options.batch) {
SetScheduling(globals->options.batch_cpu_scheduling,
globals->options.io_nice_level);
- StartStandalone(&server);
+ StartStandalone(server.get());
} else {
- SendServerRequest(&server);
+ SendServerRequest(server.get());
}
return 0;
}
-} // namespace blaze
-
-int main(int argc, const char *argv[]) {
- return blaze::main(argc, argv);
+GrpcBlazeServer::GrpcBlazeServer() {
}
-// Unused method just to make sure that we can compile and link with gRPC
-void InvokeServer(const std::string& address) {
- std::shared_ptr<grpc::Channel> channel(
- grpc::CreateChannel(address, grpc::InsecureChannelCredentials()));
+bool GrpcBlazeServer::Connect() {
+ std::string server_dir = globals->options.output_base + "/server";
+ std::string port;
+
+ if (!ReadFile(server_dir + "/grpc_port", &port)) {
+ return false;
+ }
+
+ if (!ReadFile(server_dir + "/request_cookie", &request_cookie)) {
+ return false;
+ }
+
+ if (!ReadFile(server_dir + "/response_cookie", &response_cookie)) {
+ return false;
+ }
+
+ std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel(
+ "localhost:" + port, grpc::InsecureChannelCredentials()));
std::unique_ptr<command_server::CommandServer::Stub> client(
command_server::CommandServer::NewStub(channel));
- command_server::Request request;
- command_server::Response response;
+
+ grpc::ClientContext context;
+ context.set_deadline(
+ std::chrono::system_clock::now() + std::chrono::milliseconds(50));
+
+ command_server::PingRequest request;
+ command_server::PingResponse response;
+ request.set_cookie(request_cookie);
+ grpc::Status status = client->Ping(&context, request, &response);
+
+ if (!status.ok()) {
+ return false;
+ }
+
+ this->client = std::move(client);
+ return true;
+}
+
+void GrpcBlazeServer::Communicate() {
+ vector<string> arg_vector;
+ string command = globals->option_processor.GetCommand();
+ if (command != "") {
+ arg_vector.push_back(command);
+ AddLoggingArgs(&arg_vector);
+ }
+
+ globals->option_processor.GetCommandArguments(&arg_vector);
+
+ command_server::RunRequest request;
+ request.set_cookie(request_cookie);
+ for (const string& arg : arg_vector) {
+ request.add_arg(arg);
+ }
+
grpc::ClientContext context;
+ command_server::RunResponse response;
+ std::unique_ptr<grpc::ClientReader<command_server::RunResponse>> reader(
+ client->Run(&context, request));
+ while (reader->Read(&response)) {
+ if (response.stdout().size() > 0) {
+ write(1, response.stdout().c_str(), response.stdout().size());
+ }
+
+ if (response.stderr().size() > 0) {
+ write(2, response.stderr().c_str(), response.stderr().size());
+ }
+ }
+
+ if (!response.finished()) {
+ fprintf(stderr, "\nServer finished RPC without an explicit exit code\n");
+ exit(blaze_exit_code::INTERNAL_ERROR);
+ }
- request.set_number(42);
- grpc::Status status = client->Run(&context, request, &response);
- fprintf(stderr, "The response is %d\n", response.number());
+ exit(response.exit_code());
+}
+
+void GrpcBlazeServer::Disconnect() {
+ client.reset();
+ request_cookie = "";
+ response_cookie = "";
+}
+
+} // namespace blaze
+
+int main(int argc, const char *argv[]) {
+ return blaze::main(argc, argv);
}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 7a03c16eba..cd387362b9 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -14,22 +14,31 @@
package com.google.devtools.build.lib.server;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.runtime.CommandExecutor;
+import com.google.devtools.build.lib.server.CommandProtos.PingRequest;
+import com.google.devtools.build.lib.server.CommandProtos.PingResponse;
+import com.google.devtools.build.lib.server.CommandProtos.RunRequest;
+import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.PrintWriter;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
/**
* gRPC server class.
@@ -38,14 +47,6 @@ import java.nio.channels.ServerSocketChannel;
* bootstrapping.
*/
public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer {
- private static final String PORT_FILE = "server/grpc_port"; // relative to the output base
- private final CommandExecutor commandExecutor;
- private final Clock clock;
- private final File portFile;
- private Server server;
- private int port; // mutable so that we can overwrite it if port 0 is passed in
- boolean serving;
-
/**
* Factory class. Instantiated by reflection.
*/
@@ -57,13 +58,81 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
}
}
- private GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
+ private enum StreamType {
+ STDOUT,
+ STDERR,
+ }
+
+ // TODO(lberki): Maybe we should implement line buffering?
+ private class RpcOutputStream extends OutputStream {
+ private final StreamObserver<RunResponse> observer;
+ private final StreamType type;
+
+ private RpcOutputStream(StreamObserver<RunResponse> observer, StreamType type) {
+ this.observer = observer;
+ this.type = type;
+ }
+
+ @Override
+ public synchronized void write(byte[] b, int off, int inlen) {
+ ByteString input = ByteString.copyFrom(b, off, inlen);
+ RunResponse.Builder response = RunResponse
+ .newBuilder()
+ .setCookie(responseCookie);
+
+ switch (type) {
+ case STDOUT: response.setStdout(input); break;
+ case STDERR: response.setStderr(input); break;
+ default: throw new IllegalStateException();
+ }
+
+ observer.onNext(response.build());
+ }
+
+ @Override
+ public void write(int byteAsInt) throws IOException {
+ byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons
+ write(new byte[] {b}, 0, 1);
+ }
+ }
+
+ // These paths are all relative to the output base
+ private static final String PORT_FILE = "server/grpc_port";
+ private static final String REQUEST_COOKIE_FILE = "server/request_cookie";
+ private static final String RESPONSE_COOKIE_FILE = "server/response_cookie";
+
+ private final CommandExecutor commandExecutor;
+ private final Clock clock;
+ private final String outputBase;
+ private final String requestCookie;
+ private final String responseCookie;
+
+ private Server server;
+ private int port; // mutable so that we can overwrite it if port 0 is passed in
+ boolean serving;
+
+ public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
String outputBase) {
this.commandExecutor = commandExecutor;
this.clock = clock;
+ this.outputBase = outputBase;
this.port = port;
- this.portFile = new File(outputBase + "/" + PORT_FILE);
this.serving = false;
+
+ SecureRandom random = new SecureRandom();
+ requestCookie = generateCookie(random, 16);
+ responseCookie = generateCookie(random, 16);
+ }
+
+ private static String generateCookie(SecureRandom random, int byteCount) {
+ byte[] bytes = new byte[byteCount];
+ random.nextBytes(bytes);
+ StringBuilder result = new StringBuilder();
+ for (byte b : bytes) {
+ result.append(Integer.toHexString(((int) b) + 128));
+ }
+
+ return result.toString();
}
public void serve() throws IOException {
@@ -73,14 +142,23 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
.build();
server.start();
+ serving = true;
+
if (port == 0) {
port = getActualServerPort();
}
- PrintWriter portWriter = new PrintWriter(portFile);
- portWriter.print(port);
- portWriter.close();
- serving = true;
+ writeFile(PORT_FILE, Integer.toString(port));
+ writeFile(REQUEST_COOKIE_FILE, requestCookie);
+ writeFile(RESPONSE_COOKIE_FILE, responseCookie);
+
+ }
+
+ private void writeFile(String path, String contents) throws IOException {
+ OutputStreamWriter writer = new OutputStreamWriter(
+ new FileOutputStream(new File(outputBase + "/" + path)), StandardCharsets.UTF_8);
+ writer.write(contents);
+ writer.close();
}
/**
@@ -120,7 +198,6 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
}
public void terminate() {
- Preconditions.checkState(serving);
server.shutdownNow();
// This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same
// amount of code as implementing it ourselves.
@@ -143,14 +220,43 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
}
@Override
- public void run(CommandProtos.Request request, StreamObserver<CommandProtos.Response> observer) {
- Preconditions.checkState(serving);
- commandExecutor.exec(
- ImmutableList.of("version"), OutErr.SYSTEM_OUT_ERR, clock.currentTimeMillis());
- CommandProtos.Response response = CommandProtos.Response.newBuilder()
- .setNumber(request.getNumber() + 1)
+ public void run(
+ RunRequest request, StreamObserver<RunResponse> observer) {
+ if (!request.getCookie().equals(requestCookie)) {
+ observer.onNext(RunResponse.newBuilder()
+ .setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode())
+ .build());
+ observer.onCompleted();
+ return;
+ }
+
+ OutErr rpcOutErr = OutErr.create(
+ new RpcOutputStream(observer, StreamType.STDOUT),
+ new RpcOutputStream(observer, StreamType.STDERR));
+
+ int exitCode = commandExecutor.exec(
+ request.getArgList(), rpcOutErr, clock.currentTimeMillis());
+
+ RunResponse response = RunResponse.newBuilder()
+ .setCookie(responseCookie)
+ .setFinished(true)
+ .setExitCode(exitCode)
.build();
+
observer.onNext(response);
observer.onCompleted();
}
+
+ @Override
+ public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObserver) {
+ Preconditions.checkState(serving);
+
+ CommandProtos.PingResponse.Builder response = CommandProtos.PingResponse.newBuilder();
+ if (pingRequest.getCookie().equals(requestCookie)) {
+ response.setCookie(responseCookie);
+ }
+
+ streamObserver.onNext(response.build());
+ streamObserver.onCompleted();
+ }
}
diff --git a/src/main/protobuf/command_server.proto b/src/main/protobuf/command_server.proto
index d2a9b69b3c..8055610c08 100644
--- a/src/main/protobuf/command_server.proto
+++ b/src/main/protobuf/command_server.proto
@@ -22,15 +22,34 @@ package command_server;
option java_package = "com.google.devtools.build.lib.server";
option java_outer_classname = "CommandProtos";
-message Request {
- int32 number = 1234; // Test field, will be removed
+message RunRequest {
+ // This must be the request cookie from the output base. A rudimentary form of authentication.
+ string cookie = 1;
+ repeated string arg = 2;
}
-message Response {
- int32 number = 1234; // Test field, will be removed
+message RunResponse {
+ // The server always sets this to the response cookie from the output base. A rudimentary form
+ // of authentication.
+ string cookie = 1;
+ bytes stdout = 2;
+ bytes stderr = 3;
+ bool finished = 4; // Whether this is the last message of the stream
+ int32 exit_code = 5; // Only valid for the last message in the stream
+}
+
+message PingRequest {
+ string cookie = 1;
+}
+
+message PingResponse {
+ string cookie = 1;
}
service CommandServer {
- // Dummy method to test if compiling things works
- rpc Run (Request) returns (Response) {}
+ // Run a Bazel command.
+ rpc Run (RunRequest) returns (stream RunResponse) {}
+
+ // Does not do anything. Used for liveness check.
+ rpc Ping (PingRequest) returns (PingResponse) {}
}