diff options
author | 2017-06-30 05:07:13 +0200 | |
---|---|---|
committer | 2017-06-30 13:01:12 +0200 | |
commit | fdd66ef1b1dbee1663676cdf4b36ddbe139a35bf (patch) | |
tree | f4abdcf4adc85e1ecb56167bba8eade94cbcd376 /src/test/java/com/google/devtools/build | |
parent | 5f00cd2b1cde682f77a47439e9dc631349992f9e (diff) |
Implement retry logic for the gRPC calls in remote execution and caching. The
retry strategy may need tuning.
Other behavior changes: swallowing gRPC CANCELLED errors when the thread is interrupted, as these are expected and just make debugging difficult. Also, distinguishing between the gRPC DEADLINE_EXCEEDED caused by the actual command timing out on the server vs. other causes (the former should not be retriable, while the latter should retry).
TESTED=unit tests, remote worker on Bazel
PiperOrigin-RevId: 160605830
Diffstat (limited to 'src/test/java/com/google/devtools/build')
4 files changed, 282 insertions, 42 deletions
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java index 4de1498207..e418a29af6 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java +++ b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java @@ -21,11 +21,16 @@ import com.google.bytestream.ByteStreamProto.ReadResponse; import com.google.common.collect.ImmutableMap; import com.google.devtools.remoteexecution.v1test.Digest; import com.google.protobuf.ByteString; +import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.util.HashMap; import java.util.Map; class FakeImmutableCacheByteStreamImpl extends ByteStreamImplBase { private final Map<ReadRequest, ReadResponse> cannedReplies; + private final Map<ReadRequest, Integer> numErrors; + // Start returning the correct response after this number of errors is reached. + private static final int MAX_ERRORS = 3; public FakeImmutableCacheByteStreamImpl(Map<Digest, String> contents) { ImmutableMap.Builder<ReadRequest, ReadResponse> b = ImmutableMap.builder(); @@ -37,6 +42,7 @@ class FakeImmutableCacheByteStreamImpl extends ByteStreamImplBase { ReadResponse.newBuilder().setData(ByteString.copyFromUtf8(e.getValue())).build()); } cannedReplies = b.build(); + numErrors = new HashMap<>(); } public FakeImmutableCacheByteStreamImpl(Digest digest, String contents) { @@ -50,7 +56,13 @@ class FakeImmutableCacheByteStreamImpl extends ByteStreamImplBase { @Override public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { assertThat(cannedReplies.containsKey(request)).isTrue(); - responseObserver.onNext(cannedReplies.get(request)); - responseObserver.onCompleted(); + int errCount = numErrors.getOrDefault(request, 0); + if (errCount < MAX_ERRORS) { + numErrors.put(request, errCount + 1); + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); // Retriable error. + } else { + responseObserver.onNext(cannedReplies.get(request)); + responseObserver.onCompleted(); + } } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java index 52752f478d..cbfecca1d2 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java @@ -28,6 +28,7 @@ import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.testutil.Scratch; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystem; @@ -36,11 +37,14 @@ import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; +import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase; import com.google.devtools.remoteexecution.v1test.ActionResult; import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; import com.google.devtools.remoteexecution.v1test.Digest; import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; +import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; +import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest; import com.google.protobuf.ByteString; import io.grpc.CallOptions; import io.grpc.Channel; @@ -49,6 +53,7 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.MethodDescriptor; import io.grpc.Server; +import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -210,17 +215,23 @@ public class GrpcActionCacheTest { } @Test - public void testUploadBlobCacheHit() throws Exception { + public void testUploadBlobCacheHitWithRetries() throws Exception { final GrpcActionCache client = newClient(); final Digest digest = Digests.computeDigestUtf8("abcdefg"); serviceRegistry.addService( new ContentAddressableStorageImplBase() { + private int numErrors = 4; + @Override public void findMissingBlobs( FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) { - responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); - responseObserver.onCompleted(); + if (numErrors-- <= 0) { + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + } } }); assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest); @@ -337,6 +348,30 @@ public class GrpcActionCacheTest { }; } + private Answer<StreamObserver<WriteRequest>> blobChunkedWriteAnswerError() { + return new Answer<StreamObserver<WriteRequest>>() { + @Override + @SuppressWarnings("unchecked") + public StreamObserver<WriteRequest> answer(final InvocationOnMock invocation) { + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) { + ((StreamObserver<WriteResponse>) invocation.getArguments()[0]) + .onError(Status.UNAVAILABLE.asRuntimeException()); + } + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) { + fail("An unexpected client-side error occurred: " + t); + } + }; + } + }; + } + @Test public void testUploadBlobMultipleChunks() throws Exception { final Digest digest = Digests.computeDigestUtf8("abcdef"); @@ -364,7 +399,7 @@ public class GrpcActionCacheTest { } @Test - public void testUploadAllResultsCacheHits() throws Exception { + public void testUploadCacheHits() throws Exception { final GrpcActionCache client = newClient(); final Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); @@ -404,51 +439,89 @@ public class GrpcActionCacheTest { } @Test - public void testUploadAllResultsCacheMisses() throws Exception { + public void testUploadCacheMissesWithRetries() throws Exception { final GrpcActionCache client = newClient(); final Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); final Digest barDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x"); + final Digest bazDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("baz"), "z"); final Path fooFile = execRoot.getRelative("a/foo"); final Path barFile = execRoot.getRelative("bar"); + final Path bazFile = execRoot.getRelative("baz"); + ActionKey actionKey = Digests.unsafeActionKeyFromDigest(fooDigest); // Could be any key. barFile.setExecutable(true); serviceRegistry.addService( new ContentAddressableStorageImplBase() { + private int numErrors = 4; + @Override public void findMissingBlobs( FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) { + if (numErrors-- <= 0) { + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + } + } + }); + ActionResult.Builder rb = ActionResult.newBuilder(); + rb.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); + rb.addOutputFilesBuilder().setPath("bar").setDigest(barDigest).setIsExecutable(true); + rb.addOutputFilesBuilder().setPath("baz").setDigest(bazDigest); + ActionResult result = rb.build(); + serviceRegistry.addService( + new ActionCacheImplBase() { + private int numErrors = 4; + + @Override + public void updateActionResult( + UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) { assertThat(request) .isEqualTo( - FindMissingBlobsRequest.newBuilder() - .addBlobDigests(fooDigest) - .addBlobDigests(barDigest) + UpdateActionResultRequest.newBuilder() + .setActionDigest(fooDigest) + .setActionResult(result) .build()); - // Both are missing. - responseObserver.onNext( - FindMissingBlobsResponse.newBuilder() - .addMissingBlobDigests(fooDigest) - .addMissingBlobDigests(barDigest) - .build()); - responseObserver.onCompleted(); + if (numErrors-- <= 0) { + responseObserver.onNext(result); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + } } }); ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); serviceRegistry.addService(mockByteStreamImpl); when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject())) - .thenAnswer(blobChunkedWriteAnswer("xyz", 3)) - .thenAnswer(blobChunkedWriteAnswer("x", 1)); + .thenAnswer(blobChunkedWriteAnswerError()) // Error out for foo. + .thenAnswer(blobChunkedWriteAnswer("x", 1)) // Upload bar successfully. + .thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz. + .thenAnswer(blobChunkedWriteAnswer("xyz", 3)) // Retry foo successfully. + .thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz again. + .thenAnswer(blobChunkedWriteAnswer("z", 1)); // Retry baz successfully. + + client.upload(actionKey, execRoot, ImmutableList.<Path>of(fooFile, barFile, bazFile), outErr); + } - ActionResult.Builder result = ActionResult.newBuilder(); - client.upload(execRoot, ImmutableList.<Path>of(fooFile, barFile), outErr, result); - ActionResult.Builder expectedResult = ActionResult.newBuilder(); - expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); - expectedResult - .addOutputFilesBuilder() - .setPath("bar") - .setDigest(barDigest) - .setIsExecutable(true); - assertThat(result.build()).isEqualTo(expectedResult.build()); + @Test + public void testGetCachedActionResultWithRetries() throws Exception { + final GrpcActionCache client = newClient(); + ActionKey actionKey = Digests.unsafeActionKeyFromDigest(Digests.computeDigestUtf8("key")); + serviceRegistry.addService( + new ActionCacheImplBase() { + private int numErrors = 4; + + @Override + public void getActionResult( + GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { + responseObserver.onError( + (numErrors-- <= 0 ? Status.NOT_FOUND : Status.UNAVAILABLE).asRuntimeException()); + } + }); + assertThat(client.getCachedActionResult(actionKey)).isNull(); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java index 1183a03488..1a34a663d1 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -58,17 +58,15 @@ import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; import com.google.longrunning.Operation; import com.google.protobuf.Any; import com.google.protobuf.ByteString; -import com.google.rpc.Code; -import com.google.rpc.Status; import com.google.watcher.v1.Change; import com.google.watcher.v1.ChangeBatch; import com.google.watcher.v1.Request; import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; import io.grpc.Channel; import io.grpc.Server; +import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; import java.io.IOException; @@ -285,6 +283,30 @@ public class GrpcRemoteExecutionClientTest { }; } + private Answer<StreamObserver<WriteRequest>> blobWriteAnswerError() { + return new Answer<StreamObserver<WriteRequest>>() { + @Override + @SuppressWarnings("unchecked") + public StreamObserver<WriteRequest> answer(final InvocationOnMock invocation) { + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) { + ((StreamObserver<WriteResponse>) invocation.getArguments()[0]) + .onError(Status.UNAVAILABLE.asRuntimeException()); + } + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) { + fail("An unexpected client-side error occurred: " + t); + } + }; + } + }; + } + @Test public void remotelyExecute() throws Exception { serviceRegistry.addService( @@ -292,9 +314,7 @@ public class GrpcRemoteExecutionClientTest { @Override public void getActionResult( GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { - responseObserver.onError( - StatusProto.toStatusRuntimeException( - Status.newBuilder().setCode(Code.NOT_FOUND.getNumber()).build())); + responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); } }); final ActionResult actionResult = @@ -359,15 +379,16 @@ public class GrpcRemoteExecutionClientTest { } @Test - public void remotelyExecuteWithWatch() throws Exception { + public void remotelyExecuteWithWatchAndRetries() throws Exception { serviceRegistry.addService( new ActionCacheImplBase() { + private int numErrors = 4; + @Override public void getActionResult( GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { responseObserver.onError( - StatusProto.toStatusRuntimeException( - Status.newBuilder().setCode(Code.NOT_FOUND.getNumber()).build())); + (numErrors-- <= 0 ? Status.NOT_FOUND : Status.UNAVAILABLE).asRuntimeException()); } }); final ActionResult actionResult = @@ -378,17 +399,29 @@ public class GrpcRemoteExecutionClientTest { final String opName = "operations/xyz"; serviceRegistry.addService( new ExecutionImplBase() { + private int numErrors = 4; + @Override public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { - responseObserver.onNext(Operation.newBuilder().setName(opName).build()); - responseObserver.onCompleted(); + if (numErrors-- <= 0) { + responseObserver.onNext(Operation.newBuilder().setName(opName).build()); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + } } }); serviceRegistry.addService( new WatcherImplBase() { + private int numErrors = 4; + @Override public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) { assertThat(request.getTarget()).isEqualTo(opName); + if (numErrors-- > 0) { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + return; + } // Some optional initial state. responseObserver.onNext( ChangeBatch.newBuilder() @@ -443,10 +476,17 @@ public class GrpcRemoteExecutionClientTest { final Digest cmdDigest = Digests.computeDigest(command); serviceRegistry.addService( new ContentAddressableStorageImplBase() { + private int numErrors = 4; + @Override public void findMissingBlobs( FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) { + if (numErrors-- > 0) { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + return; + } + FindMissingBlobsResponse.Builder b = FindMissingBlobsResponse.newBuilder(); final Set<Digest> requested = ImmutableSet.copyOf(request.getBlobDigestsList()); if (requested.contains(cmdDigest)) { @@ -463,8 +503,11 @@ public class GrpcRemoteExecutionClientTest { ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject())) - .thenAnswer(blobWriteAnswer(command.toByteArray())) - .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); + .thenAnswer(blobWriteAnswerError()) // Error on command upload. + .thenAnswer(blobWriteAnswer(command.toByteArray())) // Upload command successfully. + .thenAnswer(blobWriteAnswerError()) // Error on the input file. + .thenAnswer(blobWriteAnswerError()) // Error on the input file again. + .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); // Upload input file successfully. serviceRegistry.addService(mockByteStreamImpl); SpawnResult result = client.exec(simpleSpawn, simplePolicy); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java new file mode 100644 index 0000000000..ac54c26da7 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java @@ -0,0 +1,112 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Range; +import io.grpc.Status; +import java.time.Duration; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Tests for {@link Retrier}. */ +@RunWith(JUnit4.class) +public class RetrierTest { + + interface Foo { + public String foo(); + } + + private Foo fooMock; + + @Before + public void setUp() { + fooMock = Mockito.mock(Foo.class); + } + + @Test + public void testExponentialBackoff() throws Exception { + Retrier.Backoff backoff = + Retrier.Backoff.exponential( + Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0, 6) + .get(); + assertThat(backoff.nextDelayMillis()).isEqualTo(1000); + assertThat(backoff.nextDelayMillis()).isEqualTo(2000); + assertThat(backoff.nextDelayMillis()).isEqualTo(4000); + assertThat(backoff.nextDelayMillis()).isEqualTo(8000); + assertThat(backoff.nextDelayMillis()).isEqualTo(10000); + assertThat(backoff.nextDelayMillis()).isEqualTo(10000); + assertThat(backoff.nextDelayMillis()).isEqualTo(Retrier.Backoff.STOP); + } + + @Test + public void testExponentialBackoffJittered() throws Exception { + Retrier.Backoff backoff = + Retrier.Backoff.exponential( + Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0.1, 6) + .get(); + assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(900L, 1100L)); + assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(1800L, 2200L)); + assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(3600L, 4400L)); + assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(7200L, 8800L)); + assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(9000L, 11000L)); + assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(9000L, 11000L)); + assertThat(backoff.nextDelayMillis()).isEqualTo(Retrier.Backoff.STOP); + } + + void assertThrows(Retrier retrier, int attempts) throws InterruptedException { + try { + retrier.execute(() -> fooMock.foo()); + fail(); + } catch (RetryException e) { + assertThat(e.getAttempts()).isEqualTo(attempts); + } + } + + @Test + public void testNoRetries() throws Exception { + Retrier retrier = Mockito.spy(Retrier.NO_RETRIES); + Mockito.doNothing().when(retrier).sleep(Mockito.anyLong()); + when(fooMock.foo()) + .thenReturn("bla") + .thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException()); + assertThat(retrier.execute(() -> fooMock.foo())).isEqualTo("bla"); + assertThrows(retrier, 1); + Mockito.verify(fooMock, Mockito.times(2)).foo(); + } + + @Test + public void testRepeatedRetriesReset() throws Exception { + Retrier retrier = + Mockito.spy( + new Retrier( + Retrier.Backoff.exponential( + Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0, 2), + Retrier.RETRY_ALL)); + Mockito.doNothing().when(retrier).sleep(Mockito.anyLong()); + when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException()); + assertThrows(retrier, 3); + assertThrows(retrier, 3); + Mockito.verify(retrier, Mockito.times(2)).sleep(1000); + Mockito.verify(retrier, Mockito.times(2)).sleep(2000); + Mockito.verify(fooMock, Mockito.times(6)).foo(); + } +} + |