aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java17
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java91
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java65
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCServer.java5
4 files changed, 127 insertions, 51 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
index ace94597c9..25231366f6 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
@@ -838,8 +838,19 @@ public final class BlazeRuntime {
* the program.
*/
private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) {
+ InterruptSignalHandler sigintHandler = null;
try {
- RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+ final RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+
+ // Register the signal handler.
+ sigintHandler = new InterruptSignalHandler() {
+ @Override
+ protected void onSignal() {
+ LOG.severe("User interrupt");
+ blazeServer.interrupt();
+ }
+ };
+
blazeServer.serve();
return ExitCode.SUCCESS.getNumericExitCode();
} catch (OptionsParsingException e) {
@@ -851,6 +862,10 @@ public final class BlazeRuntime {
} catch (AbruptExitException e) {
outErr.printErr(e.getMessage());
return e.getExitCode().getNumericExitCode();
+ } finally {
+ if (sigintHandler != null) {
+ sigintHandler.uninstall();
+ }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
index 26473e2323..558b87569a 100644
--- a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
@@ -19,7 +19,6 @@ import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
-import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
import com.google.devtools.build.lib.unix.LocalClientSocket;
import com.google.devtools.build.lib.unix.LocalServerSocket;
import com.google.devtools.build.lib.unix.LocalSocketAddress;
@@ -129,59 +128,54 @@ public final class AfUnixServer extends RPCServer {
serverDirectory, workspaceDir);
}
+
+ private final AtomicBoolean inAction = new AtomicBoolean(false);
+ private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
+ private final AtomicLong cmdNum = new AtomicLong();
+ private final Thread mainThread = Thread.currentThread();
+ private final Object interruptLock = new Object();
+
+ @Override
+ public void interrupt() {
+ // Only interrupt during actions - otherwise we may end up setting the interrupt bit
+ // at the end of a build and responding to it at the beginning of the subsequent build.
+ synchronized (interruptLock) {
+ if (allowingInterrupt.get()) {
+ mainThread.interrupt();
+ }
+ }
+
+ if (inAction.get()) {
+ Runnable interruptWatcher =
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long originalCmd = cmdNum.get();
+ Thread.sleep(10 * 1000);
+ if (inAction.get() && cmdNum.get() == originalCmd) {
+ // We're still operating on the same command.
+ // Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+ }
+
/**
* Wait on a socket for business (answer requests). Note that this
* method won't return until the server shuts down.
*/
@Override
public void serve() {
- // Register the signal handler.
- final AtomicBoolean inAction = new AtomicBoolean(false);
- final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
- final AtomicLong cmdNum = new AtomicLong();
- final Thread mainThread = Thread.currentThread();
- final Object interruptLock = new Object();
-
- InterruptSignalHandler sigintHandler =
- new InterruptSignalHandler() {
- @Override
- protected void onSignal() {
- LOG.severe("User interrupt");
-
- // Only interrupt during actions - otherwise we may end up setting the interrupt bit
- // at the end of a build and responding to it at the beginning of the subsequent build.
- synchronized (interruptLock) {
- if (allowingInterrupt.get()) {
- mainThread.interrupt();
- }
- }
-
- if (inAction.get()) {
- Runnable interruptWatcher =
- new Runnable() {
- @Override
- public void run() {
- try {
- long originalCmd = cmdNum.get();
- Thread.sleep(10 * 1000);
- if (inAction.get() && cmdNum.get() == originalCmd) {
- // We're still operating on the same command.
- // Interrupt took too long.
- ThreadUtils.warnAboutSlowInterrupt();
- }
- } catch (InterruptedException e) {
- // Ignore.
- }
- }
- };
- Thread interruptWatcherThread =
- new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
- interruptWatcherThread.setDaemon(true);
- interruptWatcherThread.start();
- }
- }
- };
-
try {
while (!lameDuck) {
try {
@@ -246,7 +240,6 @@ public final class AfUnixServer extends RPCServer {
} finally {
rpcService.shutdown();
LOG.info("Logging finished");
- sigintHandler.uninstall();
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 477f4b4850..a5dbbf0ac4 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -14,6 +14,8 @@
package com.google.devtools.build.lib.server;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
import com.google.devtools.build.lib.runtime.CommandExecutor;
import com.google.devtools.build.lib.server.CommandProtos.CancelRequest;
@@ -25,6 +27,7 @@ import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.ThreadUtils;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
@@ -39,10 +42,15 @@ import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
+import java.nio.charset.Charset;
import java.security.SecureRandom;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
/**
* gRPC server class.
@@ -51,6 +59,11 @@ import java.util.UUID;
* bootstrapping.
*/
public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer {
+ // UTF-8 won't do because we want to be able to pass arbitrary binary strings.
+ // Not that the internals of Bazel handle that correctly, but why not make at least this little
+ // part correct?
+ private static final Charset CHARSET = Charset.forName("ISO-8859-1");
+
private class RunningCommand implements AutoCloseable {
private final Thread thread;
private final String id;
@@ -129,12 +142,14 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
private static final String REQUEST_COOKIE_FILE = "request_cookie";
private static final String RESPONSE_COOKIE_FILE = "response_cookie";
+ @GuardedBy("runningCommands")
private final Map<String, RunningCommand> runningCommands = new HashMap<>();
private final CommandExecutor commandExecutor;
private final Clock clock;
private final Path serverDirectory;
private final String requestCookie;
private final String responseCookie;
+ private final AtomicLong interruptCounter = new AtomicLong(0);
private Server server;
private int port; // mutable so that we can overwrite it if port 0 is passed in
@@ -165,6 +180,47 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
return result.toString();
}
+ private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) {
+ if (commandIds.isEmpty()) {
+ return;
+ }
+
+ Runnable interruptWatcher = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ boolean ok;
+ Thread.sleep(10 * 1000);
+ synchronized (runningCommands) {
+ ok = Collections.disjoint(commandIds, runningCommands.keySet());
+ }
+ if (!ok) {
+ // At least one command was not interrupted. Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + interruptCounter.incrementAndGet());
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+
+ @Override
+ public void interrupt() {
+ synchronized (runningCommands) {
+ for (RunningCommand command : runningCommands.values()) {
+ command.thread.interrupt();
+ }
+
+ startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet()));
+ }
+ }
+
@Override
public void serve() throws IOException {
Preconditions.checkState(!serving);
@@ -245,6 +301,11 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
return;
}
+ ImmutableList.Builder<String> args = ImmutableList.builder();
+ for (ByteString requestArg : request.getArgList()) {
+ args.add(requestArg.toString(CHARSET));
+ }
+
String commandId;
int exitCode;
try (RunningCommand command = new RunningCommand()) {
@@ -254,7 +315,7 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
new RpcOutputStream(observer, command.id, StreamType.STDERR));
exitCode = commandExecutor.exec(
- request.getArgList(), rpcOutErr,
+ args.build(), rpcOutErr,
request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT,
request.getClientDescription(), clock.currentTimeMillis());
} catch (InterruptedException e) {
@@ -308,6 +369,8 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
if (command != null) {
command.thread.interrupt();
}
+
+ startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId()));
}
streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build());
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
index fc6b83ffc2..a9663eaf72 100644
--- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -90,4 +90,9 @@ public abstract class RPCServer {
* Start serving and block until the a shutdown command is received.
*/
public abstract void serve() throws IOException;
+
+ /**
+ * Called when the server receives a SIGINT.
+ */
+ public abstract void interrupt();
}