diff options
author | 2016-04-21 12:38:29 +0000 | |
---|---|---|
committer | 2016-04-21 14:55:05 +0000 | |
commit | f7633792b14111ff783382e864d28e7657573216 (patch) | |
tree | 357c777636fe94c34c82b94675ef2bfc0fbad47f /src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | |
parent | aa11df2d2156c350d17c1a8d5962915a2414b81e (diff) |
Make the integration tests pass with gRPC client/server comms.
In particular:
- Make a SIGINT to the server interrupt every command
- Parse negative numbers on the command line correctly (std::stoi throws an exception, and I'd rather not start using C++ exceptions)
- Use "bytes" for command line arguments instead of "string" in the client/server proto . This is more principled, although we pretend all arguments are strings all over the place and it works for "blaze run" mostly by accident.
--
MOS_MIGRATED_REVID=120434432
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 65 |
1 files changed, 64 insertions, 1 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 477f4b4850..a5dbbf0ac4 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -14,6 +14,8 @@ package com.google.devtools.build.lib.server; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.server.CommandProtos.CancelRequest; @@ -25,6 +27,7 @@ import com.google.devtools.build.lib.server.CommandProtos.RunResponse; import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.util.ThreadUtils; import com.google.devtools.build.lib.util.io.OutErr; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; @@ -39,10 +42,15 @@ import java.io.OutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; +import java.nio.charset.Charset; import java.security.SecureRandom; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.GuardedBy; /** * gRPC server class. @@ -51,6 +59,11 @@ import java.util.UUID; * bootstrapping. */ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer { + // UTF-8 won't do because we want to be able to pass arbitrary binary strings. + // Not that the internals of Bazel handle that correctly, but why not make at least this little + // part correct? + private static final Charset CHARSET = Charset.forName("ISO-8859-1"); + private class RunningCommand implements AutoCloseable { private final Thread thread; private final String id; @@ -129,12 +142,14 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma private static final String REQUEST_COOKIE_FILE = "request_cookie"; private static final String RESPONSE_COOKIE_FILE = "response_cookie"; + @GuardedBy("runningCommands") private final Map<String, RunningCommand> runningCommands = new HashMap<>(); private final CommandExecutor commandExecutor; private final Clock clock; private final Path serverDirectory; private final String requestCookie; private final String responseCookie; + private final AtomicLong interruptCounter = new AtomicLong(0); private Server server; private int port; // mutable so that we can overwrite it if port 0 is passed in @@ -165,6 +180,47 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma return result.toString(); } + private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) { + if (commandIds.isEmpty()) { + return; + } + + Runnable interruptWatcher = new Runnable() { + @Override + public void run() { + try { + boolean ok; + Thread.sleep(10 * 1000); + synchronized (runningCommands) { + ok = Collections.disjoint(commandIds, runningCommands.keySet()); + } + if (!ok) { + // At least one command was not interrupted. Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + interruptCounter.incrementAndGet()); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + + @Override + public void interrupt() { + synchronized (runningCommands) { + for (RunningCommand command : runningCommands.values()) { + command.thread.interrupt(); + } + + startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet())); + } + } + @Override public void serve() throws IOException { Preconditions.checkState(!serving); @@ -245,6 +301,11 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma return; } + ImmutableList.Builder<String> args = ImmutableList.builder(); + for (ByteString requestArg : request.getArgList()) { + args.add(requestArg.toString(CHARSET)); + } + String commandId; int exitCode; try (RunningCommand command = new RunningCommand()) { @@ -254,7 +315,7 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma new RpcOutputStream(observer, command.id, StreamType.STDERR)); exitCode = commandExecutor.exec( - request.getArgList(), rpcOutErr, + args.build(), rpcOutErr, request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT, request.getClientDescription(), clock.currentTimeMillis()); } catch (InterruptedException e) { @@ -308,6 +369,8 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma if (command != null) { command.thread.interrupt(); } + + startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId())); } streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build()); |