diff options
Diffstat (limited to 'src/main/java/com/google')
5 files changed, 291 insertions, 52 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java index df56b97478..2fe7c6acac 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java @@ -63,8 +63,8 @@ import com.google.devtools.build.lib.runtime.commands.ShutdownCommand; import com.google.devtools.build.lib.runtime.commands.TestCommand; import com.google.devtools.build.lib.runtime.commands.VersionCommand; import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy; +import com.google.devtools.build.lib.server.GrpcServer; import com.google.devtools.build.lib.server.RPCServer; -import com.google.devtools.build.lib.server.ServerCommand; import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; import com.google.devtools.build.lib.skyframe.DiffAwareness; import com.google.devtools.build.lib.skyframe.PrecomputedValue; @@ -102,8 +102,6 @@ import com.google.devtools.common.options.TriState; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.io.PrintWriter; -import java.io.StringWriter; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -133,6 +131,16 @@ import javax.annotation.Nullable; * <p>The parts specific to the current command are stored in {@link CommandEnvironment}. */ public final class BlazeRuntime { + private static class BlazeServer { + private final RPCServer afUnixServer; + private final GrpcServer grpcServer; + + private BlazeServer(RPCServer afUnixServer, GrpcServer grpcServer) { + this.afUnixServer = afUnixServer; + this.grpcServer = grpcServer; + } + } + private static final Pattern suppressFromLog = Pattern.compile(".*(auth|pass|cookie).*", Pattern.CASE_INSENSITIVE); @@ -837,7 +845,16 @@ public final class BlazeRuntime { */ private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) { try { - createBlazeRPCServer(modules, Arrays.asList(args)).serve(); + BlazeServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args)); + if (blazeServer.grpcServer != null) { + blazeServer.grpcServer.serve(); + } + + // TODO(lberki): Make this call non-blocking and terminate the two servers at the same time + blazeServer.afUnixServer.serve(); + if (blazeServer.grpcServer != null) { + blazeServer.grpcServer.serve(); + } return ExitCode.SUCCESS.getNumericExitCode(); } catch (OptionsParsingException e) { outErr.printErr(e.getMessage()); @@ -863,7 +880,7 @@ public final class BlazeRuntime { /** * Creates and returns a new Blaze RPCServer. Call {@link RPCServer#serve()} to start the server. */ - private static RPCServer createBlazeRPCServer(Iterable<BlazeModule> modules, List<String> args) + private static BlazeServer createBlazeRPCServer(Iterable<BlazeModule> modules, List<String> args) throws IOException, OptionsParsingException, AbruptExitException { OptionsProvider options = parseOptions(modules, args); BlazeServerStartupOptions startupOptions = options.getOptions(BlazeServerStartupOptions.class); @@ -871,44 +888,26 @@ public final class BlazeRuntime { final BlazeRuntime runtime = newRuntime(modules, options); final BlazeCommandDispatcher dispatcher = new BlazeCommandDispatcher(runtime); - final ServerCommand blazeCommand; - - // Adaptor from RPC mechanism to BlazeCommandDispatcher: - blazeCommand = new ServerCommand() { - private boolean shutdown = false; - - @Override - public int exec(List<String> args, OutErr outErr, long firstContactTime) { - LOG.info(getRequestLogString(args)); - - try { - return dispatcher.exec(args, outErr, firstContactTime); - } catch (BlazeCommandDispatcher.ShutdownBlazeServerException e) { - if (e.getCause() != null) { - StringWriter message = new StringWriter(); - message.write("Shutting down due to exception:\n"); - PrintWriter writer = new PrintWriter(message, true); - e.printStackTrace(writer); - writer.flush(); - LOG.severe(message.toString()); - } - shutdown = true; - runtime.shutdown(); - dispatcher.shutdown(); - return e.getExitStatus(); - } - } - - @Override - public boolean shutdown() { - return shutdown; - } - }; - - RPCServer server = RPCServer.newServerWith(runtime.getClock(), blazeCommand, + CommandExecutor commandExecutor = new CommandExecutor(runtime, dispatcher); + RPCServer afUnixServer = RPCServer.newServerWith(runtime.getClock(), commandExecutor, runtime.getServerDirectory(), runtime.workspace.getWorkspace(), startupOptions.maxIdleSeconds); - return server; + GrpcServer grpcServer = null; + if (startupOptions.grpcPort != -1) { + try { + // We don't want to directly depend on this class so that we don't need gRPC for + // bootstrapping, so we instantiate it using a factory class and reflection + Class<?> factoryClass = Class.forName( + "com.google.devtools.build.lib.server.GrpcServerImpl$Factory"); + GrpcServer.Factory factory = (GrpcServer.Factory) factoryClass.newInstance(); + grpcServer = factory.create(commandExecutor, runtime.getClock(), + startupOptions.grpcPort, startupOptions.outputBase.getPathString()); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR); + } + } + + return new BlazeServer(afUnixServer, grpcServer); } private static Function<String, String> sourceFunctionForMap(final Map<String, String> map) { diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java index c4fc2c3fff..35bc68e076 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeServerStartupOptions.java @@ -257,4 +257,10 @@ public class BlazeServerStartupOptions extends OptionsBase { + "invocation_policy.InvocationPolicy proto. Unlike other options, it is an error to " + "specify --invocation_policy multiple times.") public String invocationPolicy; + + @Option(name = "grpc_port", + defaultValue = "-1", + category = "undocumented", + help = "Port to start up the gRPC command server on. If 0, let the kernel choose.") + public int grpcPort; } diff --git a/src/main/java/com/google/devtools/build/lib/runtime/CommandExecutor.java b/src/main/java/com/google/devtools/build/lib/runtime/CommandExecutor.java new file mode 100644 index 0000000000..659f4b7136 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/runtime/CommandExecutor.java @@ -0,0 +1,68 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.runtime; + +import com.google.devtools.build.lib.server.ServerCommand; +import com.google.devtools.build.lib.util.io.OutErr; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.List; +import java.util.logging.Logger; + +/** + * Executes a Blaze command. + * + * <p>This is the common execution path between the gRPC server and the legacy AF_UNIX server. + */ +public class CommandExecutor implements ServerCommand { + private static final Logger LOG = Logger.getLogger(CommandExecutor.class.getName()); + + private boolean shutdown; + private final BlazeRuntime runtime; + private final BlazeCommandDispatcher dispatcher; + + CommandExecutor(BlazeRuntime runtime, BlazeCommandDispatcher dispatcher) { + this.shutdown = false; + this.runtime = runtime; + this.dispatcher = dispatcher; + } + + @Override + public int exec(List<String> args, OutErr outErr, long firstContactTime) { + LOG.info(BlazeRuntime.getRequestLogString(args)); + + try { + return dispatcher.exec(args, outErr, firstContactTime); + } catch (BlazeCommandDispatcher.ShutdownBlazeServerException e) { + if (e.getCause() != null) { + StringWriter message = new StringWriter(); + message.write("Shutting down due to exception:\n"); + PrintWriter writer = new PrintWriter(message, true); + e.printStackTrace(writer); + writer.flush(); + LOG.severe(message.toString()); + } + shutdown = true; + runtime.shutdown(); + dispatcher.shutdown(); + return e.getExitStatus(); + } + } + + @Override + public boolean shutdown() { + return shutdown; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java index 3674da8a53..d93d044b14 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java @@ -11,21 +11,31 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package com.google.devtools.build.lib.server; -import io.grpc.stub.StreamObserver; +import com.google.devtools.build.lib.runtime.CommandExecutor; +import com.google.devtools.build.lib.util.Clock; + +import java.io.IOException; /** - * Unused class just to make sure that we can compile with gRPC. + * Interface for the gRPC server. + * + * <p>This is necessary so that Bazel kind of works during bootstrapping, at which time the + * gRPC server is not compiled on so that we don't need gRPC for bootstrapping. */ -public class GrpcServer implements CommandServerGrpc.CommandServer { - @Override - public void run(CommandProtos.Request request, StreamObserver<CommandProtos.Response> observer) { - CommandProtos.Response response = CommandProtos.Response.newBuilder() - .setNumber(request.getNumber() + 1) - .build(); - observer.onNext(response); - observer.onCompleted(); +public interface GrpcServer { + + /** + * Factory class. + * + * Present so that we don't need to invoke a constructor with multiple arguments by reflection. + */ + interface Factory { + GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port, + String outputBase); } + + void serve() throws IOException; + void terminate(); } diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java new file mode 100644 index 0000000000..7a03c16eba --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -0,0 +1,156 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.runtime.CommandExecutor; +import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.util.io.OutErr; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.nio.channels.ServerSocketChannel; + +/** + * gRPC server class. + * + * <p>Only this class should depend on gRPC so that we only need to exclude this during + * bootstrapping. + */ +public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer { + private static final String PORT_FILE = "server/grpc_port"; // relative to the output base + private final CommandExecutor commandExecutor; + private final Clock clock; + private final File portFile; + private Server server; + private int port; // mutable so that we can overwrite it if port 0 is passed in + boolean serving; + + /** + * Factory class. Instantiated by reflection. + */ + public static class Factory implements GrpcServer.Factory { + @Override + public GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port, + String outputBase) { + return new GrpcServerImpl(commandExecutor, clock, port, outputBase); + } + } + + private GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, + String outputBase) { + this.commandExecutor = commandExecutor; + this.clock = clock; + this.port = port; + this.portFile = new File(outputBase + "/" + PORT_FILE); + this.serving = false; + } + + public void serve() throws IOException { + Preconditions.checkState(!serving); + server = ServerBuilder.forPort(port) + .addService(CommandServerGrpc.bindService(this)) + .build(); + + server.start(); + if (port == 0) { + port = getActualServerPort(); + } + + PrintWriter portWriter = new PrintWriter(portFile); + portWriter.print(port); + portWriter.close(); + serving = true; + } + + /** + * Gets the server port the kernel bound our server to if port 0 was passed in. + * + * <p>The implementation is awful, but gRPC doesn't provide an official way to do this: + * https://github.com/grpc/grpc-java/issues/72 + */ + private int getActualServerPort() { + try { + ServerSocketChannel channel = + (ServerSocketChannel) getField(server, "transportServer", "channel", "ch"); + InetSocketAddress address = (InetSocketAddress) channel.getLocalAddress(); + return address.getPort(); + } catch (IllegalAccessException | NullPointerException | IOException e) { + throw new IllegalStateException("Cannot read server socket address from gRPC"); + } + } + + private static Object getField(Object instance, String... fieldNames) + throws IllegalAccessException, NullPointerException { + for (String fieldName : fieldNames) { + Field field = null; + for (Class<?> clazz = instance.getClass(); clazz != null; clazz = clazz.getSuperclass()) { + try { + field = clazz.getDeclaredField(fieldName); + break; + } catch (NoSuchFieldException e) { + // Try again with the superclass + } + } + field.setAccessible(true); + instance = field.get(instance); + } + + return instance; + } + + public void terminate() { + Preconditions.checkState(serving); + server.shutdownNow(); + // This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same + // amount of code as implementing it ourselves. + boolean interrupted = false; + try { + while (true) { + try { + server.awaitTermination(); + serving = false; + return; + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public void run(CommandProtos.Request request, StreamObserver<CommandProtos.Response> observer) { + Preconditions.checkState(serving); + commandExecutor.exec( + ImmutableList.of("version"), OutErr.SYSTEM_OUT_ERR, clock.currentTimeMillis()); + CommandProtos.Response response = CommandProtos.Response.newBuilder() + .setNumber(request.getNumber() + 1) + .build(); + observer.onNext(response); + observer.onCompleted(); + } +} |