// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.server; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.io.OutErr; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; /** * gRPC server class. * *

Only this class should depend on gRPC so that we only need to exclude this during * bootstrapping. */ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer { private static final String PORT_FILE = "server/grpc_port"; // relative to the output base private final CommandExecutor commandExecutor; private final Clock clock; private final File portFile; private Server server; private int port; // mutable so that we can overwrite it if port 0 is passed in boolean serving; /** * Factory class. Instantiated by reflection. */ public static class Factory implements GrpcServer.Factory { @Override public GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port, String outputBase) { return new GrpcServerImpl(commandExecutor, clock, port, outputBase); } } private GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, String outputBase) { this.commandExecutor = commandExecutor; this.clock = clock; this.port = port; this.portFile = new File(outputBase + "/" + PORT_FILE); this.serving = false; } public void serve() throws IOException { Preconditions.checkState(!serving); server = ServerBuilder.forPort(port) .addService(CommandServerGrpc.bindService(this)) .build(); server.start(); if (port == 0) { port = getActualServerPort(); } PrintWriter portWriter = new PrintWriter(portFile); portWriter.print(port); portWriter.close(); serving = true; } /** * Gets the server port the kernel bound our server to if port 0 was passed in. * *

The implementation is awful, but gRPC doesn't provide an official way to do this: * https://github.com/grpc/grpc-java/issues/72 */ private int getActualServerPort() { try { ServerSocketChannel channel = (ServerSocketChannel) getField(server, "transportServer", "channel", "ch"); InetSocketAddress address = (InetSocketAddress) channel.getLocalAddress(); return address.getPort(); } catch (IllegalAccessException | NullPointerException | IOException e) { throw new IllegalStateException("Cannot read server socket address from gRPC"); } } private static Object getField(Object instance, String... fieldNames) throws IllegalAccessException, NullPointerException { for (String fieldName : fieldNames) { Field field = null; for (Class clazz = instance.getClass(); clazz != null; clazz = clazz.getSuperclass()) { try { field = clazz.getDeclaredField(fieldName); break; } catch (NoSuchFieldException e) { // Try again with the superclass } } field.setAccessible(true); instance = field.get(instance); } return instance; } public void terminate() { Preconditions.checkState(serving); server.shutdownNow(); // This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same // amount of code as implementing it ourselves. boolean interrupted = false; try { while (true) { try { server.awaitTermination(); serving = false; return; } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } @Override public void run(CommandProtos.Request request, StreamObserver observer) { Preconditions.checkState(serving); commandExecutor.exec( ImmutableList.of("version"), OutErr.SYSTEM_OUT_ERR, clock.currentTimeMillis()); CommandProtos.Response response = CommandProtos.Response.newBuilder() .setNumber(request.getNumber() + 1) .build(); observer.onNext(response); observer.onCompleted(); } }