From ff008f445905bf6f4601a368782b620f7899d322 Mon Sep 17 00:00:00 2001 From: buchgr Date: Sat, 2 Jun 2018 14:13:43 -0700 Subject: remote: concurrent blob downloads. Fixes #5215 This change introduces concurrent downloads of action outputs for remote caching/execution. So far, for an action we would download one output after the other which isn't as bad as it sounds as we would typically run dozens or hundreds of actions in parallel. However, for actions with a lot of outputs or graphs that allow limited parallelism we expect this change to positively impact performance. Note, that with this change the AbstractRemoteActionCache will attempt to always download all outputs concurrently. The actual parallelism is controlled by the underlying network transport. The gRPC transport currently enforces no limits on the concurrent calls, which should be fine given that all calls are multiplexed on a single network connection. The HTTP/1.1 transport also enforces no parallelism by default, but I have added the --remote_max_connections=INT flag which allows to specify an upper bound on the number of network connections to be open concurrently. I have introduced this flag as a defensive mechanism for users who's environment might enforce an upper bound on the number of open connections, as with this change its possible for the number of concurrently open connections to dramatically increase (from NumParallelActions to NumParallelActions * SumParallelActionOutputs). A side effect of this change is that it puts the infrastructure for retries and circuit breaking for the HttpBlobStore in place. RELNOTES: None PiperOrigin-RevId: 199005510 --- .../devtools/build/lib/remote/RetrierTest.java | 56 ++++++++++++++++++---- 1 file changed, 47 insertions(+), 9 deletions(-) (limited to 'src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java') diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java index 945c27d66d..624d074468 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java @@ -21,16 +21,22 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.remote.Retrier.Backoff; import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker; import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State; import com.google.devtools.build.lib.remote.Retrier.CircuitBreakerException; import com.google.devtools.build.lib.remote.Retrier.RetryException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,19 +55,31 @@ public class RetrierTest { private static final Predicate RETRY_ALL = (e) -> true; private static final Predicate RETRY_NONE = (e) -> false; + private static ListeningScheduledExecutorService retryService; + + @BeforeClass + public static void beforeEverything() { + retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + } + @Before public void setup() { MockitoAnnotations.initMocks(this); when(alwaysOpen.state()).thenReturn(State.ACCEPT_CALLS); } + @AfterClass + public static void afterEverything() { + retryService.shutdownNow(); + } + @Test public void retryShouldWork_failure() throws Exception { // Test that a call is retried according to the backoff. // All calls fail. Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); - Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen); + Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen); try { r.execute(() -> { throw new Exception("call failed"); @@ -81,7 +99,7 @@ public class RetrierTest { // All calls fail. Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); - Retrier r = new Retrier(s, RETRY_NONE, alwaysOpen); + Retrier r = new Retrier(s, RETRY_NONE, retryService, alwaysOpen); try { r.execute(() -> { throw new Exception("call failed"); @@ -101,7 +119,7 @@ public class RetrierTest { // The last call succeeds. Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); - Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen); + Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen); AtomicInteger numCalls = new AtomicInteger(); int val = r.execute(() -> { numCalls.incrementAndGet(); @@ -121,7 +139,7 @@ public class RetrierTest { // Test that nested calls using retries compose as expected. Supplier s = () -> new ZeroBackoff(/*maxRetries=*/1); - Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen); + Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen); AtomicInteger attemptsLvl0 = new AtomicInteger(); AtomicInteger attemptsLvl1 = new AtomicInteger(); @@ -152,7 +170,7 @@ public class RetrierTest { Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier r = new Retrier(s, RETRY_ALL, cb); + Retrier r = new Retrier(s, RETRY_ALL, retryService, cb); try { r.execute(() -> { @@ -174,7 +192,7 @@ public class RetrierTest { Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier r = new Retrier(s, RETRY_ALL, cb); + Retrier r = new Retrier(s, RETRY_ALL, retryService, cb); cb.trialCall(); @@ -192,7 +210,7 @@ public class RetrierTest { Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier r = new Retrier(s, RETRY_ALL, cb); + Retrier r = new Retrier(s, RETRY_ALL, retryService, cb); cb.trialCall(); @@ -214,7 +232,7 @@ public class RetrierTest { Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier r = new Retrier(s, RETRY_ALL, cb); + Retrier r = new Retrier(s, RETRY_ALL, retryService, cb); try { Thread.currentThread().interrupt(); @@ -230,7 +248,7 @@ public class RetrierTest { Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier r = new Retrier(s, RETRY_ALL, cb); + Retrier r = new Retrier(s, RETRY_ALL, retryService, cb); try { Thread.currentThread().interrupt(); @@ -242,6 +260,26 @@ public class RetrierTest { } } + @Test + public void asyncRetryShouldWork() throws Exception { + // Test that a call is retried according to the backoff. + // All calls fail. + + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/ 2); + Retrier r = new Retrier(s, RETRY_ALL, retryService, alwaysOpen); + try { + r.executeAsync( + () -> { + throw new Exception("call failed"); + }) + .get(); + fail("exception expected."); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(RetryException.class); + assertThat(((RetryException) e.getCause()).getAttempts()).isEqualTo(3); + } + } + /** * Simple circuit breaker that trips after N consecutive failures. */ -- cgit v1.2.3