diff options
author | 2017-07-04 10:12:13 -0400 | |
---|---|---|
committer | 2017-07-05 10:58:16 -0400 | |
commit | aff0c671cf0aaca4da2230abc69179bfd789dede (patch) | |
tree | 93f49b51a4bb55faa56d8e8b1af23b3b68abce38 /src/main/java/com/google/devtools/build/lib | |
parent | b259cc4491a6d43a3fe025d9de401f724373fca8 (diff) |
Stop reusing gRPC stubs, that causes the deadline to be per-build rather than per-call.
PiperOrigin-RevId: 160892006
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java | 71 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java | 36 |
2 files changed, 43 insertions, 64 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java index d077cbcf8f..036fa93f4f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java @@ -22,8 +22,6 @@ import com.google.bytestream.ByteStreamProto.ReadResponse; import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.devtools.build.lib.actions.ActionInput; @@ -79,11 +77,6 @@ public class GrpcRemoteCache implements RemoteActionCache { private final ChannelOptions channelOptions; private final Channel channel; private final Retrier retrier; - // All gRPC stubs are reused. - private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub; - private final Supplier<ByteStreamBlockingStub> bsBlockingStub; - private final Supplier<ByteStreamStub> bsStub; - private final Supplier<ActionCacheBlockingStub> acBlockingStub; @VisibleForTesting public GrpcRemoteCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) { @@ -91,30 +84,30 @@ public class GrpcRemoteCache implements RemoteActionCache { this.channelOptions = channelOptions; this.channel = channel; this.retrier = new Retrier(options); - casBlockingStub = - Suppliers.memoize( - () -> - ContentAddressableStorageGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); - bsBlockingStub = - Suppliers.memoize( - () -> - ByteStreamGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); - bsStub = - Suppliers.memoize( - () -> - ByteStreamGrpc.newStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); - acBlockingStub = - Suppliers.memoize( - () -> - ActionCacheGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); + } + + private ContentAddressableStorageBlockingStub casBlockingStub() { + return ContentAddressableStorageGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + + private ByteStreamBlockingStub bsBlockingStub() { + return ByteStreamGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + + private ByteStreamStub bsStub() { + return ByteStreamGrpc.newStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + + private ActionCacheBlockingStub acBlockingStub() { + return ActionCacheGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); } @Override @@ -134,7 +127,7 @@ public class GrpcRemoteCache implements RemoteActionCache { return ImmutableSet.of(); } FindMissingBlobsResponse response = - retrier.execute(() -> casBlockingStub.get().findMissingBlobs(request.build())); + retrier.execute(() -> casBlockingStub().findMissingBlobs(request.build())); return ImmutableSet.copyOf(response.getMissingBlobDigestsList()); } @@ -169,7 +162,7 @@ public class GrpcRemoteCache implements RemoteActionCache { retrier.execute( () -> { BatchUpdateBlobsResponse response = - casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build()); + casBlockingStub().batchUpdateBlobs(treeBlobRequest.build()); for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) { if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) { throw StatusProto.toStatusRuntimeException(r.getStatus()); @@ -264,8 +257,7 @@ public class GrpcRemoteCache implements RemoteActionCache { } resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes(); try { - return bsBlockingStub - .get() + return bsBlockingStub() .read(ReadRequest.newBuilder().setResourceName(resourceName).build()); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.NOT_FOUND) { @@ -283,8 +275,7 @@ public class GrpcRemoteCache implements RemoteActionCache { try { retrier.execute( () -> - acBlockingStub - .get() + acBlockingStub() .updateActionResult( UpdateActionResultRequest.newBuilder() .setInstanceName(options.remoteInstanceName) @@ -398,8 +389,7 @@ public class GrpcRemoteCache implements RemoteActionCache { resourceName, UUID.randomUUID(), digest.getHash(), digest.getSizeBytes())); // The batches execute simultaneously. requestObserver = - bsStub - .get() + bsStub() .write( new StreamObserver<WriteResponse>() { private long bytesLeft = digest.getSizeBytes(); @@ -503,8 +493,7 @@ public class GrpcRemoteCache implements RemoteActionCache { try { return retrier.execute( () -> - acBlockingStub - .get() + acBlockingStub() .getActionResult( GetActionResultRequest.newBuilder() .setInstanceName(options.remoteInstanceName) diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index 51bc0b6fa7..e460c3ea7c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -14,8 +14,6 @@ package com.google.devtools.build.lib.remote; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.util.Preconditions; @@ -48,10 +46,6 @@ public class GrpcRemoteExecutor { private final Channel channel; private final Retrier retrier; - // Reuse the gRPC stubs. - private final Supplier<ExecutionBlockingStub> execBlockingStub; - private final Supplier<WatcherBlockingStub> watcherBlockingStub; - public static boolean isRemoteExecutionOptions(RemoteOptions options) { return options.remoteExecutor != null; } @@ -61,21 +55,17 @@ public class GrpcRemoteExecutor { this.channelOptions = channelOptions; this.channel = channel; this.retrier = new Retrier(options); - execBlockingStub = - Suppliers.memoize( - () -> - ExecutionGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); - // Do not set a deadline on this call, because it is hard to estimate in - // advance how much time we should give the server to execute an action - // remotely (scheduling, queing, optional retries, etc.) - // It is the server's responsibility to respect the Action timeout field. - watcherBlockingStub = - Suppliers.memoize( - () -> - WatcherGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials())); + } + + private ExecutionBlockingStub execBlockingStub() { + return ExecutionGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + + private WatcherBlockingStub watcherBlockingStub() { + return WatcherGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()); } private @Nullable ExecuteResponse getOperationResponse(Operation op) @@ -103,7 +93,7 @@ public class GrpcRemoteExecutor { public ExecuteResponse executeRemotely(ExecuteRequest request) throws InterruptedException, IOException, UserExecException { - Operation op = retrier.execute(() -> execBlockingStub.get().execute(request)); + Operation op = retrier.execute(() -> execBlockingStub().execute(request)); ExecuteResponse resp = getOperationResponse(op); if (resp != null) { return resp; @@ -111,7 +101,7 @@ public class GrpcRemoteExecutor { Request wr = Request.newBuilder().setTarget(op.getName()).build(); return retrier.execute( () -> { - Iterator<ChangeBatch> replies = watcherBlockingStub.get().watch(wr); + Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr); while (replies.hasNext()) { ChangeBatch cb = replies.next(); for (Change ch : cb.getChangesList()) { |