diff options
author | 2017-06-14 16:11:32 +0200 | |
---|---|---|
committer | 2017-06-14 16:21:11 +0200 | |
commit | 9eea05d068a06ab642dd9d86d46ee5fa2e36b02e (patch) | |
tree | a71f15b29d072870f13bf22d232f1cceb9fdeabf /src/main/java/com/google/devtools | |
parent | 4d302735b0e5c8284946ba35e58f2bad9bca70ca (diff) |
Switching to Watcher API instead of wait_for_completion, in preparation for
deprecating the wait_for_completion field.
Note on errors: in the RemoteWorker, I currently handle all errors as onError of the watch call. Other options are: pass them as the operation error field, and pass some of them as the onError of the execute call. For now, I'm just using the simplest option; the Bazel client is ready to handle all possible options.
RELNOTES: none
PiperOrigin-RevId: 158974207
Diffstat (limited to 'src/main/java/com/google/devtools')
4 files changed, 81 insertions, 17 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 9951598650..cbe44c0383 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -41,6 +41,9 @@ java_library( "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto", "@googleapis//:google_longrunning_operations_java_proto", "@googleapis//:google_rpc_error_details_java_proto", + "@googleapis//:google_rpc_status_java_proto", + "@googleapis//:google_watch_v1_java_grpc", + "@googleapis//:google_watch_v1_java_proto", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index 1950a724c3..815b9237d0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -14,6 +14,8 @@ package com.google.devtools.build.lib.remote; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.remoteexecution.v1test.ExecuteRequest; @@ -23,9 +25,17 @@ import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockin import com.google.longrunning.Operation; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.Durations; +import com.google.rpc.Status; +import com.google.watcher.v1.Change; +import com.google.watcher.v1.ChangeBatch; +import com.google.watcher.v1.Request; +import com.google.watcher.v1.WatcherGrpc; +import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub; import io.grpc.Channel; import io.grpc.protobuf.StatusProto; +import java.util.Iterator; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; /** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ @ThreadSafe @@ -34,6 +44,18 @@ public class GrpcRemoteExecutor { private final ChannelOptions channelOptions; private final Channel channel; + // Reuse the gRPC stub. + private final Supplier<ExecutionBlockingStub> execBlockingStub = + Suppliers.memoize( + new Supplier<ExecutionBlockingStub>() { + @Override + public ExecutionBlockingStub get() { + return ExecutionGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + }); + public static boolean isRemoteExecutionOptions(RemoteOptions options) { return options.remoteExecutor != null; } @@ -44,25 +66,66 @@ public class GrpcRemoteExecutor { this.channel = channel; } + private @Nullable ExecuteResponse getOperationResponse(Operation op) { + if (op.getResultCase() == Operation.ResultCase.ERROR) { + throw StatusProto.toStatusRuntimeException(op.getError()); + } + if (op.getDone()) { + Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET); + try { + return op.getResponse().unpack(ExecuteResponse.class); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + return null; + } + public ExecuteResponse executeRemotely(ExecuteRequest request) { - // TODO(olaola): handle longrunning Operations by using the Watcher API to wait for results. - // For now, only support actions with wait_for_completion = true. - Preconditions.checkArgument(request.getWaitForCompletion()); + Operation op = execBlockingStub.get().execute(request); + ExecuteResponse resp = getOperationResponse(op); + if (resp != null) { + return resp; + } int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout()); - ExecutionBlockingStub stub = - ExecutionGrpc.newBlockingStub(channel) + WatcherBlockingStub stub = + WatcherGrpc.newBlockingStub(channel) .withCallCredentials(channelOptions.getCallCredentials()) .withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS); - Operation op = stub.execute(request); - Preconditions.checkState(op.getDone()); - Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET); - if (op.getResultCase() == Operation.ResultCase.ERROR) { - throw StatusProto.toStatusRuntimeException(op.getError()); - } - try { - return op.getResponse().unpack(ExecuteResponse.class); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); + Request wr = Request.newBuilder().setTarget(op.getName()).build(); + Iterator<ChangeBatch> replies = stub.watch(wr); + while (replies.hasNext()) { + ChangeBatch cb = replies.next(); + for (Change ch : cb.getChangesList()) { + switch (ch.getState()) { + case INITIAL_STATE_SKIPPED: + continue; + case ERROR: + try { + throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + case DOES_NOT_EXIST: + throw new RuntimeException( + String.format("Operation %s lost on the remote server.", op.getName())); + case EXISTS: + try { + op = ch.getData().unpack(Operation.class); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + resp = getOperationResponse(op); + if (resp != null) { + return resp; + } + continue; + default: + throw new RuntimeException(String.format("Illegal change state: %s", ch.getState())); + } + } } + throw new RuntimeException( + String.format("Watch request for %s terminated with no result.", op.getName())); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index 383667437d..412031bb1b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -115,7 +115,6 @@ final class RemoteSpawnRunner implements SpawnRunner { ExecuteRequest.newBuilder() .setInstanceName(options.remoteInstanceName) .setAction(action) - .setWaitForCompletion(true) .setTotalInputFileCount(inputMap.size()) .setSkipCacheLookup(!options.remoteAcceptCached); result = executor.executeRemotely(request.build()).getResult(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index d0371dfdb2..98af3056de 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -317,7 +317,6 @@ final class RemoteSpawnStrategy implements SpawnActionContext { ExecuteRequest.newBuilder() .setInstanceName(remoteOptions.remoteInstanceName) .setAction(action) - .setWaitForCompletion(true) .setTotalInputFileCount(inputMap.size()) .setSkipCacheLookup(!acceptCachedResult); ExecuteResponse reply = workExecutor.executeRemotely(request.build()); |