aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools
diff options
context:
space:
mode:
authorGravatar olaola <olaola@google.com>2017-06-14 16:11:32 +0200
committerGravatar Yun Peng <pcloudy@google.com>2017-06-14 16:21:11 +0200
commit9eea05d068a06ab642dd9d86d46ee5fa2e36b02e (patch)
treea71f15b29d072870f13bf22d232f1cceb9fdeabf /src/main/java/com/google/devtools
parent4d302735b0e5c8284946ba35e58f2bad9bca70ca (diff)
Switching to Watcher API instead of wait_for_completion, in preparation for
deprecating the wait_for_completion field. Note on errors: in the RemoteWorker, I currently handle all errors as onError of the watch call. Other options are: pass them as the operation error field, and pass some of them as the onError of the execute call. For now, I'm just using the simplest option; the Bazel client is ready to handle all possible options. RELNOTES: none PiperOrigin-RevId: 158974207
Diffstat (limited to 'src/main/java/com/google/devtools')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/BUILD3
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java93
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java1
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java1
4 files changed, 81 insertions, 17 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 9951598650..cbe44c0383 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -41,6 +41,9 @@ java_library(
"@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
"@googleapis//:google_longrunning_operations_java_proto",
"@googleapis//:google_rpc_error_details_java_proto",
+ "@googleapis//:google_rpc_status_java_proto",
+ "@googleapis//:google_watch_v1_java_grpc",
+ "@googleapis//:google_watch_v1_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index 1950a724c3..815b9237d0 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -14,6 +14,8 @@
package com.google.devtools.build.lib.remote;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
@@ -23,9 +25,17 @@ import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockin
import com.google.longrunning.Operation;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Durations;
+import com.google.rpc.Status;
+import com.google.watcher.v1.Change;
+import com.google.watcher.v1.ChangeBatch;
+import com.google.watcher.v1.Request;
+import com.google.watcher.v1.WatcherGrpc;
+import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub;
import io.grpc.Channel;
import io.grpc.protobuf.StatusProto;
+import java.util.Iterator;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */
@ThreadSafe
@@ -34,6 +44,18 @@ public class GrpcRemoteExecutor {
private final ChannelOptions channelOptions;
private final Channel channel;
+ // Reuse the gRPC stub.
+ private final Supplier<ExecutionBlockingStub> execBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ExecutionBlockingStub>() {
+ @Override
+ public ExecutionBlockingStub get() {
+ return ExecutionGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
public static boolean isRemoteExecutionOptions(RemoteOptions options) {
return options.remoteExecutor != null;
}
@@ -44,25 +66,66 @@ public class GrpcRemoteExecutor {
this.channel = channel;
}
+ private @Nullable ExecuteResponse getOperationResponse(Operation op) {
+ if (op.getResultCase() == Operation.ResultCase.ERROR) {
+ throw StatusProto.toStatusRuntimeException(op.getError());
+ }
+ if (op.getDone()) {
+ Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET);
+ try {
+ return op.getResponse().unpack(ExecuteResponse.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+
public ExecuteResponse executeRemotely(ExecuteRequest request) {
- // TODO(olaola): handle longrunning Operations by using the Watcher API to wait for results.
- // For now, only support actions with wait_for_completion = true.
- Preconditions.checkArgument(request.getWaitForCompletion());
+ Operation op = execBlockingStub.get().execute(request);
+ ExecuteResponse resp = getOperationResponse(op);
+ if (resp != null) {
+ return resp;
+ }
int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout());
- ExecutionBlockingStub stub =
- ExecutionGrpc.newBlockingStub(channel)
+ WatcherBlockingStub stub =
+ WatcherGrpc.newBlockingStub(channel)
.withCallCredentials(channelOptions.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS);
- Operation op = stub.execute(request);
- Preconditions.checkState(op.getDone());
- Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET);
- if (op.getResultCase() == Operation.ResultCase.ERROR) {
- throw StatusProto.toStatusRuntimeException(op.getError());
- }
- try {
- return op.getResponse().unpack(ExecuteResponse.class);
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
+ Request wr = Request.newBuilder().setTarget(op.getName()).build();
+ Iterator<ChangeBatch> replies = stub.watch(wr);
+ while (replies.hasNext()) {
+ ChangeBatch cb = replies.next();
+ for (Change ch : cb.getChangesList()) {
+ switch (ch.getState()) {
+ case INITIAL_STATE_SKIPPED:
+ continue;
+ case ERROR:
+ try {
+ throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ case DOES_NOT_EXIST:
+ throw new RuntimeException(
+ String.format("Operation %s lost on the remote server.", op.getName()));
+ case EXISTS:
+ try {
+ op = ch.getData().unpack(Operation.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ resp = getOperationResponse(op);
+ if (resp != null) {
+ return resp;
+ }
+ continue;
+ default:
+ throw new RuntimeException(String.format("Illegal change state: %s", ch.getState()));
+ }
+ }
}
+ throw new RuntimeException(
+ String.format("Watch request for %s terminated with no result.", op.getName()));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 383667437d..412031bb1b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -115,7 +115,6 @@ final class RemoteSpawnRunner implements SpawnRunner {
ExecuteRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setAction(action)
- .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
.setSkipCacheLookup(!options.remoteAcceptCached);
result = executor.executeRemotely(request.build()).getResult();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index d0371dfdb2..98af3056de 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -317,7 +317,6 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
ExecuteRequest.newBuilder()
.setInstanceName(remoteOptions.remoteInstanceName)
.setAction(action)
- .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
.setSkipCacheLookup(!acceptCachedResult);
ExecuteResponse reply = workExecutor.executeRemotely(request.build());