aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-04-14 08:29:05 +0000
committerGravatar Dmitry Lomov <dslomov@google.com>2016-04-14 11:11:15 +0000
commit8b3b918ebd55911a3102f5ea6da60906fa63c866 (patch)
tree5441d1a77d4f89a3450523979b49f8aa63fe4596 /src
parent386f242788a3d0189e6882466105c57ec1149d20 (diff)
Add the --grpc_port startup option and start a Java server if it's passed in.
Note that the presence of server/grpc_port does not guarantee that the server actually listens to it and we can't guarantee it, either, because it can always be kill -9'd. I haven't decided yet how the transition between AF_UNIX and gRPC will work. For now, I'm happy that we can start up a Java server. The way to get the kernel-chosen port is truly awful, but it is apparently impossible to do so in a different way: https://github.com/grpc/grpc-java/issues/72 -- MOS_MIGRATED_REVID=119828354
Diffstat (limited to 'src')
-rw-r--r--src/main/cpp/blaze.cc5
-rw-r--r--src/main/cpp/blaze_startup_options.cc2
-rw-r--r--src/main/cpp/blaze_startup_options.h5
-rw-r--r--src/main/cpp/blaze_startup_options_common.cc15
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java81
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java6
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/CommandExecutor.java68
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServer.java32
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java156
9 files changed, 308 insertions, 62 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 88f0507d0f..01261ba448 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -265,6 +265,11 @@ static vector<string> GetArgumentArray() {
result.push_back(blaze::ConvertPath(
blaze_util::JoinPath(real_install_dir, globals->extracted_binaries[0])));
+ if (globals->options.grpc_port != -1) {
+ result.push_back("--grpc_port");
+ result.push_back(ToString(globals->options.grpc_port));
+ }
+
if (!globals->options.batch) {
result.push_back("--max_idle_secs");
result.push_back(ToString(globals->options.max_idle_secs));
diff --git a/src/main/cpp/blaze_startup_options.cc b/src/main/cpp/blaze_startup_options.cc
index 2a16cf5e58..1f7a5409b6 100644
--- a/src/main/cpp/blaze_startup_options.cc
+++ b/src/main/cpp/blaze_startup_options.cc
@@ -56,7 +56,7 @@ BlazeStartupOptions::BlazeStartupOptions(const BlazeStartupOptions &rhs)
watchfs(rhs.watchfs),
allow_configurable_attributes(rhs.allow_configurable_attributes),
option_sources(rhs.option_sources),
- webstatus_port(rhs.webstatus_port),
+ grpc_port(rhs.grpc_port),
invocation_policy(rhs.invocation_policy),
host_javabase(rhs.host_javabase) {}
diff --git a/src/main/cpp/blaze_startup_options.h b/src/main/cpp/blaze_startup_options.h
index 41629b2f9b..d44cbbee4c 100644
--- a/src/main/cpp/blaze_startup_options.h
+++ b/src/main/cpp/blaze_startup_options.h
@@ -210,8 +210,9 @@ class BlazeStartupOptions {
// the --host_javabase option.
string GetHostJavabase();
- // Port for web status server, 0 to disable
- int webstatus_port;
+ // Port for gRPC command server. 0 means let the kernel choose, -1 means no
+ // gRPC command server.
+ int grpc_port;
// Invocation policy proto. May be NULL.
const char* invocation_policy;
diff --git a/src/main/cpp/blaze_startup_options_common.cc b/src/main/cpp/blaze_startup_options_common.cc
index 428e8f7b39..54a649931a 100644
--- a/src/main/cpp/blaze_startup_options_common.cc
+++ b/src/main/cpp/blaze_startup_options_common.cc
@@ -50,7 +50,7 @@ void BlazeStartupOptions::Init() {
// 3 hours (but only 5 seconds if used within a test)
max_idle_secs = testing ? 5 : (3 * 3600);
oom_more_eagerly_threshold = 100;
- webstatus_port = 0;
+ grpc_port = -1;
oom_more_eagerly = false;
watchfs = false;
invocation_policy = NULL;
@@ -81,7 +81,7 @@ void BlazeStartupOptions::Copy(
lhs->batch_cpu_scheduling = rhs.batch_cpu_scheduling;
lhs->io_nice_level = rhs.io_nice_level;
lhs->max_idle_secs = rhs.max_idle_secs;
- lhs->webstatus_port = rhs.webstatus_port;
+ lhs->grpc_port = rhs.grpc_port;
lhs->oom_more_eagerly = rhs.oom_more_eagerly;
lhs->watchfs = rhs.watchfs;
lhs->allow_configurable_attributes = rhs.allow_configurable_attributes;
@@ -233,12 +233,13 @@ blaze_exit_code::ExitCode BlazeStartupOptions::ProcessArg(
watchfs = false;
option_sources["watchfs"] = rcfile;
} else if ((value = GetUnaryOption(
- arg, next_arg, "--use_webstatusserver")) != NULL) {
- if (!blaze_util::safe_strto32(value, &webstatus_port) ||
- webstatus_port < 0 || webstatus_port > 65535) {
+ arg, next_arg, "--grpc_port")) != NULL) {
+ if (!blaze_util::safe_strto32(value, &grpc_port) ||
+ grpc_port < -1 || grpc_port > 65535) {
blaze_util::StringPrintf(error,
- "Invalid argument to --use_webstatusserver: '%s'. "
- "Must be a valid port number or 0 if server disabled.\n", value);
+ "Invalid argument to --grpc_port: '%s'. "
+ "Must be a valid port number or -1 to disable the gRPC server.\n",
+ value);
return blaze_exit_code::BAD_ARGV;
}
option_sources["webstatusserver"] = rcfile;
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
index df56b97478..2fe7c6acac 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
@@ -63,8 +63,8 @@ import com.google.devtools.build.lib.runtime.commands.ShutdownCommand;
import com.google.devtools.build.lib.runtime.commands.TestCommand;
import com.google.devtools.build.lib.runtime.commands.VersionCommand;
import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy;
+import com.google.devtools.build.lib.server.GrpcServer;
import com.google.devtools.build.lib.server.RPCServer;
-import com.google.devtools.build.lib.server.ServerCommand;
import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
import com.google.devtools.build.lib.skyframe.DiffAwareness;
import com.google.devtools.build.lib.skyframe.PrecomputedValue;
@@ -102,8 +102,6 @@ import com.google.devtools.common.options.TriState;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
@@ -133,6 +131,16 @@ import javax.annotation.Nullable;
* <p>The parts specific to the current command are stored in {@link CommandEnvironment}.
*/
public final class BlazeRuntime {
+ private static class BlazeServer {
+ private final RPCServer afUnixServer;
+ private final GrpcServer grpcServer;
+
+ private BlazeServer(RPCServer afUnixServer, GrpcServer grpcServer) {
+ this.afUnixServer = afUnixServer;
+ this.grpcServer = grpcServer;
+ }
+ }
+
private static final Pattern suppressFromLog = Pattern.compile(".*(auth|pass|cookie).*",
Pattern.CASE_INSENSITIVE);
@@ -837,7 +845,16 @@ public final class BlazeRuntime {
*/
private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) {
try {
- createBlazeRPCServer(modules, Arrays.asList(args)).serve();
+ BlazeServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+ if (blazeServer.grpcServer != null) {
+ blazeServer.grpcServer.serve();
+ }
+
+ // TODO(lberki): Make this call non-blocking and terminate the two servers at the same time
+ blazeServer.afUnixServer.serve();
+ if (blazeServer.grpcServer != null) {
+ blazeServer.grpcServer.serve();
+ }
return ExitCode.SUCCESS.getNumericExitCode();
} catch (OptionsParsingException e) {
outErr.printErr(e.getMessage());
@@ -863,7 +880,7 @@ public final class BlazeRuntime {
/**
* Creates and returns a new Blaze RPCServer. Call {@link RPCServer#serve()} to start the server.
*/
- private static RPCServer createBlazeRPCServer(Iterable<BlazeModule> modules, List<String> args)
+ private static BlazeServer createBlazeRPCServer(Iterable<BlazeModule> modules, List<String> args)
throws IOException, OptionsParsingException, AbruptExitException {
OptionsProvider options = parseOptions(modules, args);
BlazeServerStartupOptions startupOptions = options.getOptions(BlazeServerStartupOptions.class);
@@ -871,44 +888,26 @@ public final class BlazeRuntime {
final BlazeRuntime runtime = newRuntime(modules, options);
final BlazeCommandDispatcher dispatcher = new BlazeCommandDispatcher(runtime);
- final ServerCommand blazeCommand;
-
- // Adaptor from RPC mechanism to BlazeCommandDispatcher:
- blazeCommand = new ServerCommand() {
- private boolean shutdown = false;
-
- @Override
- public int exec(List<String> args, OutErr outErr, long firstContactTime) {
- LOG.info(getRequestLogString(args));
-
- try {
- return dispatcher.exec(args, outErr, firstContactTime);
- } catch (BlazeCommandDispatcher.ShutdownBlazeServerException e) {
- if (e.getCause() != null) {
- StringWriter message = new StringWriter();
- message.write("Shutting down due to exception:\n");
- PrintWriter writer = new PrintWriter(message, true);
- e.printStackTrace(writer);
- writer.flush();
- LOG.severe(message.toString());
- }
- shutdown = true;
- runtime.shutdown();
- dispatcher.shutdown();
- return e.getExitStatus();
- }
- }
-
- @Override
- public boolean shutdown() {
- return shutdown;
- }
- };
-
- RPCServer server = RPCServer.newServerWith(runtime.getClock(), blazeCommand,
+ CommandExecutor commandExecutor = new CommandExecutor(runtime, dispatcher);
+ RPCServer afUnixServer = RPCServer.newServerWith(runtime.getClock(), commandExecutor,
runtime.getServerDirectory(), runtime.workspace.getWorkspace(),
startupOptions.maxIdleSeconds);
- return server;
+ GrpcServer grpcServer = null;
+ if (startupOptions.grpcPort != -1) {
+ try {
+ // We don't want to directly depend on this class so that we don't need gRPC for
+ // bootstrapping, so we instantiate it using a factory class and reflection
+ Class<?> factoryClass = Class.forName(
+ "com.google.devtools.build.lib.server.GrpcServerImpl$Factory");
+ GrpcServer.Factory factory = (GrpcServer.Factory) factoryClass.newInstance();
+ grpcServer = factory.create(commandExecutor, runtime.getClock(),
+ startupOptions.grpcPort, startupOptions.outputBase.getPathString());
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR);
+ }
+ }
+
+ return new BlazeServer(afUnixServer, grpcServer);
}
private static Function<String, String> sourceFunctionForMap(final Map<String, String> map) {
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java
index c4fc2c3fff..35bc68e076 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java
@@ -257,4 +257,10 @@ public class BlazeServerStartupOptions extends OptionsBase {
+ "invocation_policy.InvocationPolicy proto. Unlike other options, it is an error to "
+ "specify --invocation_policy multiple times.")
public String invocationPolicy;
+
+ @Option(name = "grpc_port",
+ defaultValue = "-1",
+ category = "undocumented",
+ help = "Port to start up the gRPC command server on. If 0, let the kernel choose.")
+ public int grpcPort;
}
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/CommandExecutor.java b/src/main/java/com/google/devtools/build/lib/runtime/CommandExecutor.java
new file mode 100644
index 0000000000..659f4b7136
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/runtime/CommandExecutor.java
@@ -0,0 +1,68 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.runtime;
+
+import com.google.devtools.build.lib.server.ServerCommand;
+import com.google.devtools.build.lib.util.io.OutErr;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * Executes a Blaze command.
+ *
+ * <p>This is the common execution path between the gRPC server and the legacy AF_UNIX server.
+ */
+public class CommandExecutor implements ServerCommand {
+ private static final Logger LOG = Logger.getLogger(CommandExecutor.class.getName());
+
+ private boolean shutdown;
+ private final BlazeRuntime runtime;
+ private final BlazeCommandDispatcher dispatcher;
+
+ CommandExecutor(BlazeRuntime runtime, BlazeCommandDispatcher dispatcher) {
+ this.shutdown = false;
+ this.runtime = runtime;
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public int exec(List<String> args, OutErr outErr, long firstContactTime) {
+ LOG.info(BlazeRuntime.getRequestLogString(args));
+
+ try {
+ return dispatcher.exec(args, outErr, firstContactTime);
+ } catch (BlazeCommandDispatcher.ShutdownBlazeServerException e) {
+ if (e.getCause() != null) {
+ StringWriter message = new StringWriter();
+ message.write("Shutting down due to exception:\n");
+ PrintWriter writer = new PrintWriter(message, true);
+ e.printStackTrace(writer);
+ writer.flush();
+ LOG.severe(message.toString());
+ }
+ shutdown = true;
+ runtime.shutdown();
+ dispatcher.shutdown();
+ return e.getExitStatus();
+ }
+ }
+
+ @Override
+ public boolean shutdown() {
+ return shutdown;
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java
index 3674da8a53..d93d044b14 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java
@@ -11,21 +11,31 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-
package com.google.devtools.build.lib.server;
-import io.grpc.stub.StreamObserver;
+import com.google.devtools.build.lib.runtime.CommandExecutor;
+import com.google.devtools.build.lib.util.Clock;
+
+import java.io.IOException;
/**
- * Unused class just to make sure that we can compile with gRPC.
+ * Interface for the gRPC server.
+ *
+ * <p>This is necessary so that Bazel kind of works during bootstrapping, at which time the
+ * gRPC server is not compiled on so that we don't need gRPC for bootstrapping.
*/
-public class GrpcServer implements CommandServerGrpc.CommandServer {
- @Override
- public void run(CommandProtos.Request request, StreamObserver<CommandProtos.Response> observer) {
- CommandProtos.Response response = CommandProtos.Response.newBuilder()
- .setNumber(request.getNumber() + 1)
- .build();
- observer.onNext(response);
- observer.onCompleted();
+public interface GrpcServer {
+
+ /**
+ * Factory class.
+ *
+ * Present so that we don't need to invoke a constructor with multiple arguments by reflection.
+ */
+ interface Factory {
+ GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port,
+ String outputBase);
}
+
+ void serve() throws IOException;
+ void terminate();
}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
new file mode 100644
index 0000000000..7a03c16eba
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -0,0 +1,156 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.runtime.CommandExecutor;
+import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.io.OutErr;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+
+/**
+ * gRPC server class.
+ *
+ * <p>Only this class should depend on gRPC so that we only need to exclude this during
+ * bootstrapping.
+ */
+public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer {
+ private static final String PORT_FILE = "server/grpc_port"; // relative to the output base
+ private final CommandExecutor commandExecutor;
+ private final Clock clock;
+ private final File portFile;
+ private Server server;
+ private int port; // mutable so that we can overwrite it if port 0 is passed in
+ boolean serving;
+
+ /**
+ * Factory class. Instantiated by reflection.
+ */
+ public static class Factory implements GrpcServer.Factory {
+ @Override
+ public GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port,
+ String outputBase) {
+ return new GrpcServerImpl(commandExecutor, clock, port, outputBase);
+ }
+ }
+
+ private GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
+ String outputBase) {
+ this.commandExecutor = commandExecutor;
+ this.clock = clock;
+ this.port = port;
+ this.portFile = new File(outputBase + "/" + PORT_FILE);
+ this.serving = false;
+ }
+
+ public void serve() throws IOException {
+ Preconditions.checkState(!serving);
+ server = ServerBuilder.forPort(port)
+ .addService(CommandServerGrpc.bindService(this))
+ .build();
+
+ server.start();
+ if (port == 0) {
+ port = getActualServerPort();
+ }
+
+ PrintWriter portWriter = new PrintWriter(portFile);
+ portWriter.print(port);
+ portWriter.close();
+ serving = true;
+ }
+
+ /**
+ * Gets the server port the kernel bound our server to if port 0 was passed in.
+ *
+ * <p>The implementation is awful, but gRPC doesn't provide an official way to do this:
+ * https://github.com/grpc/grpc-java/issues/72
+ */
+ private int getActualServerPort() {
+ try {
+ ServerSocketChannel channel =
+ (ServerSocketChannel) getField(server, "transportServer", "channel", "ch");
+ InetSocketAddress address = (InetSocketAddress) channel.getLocalAddress();
+ return address.getPort();
+ } catch (IllegalAccessException | NullPointerException | IOException e) {
+ throw new IllegalStateException("Cannot read server socket address from gRPC");
+ }
+ }
+
+ private static Object getField(Object instance, String... fieldNames)
+ throws IllegalAccessException, NullPointerException {
+ for (String fieldName : fieldNames) {
+ Field field = null;
+ for (Class<?> clazz = instance.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+ try {
+ field = clazz.getDeclaredField(fieldName);
+ break;
+ } catch (NoSuchFieldException e) {
+ // Try again with the superclass
+ }
+ }
+ field.setAccessible(true);
+ instance = field.get(instance);
+ }
+
+ return instance;
+ }
+
+ public void terminate() {
+ Preconditions.checkState(serving);
+ server.shutdownNow();
+ // This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same
+ // amount of code as implementing it ourselves.
+ boolean interrupted = false;
+ try {
+ while (true) {
+ try {
+ server.awaitTermination();
+ serving = false;
+ return;
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void run(CommandProtos.Request request, StreamObserver<CommandProtos.Response> observer) {
+ Preconditions.checkState(serving);
+ commandExecutor.exec(
+ ImmutableList.of("version"), OutErr.SYSTEM_OUT_ERR, clock.currentTimeMillis());
+ CommandProtos.Response response = CommandProtos.Response.newBuilder()
+ .setNumber(request.getNumber() + 1)
+ .build();
+ observer.onNext(response);
+ observer.onCompleted();
+ }
+}