aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/test/java
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2018-06-02 14:13:43 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-06-02 14:15:06 -0700
commitff008f445905bf6f4601a368782b620f7899d322 (patch)
tree7fbfe2ef3d3e794680d12ee42f5d4e0016b6b736 /src/test/java
parentaaf11e91a02a2f42d8bf26cce76df941c8afc8e2 (diff)
remote: concurrent blob downloads. Fixes #5215
This change introduces concurrent downloads of action outputs for remote caching/execution. So far, for an action we would download one output after the other which isn't as bad as it sounds as we would typically run dozens or hundreds of actions in parallel. However, for actions with a lot of outputs or graphs that allow limited parallelism we expect this change to positively impact performance. Note, that with this change the AbstractRemoteActionCache will attempt to always download all outputs concurrently. The actual parallelism is controlled by the underlying network transport. The gRPC transport currently enforces no limits on the concurrent calls, which should be fine given that all calls are multiplexed on a single network connection. The HTTP/1.1 transport also enforces no parallelism by default, but I have added the --remote_max_connections=INT flag which allows to specify an upper bound on the number of network connections to be open concurrently. I have introduced this flag as a defensive mechanism for users who's environment might enforce an upper bound on the number of open connections, as with this change its possible for the number of concurrently open connections to dramatically increase (from NumParallelActions to NumParallelActions * SumParallelActionOutputs). A side effect of this change is that it puts the infrastructure for retries and circuit breaking for the HttpBlobStore in place. RELNOTES: None PiperOrigin-RevId: 199005510
Diffstat (limited to 'src/test/java')
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java67
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java28
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java25
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java39
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java23
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java56
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java47
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java21
8 files changed, 230 insertions, 76 deletions
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index db1f0c380e..731610e204 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -65,7 +65,9 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -85,8 +87,7 @@ public class ByteStreamUploaderTest {
private static final String INSTANCE_NAME = "foo";
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
- private final ListeningScheduledExecutorService retryService =
- MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ private static ListeningScheduledExecutorService retryService;
private Server server;
private Channel channel;
@@ -95,6 +96,11 @@ public class ByteStreamUploaderTest {
@Mock private Retrier.Backoff mockBackoff;
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
@Before
public final void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
@@ -118,17 +124,20 @@ public class ByteStreamUploaderTest {
withEmptyMetadata.detach(prevContext);
server.shutdownNow();
- retryService.shutdownNow();
server.awaitTermination();
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test(timeout = 10000)
public void singleBlobUploadShouldWork() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -198,9 +207,9 @@ public class ByteStreamUploaderTest {
public void multipleBlobsUploadShouldWork() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
int numUploads = 100;
Map<String, byte[]> blobsByHash = new HashMap<>();
@@ -285,15 +294,15 @@ public class ByteStreamUploaderTest {
withEmptyMetadata.detach(prevContext);
}
- @Test(timeout = 20000)
+ @Test
public void contextShouldBePreservedUponRetries() throws Exception {
Context prevContext = withEmptyMetadata.attach();
// We upload blobs with different context, and retry 3 times for each upload.
// We verify that the correct metadata is passed to the server with every blob.
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(3, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(5, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
List<Chunker> builders = new ArrayList<>(toUpload.size());
@@ -383,9 +392,8 @@ public class ByteStreamUploaderTest {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -445,9 +453,9 @@ public class ByteStreamUploaderTest {
public void errorsShouldBeReported() throws IOException, InterruptedException {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -475,9 +483,9 @@ public class ByteStreamUploaderTest {
public void shutdownShouldCancelOngoingUploads() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
CountDownLatch cancellations = new CountDownLatch(2);
@@ -532,10 +540,12 @@ public class ByteStreamUploaderTest {
@Test(timeout = 10000)
public void failureInRetryExecutorShouldBeHandled() throws Exception {
Context prevContext = withEmptyMetadata.attach();
+ ListeningScheduledExecutorService retryService =
+ MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
RemoteRetrier retrier =
- new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader =
- new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -567,9 +577,9 @@ public class ByteStreamUploaderTest {
public void resourceNameWithoutInstanceName() throws Exception {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
- new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS);
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
+ new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -610,9 +620,10 @@ public class ByteStreamUploaderTest {
new RemoteRetrier(
() -> new FixedBackoff(1, 0),
/* No Status is retriable. */ (e) -> false,
+ retryService,
Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
+ new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
AtomicInteger numCalls = new AtomicInteger();
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index 522241cd4b..25228a5124 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
@@ -27,6 +28,8 @@ import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.authandtls.GoogleAuthUtils;
@@ -71,8 +74,11 @@ import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
+import java.util.concurrent.Executors;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -95,6 +101,12 @@ public class GrpcRemoteCacheTest {
private Server fakeServer;
private Context withEmptyMetadata;
private Context prevContext;
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
@Before
public final void setUp() throws Exception {
@@ -129,6 +141,11 @@ public class GrpcRemoteCacheTest {
fakeServer.awaitTermination();
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
private static class CallCredentialsInterceptor implements ClientInterceptor {
private final CallCredentials credentials;
@@ -166,7 +183,10 @@ public class GrpcRemoteCacheTest {
RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
RemoteRetrier retrier =
new RemoteRetrier(
- remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
+ remoteOptions,
+ RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ retryService,
+ Retrier.ALLOW_ALL_CALLS);
return new GrpcRemoteCache(
ClientInterceptors.intercept(
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(),
@@ -182,7 +202,7 @@ public class GrpcRemoteCacheTest {
GrpcRemoteCache client = newClient();
Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
// Will not call the mock Bytestream interface at all.
- assertThat(client.downloadBlob(emptyDigest)).isEmpty();
+ assertThat(getFromFuture(client.downloadBlob(emptyDigest))).isEmpty();
}
@Test
@@ -199,7 +219,7 @@ public class GrpcRemoteCacheTest {
responseObserver.onCompleted();
}
});
- assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
+ assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
}
@Test
@@ -220,7 +240,7 @@ public class GrpcRemoteCacheTest {
responseObserver.onCompleted();
}
});
- assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
+ assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index c6758c185d..e8b1313fd9 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -26,6 +26,8 @@ import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -94,8 +96,11 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.SortedMap;
+import java.util.concurrent.Executors;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -127,6 +132,7 @@ public class GrpcRemoteExecutionClientTest {
private RemoteSpawnRunner client;
private FileOutErr outErr;
private Server fakeServer;
+ private static ListeningScheduledExecutorService retryService;
private final SpawnExecutionContext simplePolicy =
new SpawnExecutionContext() {
@@ -182,6 +188,11 @@ public class GrpcRemoteExecutionClientTest {
}
};
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
@Before
public final void setUp() throws Exception {
String fakeServerName = "fake server for " + getClass();
@@ -238,7 +249,8 @@ public class GrpcRemoteExecutionClientTest {
outErr = new FileOutErr(stdout, stderr);
RemoteOptions options = Options.getDefaults(RemoteOptions.class);
RemoteRetrier retrier =
- new RemoteRetrier(options, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
+ new RemoteRetrier(
+ options, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService, Retrier.ALLOW_ALL_CALLS);
Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
GrpcRemoteExecutor executor =
new GrpcRemoteExecutor(channel, null, options.remoteTimeout, retrier);
@@ -268,6 +280,11 @@ public class GrpcRemoteExecutionClientTest {
fakeServer.awaitTermination();
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test
public void cacheHit() throws Exception {
serviceRegistry.addService(
@@ -909,10 +926,10 @@ public class GrpcRemoteExecutionClientTest {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
- // First read is a cache miss, next read succeeds.
+ // First read is a retriable error, next read succeeds.
if (first) {
first = false;
- responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+ responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
} else {
responseObserver.onNext(
ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("stdout")).build());
@@ -968,7 +985,7 @@ public class GrpcRemoteExecutionClientTest {
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
- assertThat(result.isCacheHit()).isFalse();
+ assertThat(result.isCacheHit()).isTrue();
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java
index 68ce4543cd..d9b08d26ce 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java
@@ -19,6 +19,8 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import com.google.common.collect.Range;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff;
import com.google.devtools.build.lib.remote.Retrier.Backoff;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
@@ -27,9 +29,12 @@ import com.google.devtools.common.options.Options;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -46,12 +51,23 @@ public class RemoteRetrierTest {
}
private RemoteRetrierTest.Foo fooMock;
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
@Before
public void setUp() {
fooMock = Mockito.mock(RemoteRetrierTest.Foo.class);
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test
public void testExponentialBackoff() throws Exception {
Retrier.Backoff backoff =
@@ -93,7 +109,7 @@ public class RemoteRetrierTest {
options.experimentalRemoteRetry = false;
RemoteRetrier retrier =
- Mockito.spy(new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS));
+ Mockito.spy(new RemoteRetrier(options, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS));
when(fooMock.foo())
.thenReturn("bla")
.thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException());
@@ -106,8 +122,14 @@ public class RemoteRetrierTest {
public void testNonRetriableError() throws Exception {
Supplier<Backoff> s =
() -> new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2.0, 0.0, 2);
- RemoteRetrier retrier = Mockito.spy(new RemoteRetrier(s, (e) -> false,
- Retrier.ALLOW_ALL_CALLS, Mockito.mock(Sleeper.class)));
+ RemoteRetrier retrier =
+ Mockito.spy(
+ new RemoteRetrier(
+ s,
+ (e) -> false,
+ retryService,
+ Retrier.ALLOW_ALL_CALLS,
+ Mockito.mock(Sleeper.class)));
when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException());
assertThrows(retrier, 1);
Mockito.verify(fooMock, Mockito.times(1)).foo();
@@ -118,8 +140,9 @@ public class RemoteRetrierTest {
Supplier<Backoff> s =
() -> new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2.0, 0.0, 2);
Sleeper sleeper = Mockito.mock(Sleeper.class);
- RemoteRetrier retrier = Mockito.spy(new RemoteRetrier(s, (e) -> true,
- Retrier.ALLOW_ALL_CALLS, sleeper));
+ RemoteRetrier retrier =
+ Mockito.spy(
+ new RemoteRetrier(s, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS, sleeper));
when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException());
assertThrows(retrier, 3);
@@ -135,7 +158,8 @@ public class RemoteRetrierTest {
RemoteOptions options = Options.getDefaults(RemoteOptions.class);
options.experimentalRemoteRetry = false;
- RemoteRetrier retrier = new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS);
+ RemoteRetrier retrier =
+ new RemoteRetrier(options, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
try {
retrier.execute(() -> {
throw thrown;
@@ -151,7 +175,8 @@ public class RemoteRetrierTest {
StatusRuntimeException thrown = Status.Code.UNKNOWN.toStatus().asRuntimeException();
RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteRetrier retrier = new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS);
+ RemoteRetrier retrier =
+ new RemoteRetrier(options, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
AtomicInteger numCalls = new AtomicInteger();
try {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 3c8293a0f5..b325eebe1d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander;
@@ -514,6 +515,7 @@ public class RemoteSpawnRunnerTest {
logDir);
Digest logDigest = digestUtil.computeAsUtf8("bla");
+ Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
when(executor.executeRemotely(any(ExecuteRequest.class)))
.thenReturn(
ExecuteResponse.newBuilder()
@@ -522,6 +524,9 @@ public class RemoteSpawnRunnerTest {
LogFile.newBuilder().setHumanReadable(true).setDigest(logDigest).build())
.setResult(ActionResult.newBuilder().setExitCode(31).build())
.build());
+ SettableFuture<Void> completed = SettableFuture.create();
+ completed.set(null);
+ when(cache.downloadFile(eq(logPath), eq(logDigest), eq(null))).thenReturn(completed);
Spawn spawn = newSimpleSpawn();
SpawnExecutionContext policy = new FakeSpawnExecutionContext(spawn);
@@ -530,8 +535,7 @@ public class RemoteSpawnRunnerTest {
assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT);
verify(executor).executeRemotely(any(ExecuteRequest.class));
- Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
- verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(false), eq(null));
+ verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(null));
}
@Test
@@ -551,6 +555,7 @@ public class RemoteSpawnRunnerTest {
logDir);
Digest logDigest = digestUtil.computeAsUtf8("bla");
+ Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
com.google.rpc.Status timeoutStatus =
com.google.rpc.Status.newBuilder().setCode(Code.DEADLINE_EXCEEDED.getNumber()).build();
ExecuteResponse resp =
@@ -562,6 +567,9 @@ public class RemoteSpawnRunnerTest {
when(executor.executeRemotely(any(ExecuteRequest.class)))
.thenThrow(new Retrier.RetryException(
"", 1, new ExecutionStatusException(resp.getStatus(), resp)));
+ SettableFuture<Void> completed = SettableFuture.create();
+ completed.set(null);
+ when(cache.downloadFile(eq(logPath), eq(logDigest), eq(null))).thenReturn(completed);
Spawn spawn = newSimpleSpawn();
SpawnExecutionContext policy = new FakeSpawnExecutionContext(spawn);
@@ -570,8 +578,7 @@ public class RemoteSpawnRunnerTest {
assertThat(res.status()).isEqualTo(Status.TIMEOUT);
verify(executor).executeRemotely(any(ExecuteRequest.class));
- Path logPath = logDir.getRelative(simpleActionId).getRelative("logname");
- verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(false), eq(null));
+ verify(cache).downloadFile(eq(logPath), eq(logDigest), eq(null));
}
@Test
@@ -609,9 +616,7 @@ public class RemoteSpawnRunnerTest {
verify(executor).executeRemotely(any(ExecuteRequest.class));
verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class));
- verify(cache, never())
- .downloadFile(
- any(Path.class), any(Digest.class), any(Boolean.class), any(ByteString.class));
+ verify(cache, never()).downloadFile(any(Path.class), any(Digest.class), any(ByteString.class));
}
@Test
@@ -649,9 +654,7 @@ public class RemoteSpawnRunnerTest {
verify(executor).executeRemotely(any(ExecuteRequest.class));
verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class));
- verify(cache, never())
- .downloadFile(
- any(Path.class), any(Digest.class), any(Boolean.class), any(ByteString.class));
+ verify(cache, never()).downloadFile(any(Path.class), any(Digest.class), any(ByteString.class));
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java
index 945c27d66d..624d074468 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java
@@ -21,16 +21,22 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.Retrier.Backoff;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreakerException;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -49,19 +55,31 @@ public class RetrierTest {
private static final Predicate<Exception> RETRY_ALL = (e) -> true;
private static final Predicate<Exception> RETRY_NONE = (e) -> false;
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(alwaysOpen.state()).thenReturn(State.ACCEPT_CALLS);
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
@Test
public void retryShouldWork_failure() throws Exception {
// Test that a call is retried according to the backoff.
// All calls fail.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
try {
r.execute(() -> {
throw new Exception("call failed");
@@ -81,7 +99,7 @@ public class RetrierTest {
// All calls fail.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/2);
- Retrier r = new Retrier(s, RETRY_NONE, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_NONE, retryService, alwaysOpen);
try {
r.execute(() -> {
throw new Exception("call failed");
@@ -101,7 +119,7 @@ public class RetrierTest {
// The last call succeeds.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
AtomicInteger numCalls = new AtomicInteger();
int val = r.execute(() -> {
numCalls.incrementAndGet();
@@ -121,7 +139,7 @@ public class RetrierTest {
// Test that nested calls using retries compose as expected.
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/1);
- Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
AtomicInteger attemptsLvl0 = new AtomicInteger();
AtomicInteger attemptsLvl1 = new AtomicInteger();
@@ -152,7 +170,7 @@ public class RetrierTest {
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
try {
r.execute(() -> {
@@ -174,7 +192,7 @@ public class RetrierTest {
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
cb.trialCall();
@@ -192,7 +210,7 @@ public class RetrierTest {
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
cb.trialCall();
@@ -214,7 +232,7 @@ public class RetrierTest {
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
try {
Thread.currentThread().interrupt();
@@ -230,7 +248,7 @@ public class RetrierTest {
Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/3);
TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2);
- Retrier r = new Retrier(s, RETRY_ALL, cb);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, cb);
try {
Thread.currentThread().interrupt();
@@ -242,6 +260,26 @@ public class RetrierTest {
}
}
+ @Test
+ public void asyncRetryShouldWork() throws Exception {
+ // Test that a call is retried according to the backoff.
+ // All calls fail.
+
+ Supplier<Backoff> s = () -> new ZeroBackoff(/*maxRetries=*/ 2);
+ Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen);
+ try {
+ r.executeAsync(
+ () -> {
+ throw new Exception("call failed");
+ })
+ .get();
+ fail("exception expected.");
+ } catch (ExecutionException e) {
+ assertThat(e.getCause()).isInstanceOf(RetryException.class);
+ assertThat(((RetryException) e.getCause()).getAttempts()).isEqualTo(3);
+ }
+ }
+
/**
* Simple circuit breaker that trips after N consecutive failures.
*/
diff --git a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
index 594d56bd44..83d5bc307a 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCacheTest.java
@@ -14,12 +14,16 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.clock.JavaClock;
+import com.google.devtools.build.lib.remote.Retrier.Backoff;
import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
@@ -38,8 +42,11 @@ import com.google.devtools.remoteexecution.v1test.Tree;
import io.grpc.Context;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -54,6 +61,14 @@ public class SimpleBlobStoreActionCacheTest {
private FakeActionInputFileCache fakeFileCache;
private Context withEmptyMetadata;
private Context prevContext;
+ private Retrier retrier;
+
+ private static ListeningScheduledExecutorService retryService;
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
@Before
public final void setUp() throws Exception {
@@ -62,7 +77,23 @@ public class SimpleBlobStoreActionCacheTest {
execRoot = fs.getPath("/exec/root");
FileSystemUtils.createDirectoryAndParents(execRoot);
fakeFileCache = new FakeActionInputFileCache(execRoot);
-
+ retrier =
+ new Retrier(
+ () ->
+ new Backoff() {
+ @Override
+ public long nextDelayMillis() {
+ return -1;
+ }
+
+ @Override
+ public int getRetryAttempts() {
+ return 0;
+ }
+ },
+ (e) -> false,
+ retryService,
+ RemoteRetrier.ALLOW_ALL_CALLS);
Path stdout = fs.getPath("/tmp/stdout");
Path stderr = fs.getPath("/tmp/stderr");
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
@@ -78,13 +109,21 @@ public class SimpleBlobStoreActionCacheTest {
withEmptyMetadata.detach(prevContext);
}
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
private SimpleBlobStoreActionCache newClient() {
return newClient(new ConcurrentHashMap<>());
}
private SimpleBlobStoreActionCache newClient(ConcurrentMap<String, byte[]> map) {
return new SimpleBlobStoreActionCache(
- Options.getDefaults(RemoteOptions.class), new ConcurrentMapBlobStore(map), DIGEST_UTIL);
+ Options.getDefaults(RemoteOptions.class),
+ new ConcurrentMapBlobStore(map),
+ retrier,
+ DIGEST_UTIL);
}
@Test
@@ -92,7 +131,7 @@ public class SimpleBlobStoreActionCacheTest {
SimpleBlobStoreActionCache client = newClient();
Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]);
// Will not call the mock Bytestream interface at all.
- assertThat(client.downloadBlob(emptyDigest)).isEmpty();
+ assertThat(getFromFuture(client.downloadBlob(emptyDigest))).isEmpty();
}
@Test
@@ -101,7 +140,7 @@ public class SimpleBlobStoreActionCacheTest {
Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
map.put(digest.getHash(), "abcdefg".getBytes(Charsets.UTF_8));
final SimpleBlobStoreActionCache client = newClient(map);
- assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
+ assertThat(new String(getFromFuture(client.downloadBlob(digest)), UTF_8)).isEqualTo("abcdefg");
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
index 6850f9e17d..c677874b4c 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote.blobstore.http;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.util.Collections.singletonList;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -93,8 +94,8 @@ public class HttpBlobStoreTest {
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, credentials);
- blobStore.get("key", new ByteArrayOutputStream());
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials);
+ getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected");
}
@@ -115,8 +116,8 @@ public class HttpBlobStoreTest {
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, credentials);
- blobStore.get("key", new ByteArrayOutputStream());
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials);
+ getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected");
} finally {
closeServerChannel(server);
@@ -137,9 +138,9 @@ public class HttpBlobStoreTest {
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
- blobStore.get("key", out);
+ getFromFuture(blobStore.get("key", out));
assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents");
verify(credentials, times(1)).refresh();
verify(credentials, times(2)).getRequestMetadata(any(URI.class));
@@ -166,7 +167,7 @@ public class HttpBlobStoreTest {
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
ByteArrayInputStream in = new ByteArrayInputStream(data);
blobStore.put("key", data.length, in);
@@ -194,8 +195,8 @@ public class HttpBlobStoreTest {
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
- blobStore.get("key", new ByteArrayOutputStream());
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
+ getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected.");
} catch (Exception e) {
assertThat(e).isInstanceOf(HttpException.class);
@@ -221,7 +222,7 @@ public class HttpBlobStoreTest {
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, credentials);
+ new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
blobStore.put("key", 1, new ByteArrayInputStream(new byte[] {0}));
fail("Exception expected.");
} catch (Exception e) {