aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/tools/remote_worker
diff options
context:
space:
mode:
authorGravatar olaola <olaola@google.com>2017-06-14 16:11:32 +0200
committerGravatar Yun Peng <pcloudy@google.com>2017-06-14 16:21:11 +0200
commit9eea05d068a06ab642dd9d86d46ee5fa2e36b02e (patch)
treea71f15b29d072870f13bf22d232f1cceb9fdeabf /src/tools/remote_worker
parent4d302735b0e5c8284946ba35e58f2bad9bca70ca (diff)
Switching to Watcher API instead of wait_for_completion, in preparation for
deprecating the wait_for_completion field. Note on errors: in the RemoteWorker, I currently handle all errors as onError of the watch call. Other options are: pass them as the operation error field, and pass some of them as the onError of the execute call. For now, I'm just using the simplest option; the Bazel client is ready to handle all possible options. RELNOTES: none PiperOrigin-RevId: 158974207
Diffstat (limited to 'src/tools/remote_worker')
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD2
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java80
2 files changed, 68 insertions, 14 deletions
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
index ddbb5ede52..7aa0f69f9d 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
@@ -39,5 +39,7 @@ java_library(
"@googleapis//:google_rpc_code_java_proto",
"@googleapis//:google_rpc_error_details_java_proto",
"@googleapis//:google_rpc_status_java_proto",
+ "@googleapis//:google_watch_v1_java_grpc",
+ "@googleapis//:google_watch_v1_java_proto",
],
)
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 0eee6695e1..7ea9af9132 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -66,6 +66,10 @@ import com.google.rpc.BadRequest;
import com.google.rpc.BadRequest.FieldViolation;
import com.google.rpc.Code;
import com.google.rpc.Status;
+import com.google.watcher.v1.Change;
+import com.google.watcher.v1.ChangeBatch;
+import com.google.watcher.v1.Request;
+import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
import io.grpc.Server;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyServerBuilder;
@@ -97,11 +101,13 @@ public class RemoteWorker {
private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
private final ContentAddressableStorageImplBase casServer;
private final ByteStreamImplBase bsServer;
+ private final WatcherImplBase watchServer;
private final ExecutionImplBase execServer;
private final ActionCacheImplBase actionCacheServer;
private final SimpleBlobStoreActionCache cache;
private final RemoteWorkerOptions workerOptions;
private final RemoteOptions remoteOptions;
+ private final ConcurrentHashMap<String, ExecuteRequest> operationsCache;
public RemoteWorker(
RemoteWorkerOptions workerOptions,
@@ -114,13 +120,16 @@ public class RemoteWorker {
if (workerOptions.workPath != null) {
Path workPath = getFileSystem().getPath(workerOptions.workPath);
FileSystemUtils.createDirectoryAndParents(workPath);
- execServer = new ExecutionServer(workPath);
+ watchServer = new WatcherServer(workPath);
+ execServer = new ExecutionServer();
} else {
+ watchServer = null;
execServer = null;
}
casServer = new CasServer();
bsServer = new ByteStreamServer();
actionCacheServer = new ActionCacheServer();
+ operationsCache = new ConcurrentHashMap<>();
}
public Server startServer() throws IOException {
@@ -131,6 +140,7 @@ public class RemoteWorker {
.addService(actionCacheServer);
if (execServer != null) {
b.addService(execServer);
+ b.addService(watchServer);
} else {
System.out.println("*** Execution disabled, only serving cache requests.");
}
@@ -402,7 +412,7 @@ public class RemoteWorker {
// How long to wait for the uid command.
private static final Duration uidTimeout = Durations.fromMicros(30);
- class ExecutionServer extends ExecutionImplBase {
+ class WatcherServer extends WatcherImplBase {
private final Path workPath;
//The name of the container image entry in the Platform proto
@@ -413,7 +423,7 @@ public class RemoteWorker {
private static final int LOCAL_EXEC_ERROR = -1;
- public ExecutionServer(Path workPath) {
+ public WatcherServer(Path workPath) {
this.workPath = workPath;
}
@@ -612,8 +622,18 @@ public class RemoteWorker {
}
@Override
- public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
- Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
+ public void watch(Request wr, StreamObserver<ChangeBatch> responseObserver) {
+ final String opName = wr.getTarget();
+ if (!operationsCache.containsKey(opName)) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.NOT_FOUND.getNumber())
+ .setMessage("Operation not found: " + opName)
+ .build()));
+ }
+ ExecuteRequest request = operationsCache.get(opName);
+ Path tempRoot = workPath.getRelative("build-" + opName);
try {
tempRoot.createDirectory();
if (LOG_FINER) {
@@ -626,17 +646,24 @@ public class RemoteWorker {
}
ActionResult result = execute(request.getAction(), tempRoot);
responseObserver.onNext(
- Operation.newBuilder()
- .setDone(true)
- .setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(result).build()))
+ ChangeBatch.newBuilder()
+ .addChanges(
+ Change.newBuilder()
+ .setState(Change.State.EXISTS)
+ .setData(
+ Any.pack(
+ Operation.newBuilder()
+ .setName(opName)
+ .setDone(true)
+ .setResponse(
+ Any.pack(
+ ExecuteResponse.newBuilder()
+ .setResult(result)
+ .build()))
+ .build()))
+ .build())
.build());
responseObserver.onCompleted();
- if (workerOptions.debug) {
- LOG.fine("Work completed.");
- LOG.warning("Preserving work directory " + tempRoot);
- } else {
- FileSystemUtils.deleteTree(tempRoot);
- }
} catch (CacheNotFoundException e) {
LOG.warning("Cache miss on " + e.getMissingDigest());
responseObserver.onError(notFoundError(e.getMissingDigest()));
@@ -657,10 +684,35 @@ public class RemoteWorker {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
+ } finally {
+ operationsCache.remove(opName);
+ if (workerOptions.debug) {
+ LOG.warning("Preserving work directory " + tempRoot);
+ } else {
+ try {
+ FileSystemUtils.deleteTree(tempRoot);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to delete tmp directory %s: %s: ", tempRoot, e));
+ }
+ }
}
}
}
+ class ExecutionServer extends ExecutionImplBase {
+ @Override
+ public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
+ // Defer the actual action execution to the Watcher.watch request.
+ // There are a lot of errors for which we could fail early here, but deferring them all
+ // is simpler.
+ final String opName = UUID.randomUUID().toString();
+ operationsCache.put(opName, request);
+ responseObserver.onNext(Operation.newBuilder().setName(opName).build());
+ responseObserver.onCompleted();
+ }
+ }
+
public static void main(String[] args) throws Exception {
OptionsParser parser =
OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class);