// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.server; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.net.InetAddresses; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Uninterruptibles; import com.google.devtools.build.lib.clock.BlazeClock; import com.google.devtools.build.lib.clock.Clock; import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; import com.google.devtools.build.lib.runtime.BlazeCommandResult; import com.google.devtools.build.lib.runtime.BlazeRuntime; import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy; import com.google.devtools.build.lib.server.CommandProtos.CancelRequest; import com.google.devtools.build.lib.server.CommandProtos.CancelResponse; import com.google.devtools.build.lib.server.CommandProtos.PingRequest; import com.google.devtools.build.lib.server.CommandProtos.PingResponse; import com.google.devtools.build.lib.server.CommandProtos.RunRequest; import 
com.google.devtools.build.lib.server.CommandProtos.RunResponse; import com.google.devtools.build.lib.server.CommandProtos.StartupOption; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.Pair; import com.google.devtools.build.lib.util.ThreadUtils; import com.google.devtools.build.lib.util.io.OutErr; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.common.options.InvocationPolicyParser; import com.google.devtools.common.options.OptionsParsingException; import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; import java.io.StringWriter; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; /** * gRPC server class. * *
<p>Only this class should depend on gRPC so that we only need to exclude this during * bootstrapping. * *
<p>This class is a little complicated and rich in multithreading, so an explanation of its * innards follows. * *
<p>We use the direct executor for gRPC so that it calls our methods directly on its event handler * threads (which it creates itself). This is acceptable for {@code ping()} and {@code cancel()} * because they run very quickly. For {@code run()}, we transfer the call to our own threads in * {@code commandExecutorPool}. We do this instead of setting an executor on the server object * because gRPC insists on serializing calls within a single RPC call, which means that the Runnable * passed to {@code setOnReadyHandler} doesn't get called while the main RPC method is running, * which means we can't use flow control, which we need so that gRPC doesn't buffer an unbounded * amount of outgoing data. * *
<p>Two threads are spawned for each command: one that handles the command in {@code * commandExecutorPool} and one that streams the result back to the client in {@code * streamExecutorPool}. * *
<p>In addition to these threads, we maintain one extra thread for handling the server timeout and * an interrupt watcher thread is started for each interrupt request that logs if it takes too long * to take effect. * *
<p>Each running RPC has a UUID associated with it that is used to identify it when a client wants
 * to cancel it. Cancellation is done by the client sending the server a {@code cancel()} RPC call
 * which results in the main thread of the command being interrupted.
 */
public class GrpcServerImpl implements RPCServer {
  private static final Logger logger = Logger.getLogger(GrpcServerImpl.class.getName());
  // One millisecond expressed in nanoseconds.
  private static final long NANOSECONDS_IN_MS = TimeUnit.MILLISECONDS.toNanos(1);

  /**
   * Registration of one command running on the thread that constructed this object.
   *
   * <p>The constructor records the current thread, assigns a fresh random UUID and adds itself to
   * {@code runningCommands} under that id; {@link #close()} removes it again. Adding an entry to
   * an empty map first calls {@code busy()}, and removing the last entry calls {@code idle()}.
   * Both changes are followed by {@code runningCommands.notify()} while holding the map's monitor,
   * presumably to wake a thread waiting on it (NOTE(review): confirm which waiter this targets).
   * Start and finish are logged at INFO level.
   */
  private class RunningCommand implements AutoCloseable {
    // The thread executing the command; interrupt() interrupts it to stop the command.
    private final Thread thread;
    // Random UUID identifying this command in runningCommands.
    private final String id;

    private RunningCommand() {
      thread = Thread.currentThread();
      id = UUID.randomUUID().toString();
      synchronized (runningCommands) {
        // Checked before the insertion: only the first concurrently running command calls busy().
        if (runningCommands.isEmpty()) {
          busy();
        }
        runningCommands.put(id, this);
        runningCommands.notify();
      }
      logger.info(String.format("Starting command %s on thread %s", id, thread.getName()));
    }

    /** Unregisters this command, calling {@code idle()} if no other command is still running. */
    @Override
    public void close() {
      synchronized (runningCommands) {
        runningCommands.remove(id);
        // Checked after the removal: only the last finishing command calls idle().
        if (runningCommands.isEmpty()) {
          idle();
        }
        runningCommands.notify();
      }
      logger.info(String.format("Finished command %s on thread %s", id, thread.getName()));
    }
  }

  /**
   * Factory class. Instantiated by reflection.
   *
<p>Used so that method calls using reflection are as simple as possible.
   */
  public static class Factory implements RPCServer.Factory {
    /** Creates a {@link GrpcServerImpl}, passing every argument straight to its constructor. */
    @Override
    public RPCServer create(BlazeCommandDispatcher dispatcher, Clock clock, int port,
        Path workspace, Path serverDirectory, int maxIdleSeconds) throws IOException {
      return new GrpcServerImpl(
          dispatcher, clock, port, workspace, serverDirectory, maxIdleSeconds);
    }
  }

  /** Selects which client stream a chunk of command output is sent as (stdout or stderr). */
  @VisibleForTesting
  enum StreamType {
    STDOUT,
    STDERR,
  }

  /** Actions {@link GrpcSink} can do. */
  private enum SinkThreadAction {
    DISCONNECT,
    FINISH,
    READY,
    SEND,
  }

  /**
   * Sent back and forth between threads wanting to write to the client stream and the stream
   * handler thread.
   *
   * <p>Both fields are final and set only in the constructor. NOTE(review): the exact meaning of
   * {@code success} is defined by how GrpcSink uses it, which is not visible here.
   */
  @Immutable
  private static final class SinkThreadItem {
    private final boolean success;
    private final RunResponse message;

    private SinkThreadItem(boolean success, RunResponse message) {
      this.success = success;
      this.message = message;
    }
  }

  /**
   * A class that handles communicating through a gRPC interface for a streaming rpc call.
   *
It can do four things: *
<p>Note that wrapping this class with a {@code Channel} can cause a deadlock if there is an
* {@link OutputStream} in between that synchronizes both on {@code #close()} and {@code #write()}
* because then if an interrupt happens in {@link GrpcSink#exchange(SinkThreadItem, boolean)},
* the thread on which {@code interrupt()} was called will wait until the {@code Channel} closes
* itself while holding a lock for interrupting the thread on which {@code #exchange()} is
* being executed and that thread will hold a lock that is needed for the {@code Channel} to be
* closed and call {@code interrupt()} in {@code #exchange()}, which will in turn try to acquire
* the interrupt lock.
*/
@VisibleForTesting
static class RpcOutputStream extends OutputStream {
  // Largest number of payload bytes placed into a single RunResponse message.
  private static final int CHUNK_SIZE = 8192;
  // Store commandId and responseCookie as ByteStrings to avoid String -> UTF8 bytes conversion
  // for each serialized chunk of output.
  private final ByteString commandIdBytes;
  private final ByteString responseCookieBytes;
  // Whether written bytes are sent to the client as standard output or standard error.
  private final StreamType type;
  // Receives every chunk; offer() returning false means the client has disconnected.
  private final GrpcSink sink;

  RpcOutputStream(String commandId, String responseCookie, StreamType type, GrpcSink sink) {
    this.commandIdBytes = ByteString.copyFromUtf8(commandId);
    this.responseCookieBytes = ByteString.copyFromUtf8(responseCookie);
    this.type = type;
    this.sink = sink;
  }

  /**
   * Sends {@code b[off, off + inlen)} to the client as a sequence of {@link RunResponse} messages
   * carrying at most {@link #CHUNK_SIZE} bytes each. May block while the sink accepts a chunk.
   *
   * @throws IndexOutOfBoundsException if {@code off} or {@code inlen} is negative, or if the range
   *     extends past the end of {@code b}; checked before anything is sent
   * @throws IOException if the sink rejects a chunk because the client disconnected
   */
  @Override
  public synchronized void write(byte[] b, int off, int inlen) throws IOException {
    // Validate the whole range up front, as the OutputStream contract requires, so that a bad
    // range never causes a partial write: a negative length must not be silently ignored, and an
    // overlong range must not fail only after earlier chunks were already streamed to the client.
    if (off < 0 || inlen < 0 || inlen > b.length - off) {
      throw new IndexOutOfBoundsException(
          String.format("off=%d, len=%d, array length=%d", off, inlen, b.length));
    }
    int written = 0;
    while (written < inlen) {
      // Advancing by the actual chunk length keeps 'written' <= inlen, so it cannot overflow.
      int chunkLength = Math.min(CHUNK_SIZE, inlen - written);
      ByteString input = ByteString.copyFrom(b, off + written, chunkLength);
      RunResponse.Builder response = RunResponse
          .newBuilder()
          .setCookieBytes(responseCookieBytes)
          .setCommandIdBytes(commandIdBytes);
      switch (type) {
        case STDOUT: response.setStandardOutput(input); break;
        case STDERR: response.setStandardError(input); break;
        default: throw new IllegalStateException();
      }
      // Send the chunk to the streamer thread. May block.
      if (!sink.offer(response.build())) {
        // Client disconnected. Terminate the current command as soon as possible. Note that
        // throwing IOException is not enough because we are in the habit of swallowing it. Note
        // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler)
        // we interrupt the command thread, which should be enough to make the server come around
        // as soon as possible.
        logger.info(
            String.format(
                "Client disconnected received for command %s on thread %s",
                commandIdBytes.toStringUtf8(), Thread.currentThread().getName()));
        throw new IOException("Client disconnected");
      }
      written += chunkLength;
    }
  }

  /** Writes the low 8 bits of {@code byteAsInt} as a single-byte chunk. */
  @Override
  public void write(int byteAsInt) throws IOException {
    byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons
    write(new byte[] {b}, 0, 1);
  }
}
/**
 * Daemon thread that re-reads the PID file every five seconds and halts the JVM immediately
 * (without running shutdown hooks) once the file no longer holds this server's PID, unless a
 * shutdown has already been signalled, in which case it logs a warning and exits.
 */
private class PidFileWatcherThread extends Thread {
  // Set by signalShutdown() or by run(); always accessed while holding this object's monitor.
  private boolean shuttingDown = false;

  private PidFileWatcherThread() {
    super("pid-file-watcher");
    setDaemon(true);
  }

  // Synchronized so that a regular shutdown and the "PID file deleted" check in run() cannot
  // race with each other.
  private synchronized void signalShutdown() {
    shuttingDown = true;
  }

  /** Returns true iff the PID file is readable and its contents equal {@code pidInFile}. */
  private boolean pidFileStillOurs() {
    try {
      String contents = new String(FileSystemUtils.readContentAsLatin1(pidFile));
      return contents.equals(pidInFile);
    } catch (IOException e) {
      // An unreadable PID file is treated the same as an overwritten one.
      logger.info("Cannot read PID file: " + e.getMessage());
      return false;
    }
  }

  @Override
  public void run() {
    for (;;) {
      Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
      if (pidFileStillOurs()) {
        continue;
      }
      synchronized (this) {
        if (shuttingDown) {
          logger.warning("PID file deleted or overwritten but shutdown is already in progress");
          return;
        }
        shuttingDown = true;
        // The file may now belong to another server; halting skips the shutdown hooks, which
        // would otherwise delete that server's file.
        logger.severe("PID file deleted or overwritten, exiting as quickly as possible");
        Runtime.getRuntime().halt(ExitCode.BLAZE_INTERNAL_ERROR.getNumericExitCode());
      }
    }
  }
}
// These are file names, resolved relative to the server directory (see writeServerFile()).
// Written by serve() with the bound address (InetAddresses.toUriString form) + ":" + port.
private static final String PORT_FILE = "command_port";
// Written by serve() with requestCookie.
private static final String REQUEST_COOKIE_FILE = "request_cookie";
// Written by serve() with responseCookie.
private static final String RESPONSE_COOKIE_FILE = "response_cookie";
// When false, shutdownHook() returns immediately. Static, so the flag is shared by every instance
// in the JVM; cleared by disableShutdownHooks().
private static final AtomicBoolean runShutdownHooks = new AtomicBoolean(true);
@GuardedBy("runningCommands")
private final Map In this case, no files should be deleted on shutdown hooks, since clean also deletes the
* lock file, and there is a small possibility of the following sequence of events:
*
*
*
*
* It also disables the "die when the PID file changes" handler so that it doesn't kill the server
* while the "clean --expunge" command is running.
*/
@Override
public void prepareForAbruptShutdown() {
  // Make shutdownHook() a no-op so that no server files are deleted on exit.
  this.disableShutdownHooks();
  // Mark the PID file watcher as shutting down so it stops instead of halting the JVM.
  this.pidFileWatcherThread.signalShutdown();
}
/**
 * Interrupts the thread of every currently running command, then passes a snapshot of their ids
 * to {@code startSlowInterruptWatcher} (per the class docs, it logs if the interrupt takes too
 * long to take effect). Everything happens while holding the {@code runningCommands} monitor.
 */
@Override
public void interrupt() {
  synchronized (runningCommands) {
    runningCommands.values().forEach(command -> command.thread.interrupt());
    startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet()));
  }
}
/**
 * Starts the gRPC server on the local loopback interface, trying IPv6 first and IPv4 second.
 * Then it starts the idle-timeout thread if {@code maxIdleSeconds > 0}, writes the port and cookie
 * files, and blocks until the server terminates.
 *
 * @throws IOException if the server cannot be bound on either address (the IPv6 failure is
 *     attached as a suppressed exception) or a server file cannot be written
 * @throws IllegalStateException if called while already serving, or if interrupted while waiting
 *     for termination
 */
@Override
public void serve() throws IOException {
  Preconditions.checkState(!serving);

  // For reasons only Apple knows, you cannot bind to IPv4-localhost when you run in a sandbox
  // that only allows loopback traffic, but binding to IPv6-localhost works fine. This would
  // however break on systems that don't support IPv6. So what we'll do is to try to bind to IPv6
  // and if that fails, try again with IPv4.
  InetSocketAddress address = new InetSocketAddress("[::1]", port);
  try {
    server = startNettyServer(address);
  } catch (IOException ipv6Failure) {
    address = new InetSocketAddress("127.0.0.1", port);
    try {
      server = startNettyServer(address);
    } catch (IOException ipv4Failure) {
      // Keep the first failure visible; otherwise the reason IPv6 was rejected would be lost.
      ipv4Failure.addSuppressed(ipv6Failure);
      throw ipv4Failure;
    }
  }

  if (maxIdleSeconds > 0) {
    Thread timeoutThread = new Thread(this::timeoutThread);
    timeoutThread.setName("grpc-timeout");
    timeoutThread.setDaemon(true);
    timeoutThread.start();
  }
  serving = true;

  writeServerFile(
      PORT_FILE, InetAddresses.toUriString(address.getAddress()) + ":" + server.getPort());
  writeServerFile(REQUEST_COOKIE_FILE, requestCookie);
  writeServerFile(RESPONSE_COOKIE_FILE, responseCookie);

  try {
    server.awaitTermination();
  } catch (InterruptedException e) {
    // Restore the interrupt status so code further up the stack can still observe it.
    Thread.currentThread().interrupt();
    // TODO(lberki): Handle SIGINT in a reasonable way
    throw new IllegalStateException(e);
  }
}

/**
 * Builds and starts a gRPC server on {@code address} serving {@code commandServer}, using the
 * direct executor so RPC methods run on gRPC's own event handler threads.
 */
private Server startNettyServer(InetSocketAddress address) throws IOException {
  return NettyServerBuilder.forAddress(address)
      .addService(commandServer)
      .directExecutor()
      .build()
      .start();
}
/**
 * Writes {@code contents} into the file {@code name} inside the server directory, using
 * {@code FileSystemUtils.writeContentAsLatin1}, and then registers that file with
 * {@code deleteAtExit}.
 */
private void writeServerFile(String name, String contents) throws IOException {
  Path target = serverDirectory.getChild(name);
  FileSystemUtils.writeContentAsLatin1(target, contents);
  deleteAtExit(target);
}
/**
 * Makes {@code shutdownHook()} return immediately. The flag is static, so this affects every
 * instance in the JVM; nothing visible here sets it back to true.
 */
protected void disableShutdownHooks() {
  GrpcServerImpl.runShutdownHooks.set(false);
}
private void shutdownHook() {
if (!runShutdownHooks.get()) {
return;
}
List