diff options
Diffstat (limited to 'src/main/java')
4 files changed, 127 insertions, 51 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java index ace94597c9..25231366f6 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java @@ -838,8 +838,19 @@ public final class BlazeRuntime { * the program. */ private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) { + InterruptSignalHandler sigintHandler = null; try { - RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args)); + final RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args)); + + // Register the signal handler. + sigintHandler = new InterruptSignalHandler() { + @Override + protected void onSignal() { + LOG.severe("User interrupt"); + blazeServer.interrupt(); + } + }; + blazeServer.serve(); return ExitCode.SUCCESS.getNumericExitCode(); } catch (OptionsParsingException e) { @@ -851,6 +862,10 @@ public final class BlazeRuntime { } catch (AbruptExitException e) { outErr.printErr(e.getMessage()); return e.getExitCode().getNumericExitCode(); + } finally { + if (sigintHandler != null) { + sigintHandler.uninstall(); + } } } diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java index 26473e2323..558b87569a 100644 --- a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java @@ -19,7 +19,6 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.server.RPCService.UnknownCommandException; -import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; import com.google.devtools.build.lib.unix.LocalClientSocket; import com.google.devtools.build.lib.unix.LocalServerSocket; import com.google.devtools.build.lib.unix.LocalSocketAddress; @@ -129,59 +128,54 @@ public final class AfUnixServer extends RPCServer { serverDirectory, workspaceDir); } + + private final AtomicBoolean inAction = new AtomicBoolean(false); + private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); + private final AtomicLong cmdNum = new AtomicLong(); + private final Thread mainThread = Thread.currentThread(); + private final Object interruptLock = new Object(); + + @Override + public void interrupt() { + // Only interrupt during actions - otherwise we may end up setting the interrupt bit + // at the end of a build and responding to it at the beginning of the subsequent build. + synchronized (interruptLock) { + if (allowingInterrupt.get()) { + mainThread.interrupt(); + } + } + + if (inAction.get()) { + Runnable interruptWatcher = + new Runnable() { + @Override + public void run() { + try { + long originalCmd = cmdNum.get(); + Thread.sleep(10 * 1000); + if (inAction.get() && cmdNum.get() == originalCmd) { + // We're still operating on the same command. + // Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + } + /** * Wait on a socket for business (answer requests). Note that this * method won't return until the server shuts down. */ @Override public void serve() { - // Register the signal handler. - final AtomicBoolean inAction = new AtomicBoolean(false); - final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); - final AtomicLong cmdNum = new AtomicLong(); - final Thread mainThread = Thread.currentThread(); - final Object interruptLock = new Object(); - - InterruptSignalHandler sigintHandler = - new InterruptSignalHandler() { - @Override - protected void onSignal() { - LOG.severe("User interrupt"); - - // Only interrupt during actions - otherwise we may end up setting the interrupt bit - // at the end of a build and responding to it at the beginning of the subsequent build. - synchronized (interruptLock) { - if (allowingInterrupt.get()) { - mainThread.interrupt(); - } - } - - if (inAction.get()) { - Runnable interruptWatcher = - new Runnable() { - @Override - public void run() { - try { - long originalCmd = cmdNum.get(); - Thread.sleep(10 * 1000); - if (inAction.get() && cmdNum.get() == originalCmd) { - // We're still operating on the same command. - // Interrupt took too long. - ThreadUtils.warnAboutSlowInterrupt(); - } - } catch (InterruptedException e) { - // Ignore. - } - } - }; - Thread interruptWatcherThread = - new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); - interruptWatcherThread.setDaemon(true); - interruptWatcherThread.start(); - } - } - }; - try { while (!lameDuck) { try { @@ -246,7 +240,6 @@ public final class AfUnixServer extends RPCServer { } finally { rpcService.shutdown(); LOG.info("Logging finished"); - sigintHandler.uninstall(); } } diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 477f4b4850..a5dbbf0ac4 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -14,6 +14,8 @@ package com.google.devtools.build.lib.server; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.server.CommandProtos.CancelRequest; @@ -25,6 +27,7 @@ import com.google.devtools.build.lib.server.CommandProtos.RunResponse; import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.util.ThreadUtils; import com.google.devtools.build.lib.util.io.OutErr; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; @@ -39,10 +42,15 @@ import java.io.OutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; +import java.nio.charset.Charset; import java.security.SecureRandom; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.GuardedBy; /** * gRPC server class. @@ -51,6 +59,11 @@ import java.util.UUID; * bootstrapping. */ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer { + // UTF-8 won't do because we want to be able to pass arbitrary binary strings. + // Not that the internals of Bazel handle that correctly, but why not make at least this little + // part correct? + private static final Charset CHARSET = Charset.forName("ISO-8859-1"); + private class RunningCommand implements AutoCloseable { private final Thread thread; private final String id; @@ -129,12 +142,14 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma private static final String REQUEST_COOKIE_FILE = "request_cookie"; private static final String RESPONSE_COOKIE_FILE = "response_cookie"; + @GuardedBy("runningCommands") private final Map<String, RunningCommand> runningCommands = new HashMap<>(); private final CommandExecutor commandExecutor; private final Clock clock; private final Path serverDirectory; private final String requestCookie; private final String responseCookie; + private final AtomicLong interruptCounter = new AtomicLong(0); private Server server; private int port; // mutable so that we can overwrite it if port 0 is passed in @@ -165,6 +180,47 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma return result.toString(); } + private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) { + if (commandIds.isEmpty()) { + return; + } + + Runnable interruptWatcher = new Runnable() { + @Override + public void run() { + try { + boolean ok; + Thread.sleep(10 * 1000); + synchronized (runningCommands) { + ok = Collections.disjoint(commandIds, runningCommands.keySet()); + } + if (!ok) { + // At least one command was not interrupted. Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + interruptCounter.incrementAndGet()); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + + @Override + public void interrupt() { + synchronized (runningCommands) { + for (RunningCommand command : runningCommands.values()) { + command.thread.interrupt(); + } + + startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet())); + } + } + @Override public void serve() throws IOException { Preconditions.checkState(!serving); @@ -245,6 +301,11 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma return; } + ImmutableList.Builder<String> args = ImmutableList.builder(); + for (ByteString requestArg : request.getArgList()) { + args.add(requestArg.toString(CHARSET)); + } + String commandId; int exitCode; try (RunningCommand command = new RunningCommand()) { @@ -254,7 +315,7 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma new RpcOutputStream(observer, command.id, StreamType.STDERR)); exitCode = commandExecutor.exec( - request.getArgList(), rpcOutErr, + args.build(), rpcOutErr, request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT, request.getClientDescription(), clock.currentTimeMillis()); } catch (InterruptedException e) { @@ -308,6 +369,8 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma if (command != null) { command.thread.interrupt(); } + + startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId())); } streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build()); diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java index fc6b83ffc2..a9663eaf72 100644 --- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -90,4 +90,9 @@ public abstract class RPCServer { * Start serving and block until the a shutdown command is received. */ public abstract void serve() throws IOException; + + /** + * Called when the server receives a SIGINT. + */ + public abstract void interrupt(); } |