aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-04-21 12:38:29 +0000
committerGravatar Kristina Chodorow <kchodorow@google.com>2016-04-21 14:55:05 +0000
commitf7633792b14111ff783382e864d28e7657573216 (patch)
tree357c777636fe94c34c82b94675ef2bfc0fbad47f /src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
parentaa11df2d2156c350d17c1a8d5962915a2414b81e (diff)
Make the integration tests pass with gRPC client/server comms.
In particular: - Make a SIGINT to the server interrupt every command - Parse negative numbers on the command line correctly (std::stoi throws an exception, and I'd rather not start using C++ exceptions) - Use "bytes" for command line arguments instead of "string" in the client/server proto . This is more principled, although we pretend all arguments are strings all over the place and it works for "blaze run" mostly by accident. -- MOS_MIGRATED_REVID=120434432
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java65
1 files changed, 64 insertions, 1 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 477f4b4850..a5dbbf0ac4 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -14,6 +14,8 @@
package com.google.devtools.build.lib.server;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
import com.google.devtools.build.lib.runtime.CommandExecutor;
import com.google.devtools.build.lib.server.CommandProtos.CancelRequest;
@@ -25,6 +27,7 @@ import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.ThreadUtils;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
@@ -39,10 +42,15 @@ import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
+import java.nio.charset.Charset;
import java.security.SecureRandom;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
/**
* gRPC server class.
@@ -51,6 +59,11 @@ import java.util.UUID;
* bootstrapping.
*/
public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer {
+ // UTF-8 won't do because we want to be able to pass arbitrary binary strings.
+ // Not that the internals of Bazel handle that correctly, but why not make at least this little
+ // part correct?
+ private static final Charset CHARSET = Charset.forName("ISO-8859-1");
+
private class RunningCommand implements AutoCloseable {
private final Thread thread;
private final String id;
@@ -129,12 +142,14 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
private static final String REQUEST_COOKIE_FILE = "request_cookie";
private static final String RESPONSE_COOKIE_FILE = "response_cookie";
+ @GuardedBy("runningCommands")
private final Map<String, RunningCommand> runningCommands = new HashMap<>();
private final CommandExecutor commandExecutor;
private final Clock clock;
private final Path serverDirectory;
private final String requestCookie;
private final String responseCookie;
+ private final AtomicLong interruptCounter = new AtomicLong(0);
private Server server;
private int port; // mutable so that we can overwrite it if port 0 is passed in
@@ -165,6 +180,47 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
return result.toString();
}
+ private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) {
+ if (commandIds.isEmpty()) {
+ return;
+ }
+
+ Runnable interruptWatcher = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ boolean ok;
+ Thread.sleep(10 * 1000);
+ synchronized (runningCommands) {
+ ok = Collections.disjoint(commandIds, runningCommands.keySet());
+ }
+ if (!ok) {
+ // At least one command was not interrupted. Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + interruptCounter.incrementAndGet());
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+
+ @Override
+ public void interrupt() {
+ synchronized (runningCommands) {
+ for (RunningCommand command : runningCommands.values()) {
+ command.thread.interrupt();
+ }
+
+ startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet()));
+ }
+ }
+
@Override
public void serve() throws IOException {
Preconditions.checkState(!serving);
@@ -245,6 +301,11 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
return;
}
+ ImmutableList.Builder<String> args = ImmutableList.builder();
+ for (ByteString requestArg : request.getArgList()) {
+ args.add(requestArg.toString(CHARSET));
+ }
+
String commandId;
int exitCode;
try (RunningCommand command = new RunningCommand()) {
@@ -254,7 +315,7 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
new RpcOutputStream(observer, command.id, StreamType.STDERR));
exitCode = commandExecutor.exec(
- request.getArgList(), rpcOutErr,
+ args.build(), rpcOutErr,
request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT,
request.getClientDescription(), clock.currentTimeMillis());
} catch (InterruptedException e) {
@@ -308,6 +369,8 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
if (command != null) {
command.thread.interrupt();
}
+
+ startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId()));
}
streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build());