aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/remote
diff options
context:
space:
mode:
authorGravatar olaola <olaola@google.com>2017-07-04 10:12:13 -0400
committerGravatar John Cater <jcater@google.com>2017-07-05 10:58:16 -0400
commitaff0c671cf0aaca4da2230abc69179bfd789dede (patch)
tree93f49b51a4bb55faa56d8e8b1af23b3b68abce38 /src/main/java/com/google/devtools/build/lib/remote
parentb259cc4491a6d43a3fe025d9de401f724373fca8 (diff)
Stop reusing gRPC stubs, that causes the deadline to be per-build rather than per-call.
PiperOrigin-RevId: 160892006
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java71
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java36
2 files changed, 43 insertions, 64 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index d077cbcf8f..036fa93f4f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -22,8 +22,6 @@ import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.actions.ActionInput;
@@ -79,11 +77,6 @@ public class GrpcRemoteCache implements RemoteActionCache {
private final ChannelOptions channelOptions;
private final Channel channel;
private final Retrier retrier;
- // All gRPC stubs are reused.
- private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub;
- private final Supplier<ByteStreamBlockingStub> bsBlockingStub;
- private final Supplier<ByteStreamStub> bsStub;
- private final Supplier<ActionCacheBlockingStub> acBlockingStub;
@VisibleForTesting
public GrpcRemoteCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
@@ -91,30 +84,30 @@ public class GrpcRemoteCache implements RemoteActionCache {
this.channelOptions = channelOptions;
this.channel = channel;
this.retrier = new Retrier(options);
- casBlockingStub =
- Suppliers.memoize(
- () ->
- ContentAddressableStorageGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
- bsBlockingStub =
- Suppliers.memoize(
- () ->
- ByteStreamGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
- bsStub =
- Suppliers.memoize(
- () ->
- ByteStreamGrpc.newStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
- acBlockingStub =
- Suppliers.memoize(
- () ->
- ActionCacheGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
+ }
+
+ private ContentAddressableStorageBlockingStub casBlockingStub() {
+ return ContentAddressableStorageGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+
+ private ByteStreamBlockingStub bsBlockingStub() {
+ return ByteStreamGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+
+ private ByteStreamStub bsStub() {
+ return ByteStreamGrpc.newStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+
+ private ActionCacheBlockingStub acBlockingStub() {
+ return ActionCacheGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
@Override
@@ -134,7 +127,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
return ImmutableSet.of();
}
FindMissingBlobsResponse response =
- retrier.execute(() -> casBlockingStub.get().findMissingBlobs(request.build()));
+ retrier.execute(() -> casBlockingStub().findMissingBlobs(request.build()));
return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}
@@ -169,7 +162,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
retrier.execute(
() -> {
BatchUpdateBlobsResponse response =
- casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build());
+ casBlockingStub().batchUpdateBlobs(treeBlobRequest.build());
for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) {
if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) {
throw StatusProto.toStatusRuntimeException(r.getStatus());
@@ -264,8 +257,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
}
resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes();
try {
- return bsBlockingStub
- .get()
+ return bsBlockingStub()
.read(ReadRequest.newBuilder().setResourceName(resourceName).build());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
@@ -283,8 +275,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
try {
retrier.execute(
() ->
- acBlockingStub
- .get()
+ acBlockingStub()
.updateActionResult(
UpdateActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
@@ -398,8 +389,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
resourceName, UUID.randomUUID(), digest.getHash(), digest.getSizeBytes()));
// The batches execute simultaneously.
requestObserver =
- bsStub
- .get()
+ bsStub()
.write(
new StreamObserver<WriteResponse>() {
private long bytesLeft = digest.getSizeBytes();
@@ -503,8 +493,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
try {
return retrier.execute(
() ->
- acBlockingStub
- .get()
+ acBlockingStub()
.getActionResult(
GetActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index 51bc0b6fa7..e460c3ea7c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -14,8 +14,6 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Preconditions;
@@ -48,10 +46,6 @@ public class GrpcRemoteExecutor {
private final Channel channel;
private final Retrier retrier;
- // Reuse the gRPC stubs.
- private final Supplier<ExecutionBlockingStub> execBlockingStub;
- private final Supplier<WatcherBlockingStub> watcherBlockingStub;
-
public static boolean isRemoteExecutionOptions(RemoteOptions options) {
return options.remoteExecutor != null;
}
@@ -61,21 +55,17 @@ public class GrpcRemoteExecutor {
this.channelOptions = channelOptions;
this.channel = channel;
this.retrier = new Retrier(options);
- execBlockingStub =
- Suppliers.memoize(
- () ->
- ExecutionGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
- // Do not set a deadline on this call, because it is hard to estimate in
- // advance how much time we should give the server to execute an action
- // remotely (scheduling, queing, optional retries, etc.)
- // It is the server's responsibility to respect the Action timeout field.
- watcherBlockingStub =
- Suppliers.memoize(
- () ->
- WatcherGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials()));
+ }
+
+ private ExecutionBlockingStub execBlockingStub() {
+ return ExecutionGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+
+ private WatcherBlockingStub watcherBlockingStub() {
+ return WatcherGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials());
}
private @Nullable ExecuteResponse getOperationResponse(Operation op)
@@ -103,7 +93,7 @@ public class GrpcRemoteExecutor {
public ExecuteResponse executeRemotely(ExecuteRequest request)
throws InterruptedException, IOException, UserExecException {
- Operation op = retrier.execute(() -> execBlockingStub.get().execute(request));
+ Operation op = retrier.execute(() -> execBlockingStub().execute(request));
ExecuteResponse resp = getOperationResponse(op);
if (resp != null) {
return resp;
@@ -111,7 +101,7 @@ public class GrpcRemoteExecutor {
Request wr = Request.newBuilder().setTarget(op.getName()).build();
return retrier.execute(
() -> {
- Iterator<ChangeBatch> replies = watcherBlockingStub.get().watch(wr);
+ Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
while (replies.hasNext()) {
ChangeBatch cb = replies.next();
for (Change ch : cb.getChangesList()) {