diff options
author | 2017-06-14 16:11:32 +0200 | |
---|---|---|
committer | 2017-06-14 16:21:11 +0200 | |
commit | 9eea05d068a06ab642dd9d86d46ee5fa2e36b02e (patch) | |
tree | a71f15b29d072870f13bf22d232f1cceb9fdeabf /src/tools/remote_worker | |
parent | 4d302735b0e5c8284946ba35e58f2bad9bca70ca (diff) |
Switching to Watcher API instead of wait_for_completion, in preparation for
deprecating the wait_for_completion field.
Note on errors: in the RemoteWorker, I currently handle all errors as onError of the watch call. Other options are: pass them as the operation error field, and pass some of them as the onError of the execute call. For now, I'm just using the simplest option; the Bazel client is ready to handle all possible options.
RELNOTES: none
PiperOrigin-RevId: 158974207
Diffstat (limited to 'src/tools/remote_worker')
-rw-r--r-- | src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD | 2 | ||||
-rw-r--r-- | src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java | 80 |
2 files changed, 68 insertions, 14 deletions
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD index ddbb5ede52..7aa0f69f9d 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD @@ -39,5 +39,7 @@ java_library( "@googleapis//:google_rpc_code_java_proto", "@googleapis//:google_rpc_error_details_java_proto", "@googleapis//:google_rpc_status_java_proto", + "@googleapis//:google_watch_v1_java_grpc", + "@googleapis//:google_watch_v1_java_proto", ], ) diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index 0eee6695e1..7ea9af9132 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -66,6 +66,10 @@ import com.google.rpc.BadRequest; import com.google.rpc.BadRequest.FieldViolation; import com.google.rpc.Code; import com.google.rpc.Status; +import com.google.watcher.v1.Change; +import com.google.watcher.v1.ChangeBatch; +import com.google.watcher.v1.Request; +import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; import io.grpc.Server; import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyServerBuilder; @@ -97,11 +101,13 @@ public class RemoteWorker { private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER); private final ContentAddressableStorageImplBase casServer; private final ByteStreamImplBase bsServer; + private final WatcherImplBase watchServer; private final ExecutionImplBase execServer; private final ActionCacheImplBase actionCacheServer; private final SimpleBlobStoreActionCache cache; private final RemoteWorkerOptions workerOptions; private final RemoteOptions remoteOptions; + private final ConcurrentHashMap<String, ExecuteRequest> operationsCache; public RemoteWorker( RemoteWorkerOptions workerOptions, @@ -114,13 +120,16 @@ public class RemoteWorker { if (workerOptions.workPath != null) { Path workPath = getFileSystem().getPath(workerOptions.workPath); FileSystemUtils.createDirectoryAndParents(workPath); - execServer = new ExecutionServer(workPath); + watchServer = new WatcherServer(workPath); + execServer = new ExecutionServer(); } else { + watchServer = null; execServer = null; } casServer = new CasServer(); bsServer = new ByteStreamServer(); actionCacheServer = new ActionCacheServer(); + operationsCache = new ConcurrentHashMap<>(); } public Server startServer() throws IOException { @@ -131,6 +140,7 @@ public class RemoteWorker { .addService(actionCacheServer); if (execServer != null) { b.addService(execServer); + b.addService(watchServer); } else { System.out.println("*** Execution disabled, only serving cache requests."); } @@ -402,7 +412,7 @@ public class RemoteWorker { // How long to wait for the uid command. private static final Duration uidTimeout = Durations.fromMicros(30); - class ExecutionServer extends ExecutionImplBase { + class WatcherServer extends WatcherImplBase { private final Path workPath; //The name of the container image entry in the Platform proto @@ -413,7 +423,7 @@ public class RemoteWorker { private static final int LOCAL_EXEC_ERROR = -1; - public ExecutionServer(Path workPath) { + public WatcherServer(Path workPath) { this.workPath = workPath; } @@ -612,8 +622,18 @@ public class RemoteWorker { } @Override - public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { - Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString()); + public void watch(Request wr, StreamObserver<ChangeBatch> responseObserver) { + final String opName = wr.getTarget(); + if (!operationsCache.containsKey(opName)) { + responseObserver.onError( + StatusProto.toStatusRuntimeException( + Status.newBuilder() + .setCode(Code.NOT_FOUND.getNumber()) + .setMessage("Operation not found: " + opName) + .build())); + } + ExecuteRequest request = operationsCache.get(opName); + Path tempRoot = workPath.getRelative("build-" + opName); try { tempRoot.createDirectory(); if (LOG_FINER) { @@ -626,17 +646,24 @@ public class RemoteWorker { } ActionResult result = execute(request.getAction(), tempRoot); responseObserver.onNext( - Operation.newBuilder() - .setDone(true) - .setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(result).build())) + ChangeBatch.newBuilder() + .addChanges( + Change.newBuilder() + .setState(Change.State.EXISTS) + .setData( + Any.pack( + Operation.newBuilder() + .setName(opName) + .setDone(true) + .setResponse( + Any.pack( + ExecuteResponse.newBuilder() + .setResult(result) + .build())) + .build())) + .build()) .build()); responseObserver.onCompleted(); - if (workerOptions.debug) { - LOG.fine("Work completed."); - LOG.warning("Preserving work directory " + tempRoot); - } else { - FileSystemUtils.deleteTree(tempRoot); - } } catch (CacheNotFoundException e) { LOG.warning("Cache miss on " + e.getMissingDigest()); responseObserver.onError(notFoundError(e.getMissingDigest())); @@ -657,10 +684,35 @@ public class RemoteWorker { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } + } finally { + operationsCache.remove(opName); + if (workerOptions.debug) { + LOG.warning("Preserving work directory " + tempRoot); + } else { + try { + FileSystemUtils.deleteTree(tempRoot); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to delete tmp directory %s: %s: ", tempRoot, e)); + } + } } } } + class ExecutionServer extends ExecutionImplBase { + @Override + public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { + // Defer the actual action execution to the Watcher.watch request. + // There are a lot of errors for which we could fail early here, but deferring them all + // is simpler. + final String opName = UUID.randomUUID().toString(); + operationsCache.put(opName, request); + responseObserver.onNext(Operation.newBuilder().setName(opName).build()); + responseObserver.onCompleted(); + } + } + public static void main(String[] args) throws Exception { OptionsParser parser = OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class); |