diff options
author | 2018-07-12 04:01:45 -0700 | |
---|---|---|
committer | 2018-07-12 04:02:54 -0700 | |
commit | b50fe8607a94579e30561a7cfb65315e71a50238 (patch) | |
tree | 1e1f7a2d28936561fc336af2d4b315d758a66e36 /src/test/java/com/google | |
parent | ef3fed75e478fe63d7a0b9cd61c42d3c5aee509b (diff) |
remote: add a ByteStreamBuildEventArtifactUploader
This change allows local files referenced by the BEP/BES protocol
to be uploaded to a ByteStream gRPC service.
The ByteStreamUploader is now implicitly also used by the BES
module which has a different lifecycle than the remote module.
We introduce reference counting to ensure that the channel is
closed after its no longer needed. This also fixes a bug where
we currently leak one socket per remote build until the Bazel
server is shut down.
RELNOTES: None
PiperOrigin-RevId: 204275316
Diffstat (limited to 'src/test/java/com/google')
5 files changed, 423 insertions, 176 deletions
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java new file mode 100644 index 0000000000..75b46c461e --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -0,0 +1,236 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.bytestream.ByteStreamProto.WriteRequest; +import com.google.bytestream.ByteStreamProto.WriteResponse; +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; +import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType; +import com.google.devtools.build.lib.buildeventstream.PathConverter; +import com.google.devtools.build.lib.clock.JavaClock; +import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.FixedBackoff; +import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService; +import com.google.devtools.build.lib.remote.Retrier.RetryException; +import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; +import com.google.devtools.build.lib.vfs.DigestHashFunction; +import com.google.devtools.build.lib.vfs.FileSystem; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; +import com.google.devtools.remoteexecution.v1test.Digest; +import io.grpc.Context; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.util.MutableHandlerRegistry; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockitoAnnotations; + +/** Test for {@link ByteStreamBuildEventArtifactUploader}. */ +@RunWith(JUnit4.class) +public class ByteStreamBuildEventArtifactUploaderTest { + + private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256); + + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private static ListeningScheduledExecutorService retryService; + + private Server server; + private ManagedChannel channel; + private Context withEmptyMetadata; + private Context prevContext; + private final FileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256); + + @BeforeClass + public static void beforeEverything() { + retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + } + + @Before + public final void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + + String serverName = "Server for " + this.getClass(); + server = + InProcessServerBuilder.forName(serverName) + .fallbackHandlerRegistry(serviceRegistry) + .build() + .start(); + channel = InProcessChannelBuilder.forName(serverName).build(); + withEmptyMetadata = + TracingMetadataUtils.contextWithMetadata( + "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance())); + // Needs to be repeated in every test that uses the timeout setting, since the tests run + // on different threads than the setUp. + prevContext = withEmptyMetadata.attach(); + } + + @After + public void tearDown() throws Exception { + // Needs to be repeated in every test that uses the timeout setting, since the tests run + // on different threads than the tearDown. + withEmptyMetadata.detach(prevContext); + + server.shutdownNow(); + server.awaitTermination(); + } + + @AfterClass + public static void afterEverything() { + retryService.shutdownNow(); + } + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void uploadsShouldWork() throws Exception { + int numUploads = 2; + Map<String, byte[]> blobsByHash = new HashMap<>(); + Map<Path, LocalFile> filesToUpload = new HashMap<>(); + Random rand = new Random(); + for (int i = 0; i < numUploads; i++) { + Path file = fs.getPath("/file" + i); + OutputStream out = file.getOutputStream(); + int blobSize = rand.nextInt(100) + 1; + byte[] blob = new byte[blobSize]; + rand.nextBytes(blob); + out.write(blob); + out.close(); + blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob); + filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT)); + } + serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); + + RemoteRetrier retrier = + new RemoteRetrier( + () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); + ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel); + ByteStreamUploader uploader = + new ByteStreamUploader("instance", refCntChannel, null, 3, retrier); + ByteStreamBuildEventArtifactUploader artifactUploader = + new ByteStreamBuildEventArtifactUploader( + uploader, "localhost", withEmptyMetadata, "instance"); + + PathConverter pathConverter = artifactUploader.upload(filesToUpload).get(); + for (Path file : filesToUpload.keySet()) { + String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest()); + long size = file.getFileSize(); + String conversion = pathConverter.apply(file); + assertThat(conversion) + .isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size); + } + + artifactUploader.shutdown(); + + assertThat(uploader.refCnt()).isEqualTo(0); + assertThat(refCntChannel.isShutdown()).isTrue(); + } + + @Test + public void someUploadsFail() throws Exception { + // Test that if one of multiple file uploads fails, the upload future fails and that the + // error is propagated correctly. + + int numUploads = 10; + Map<String, byte[]> blobsByHash = new HashMap<>(); + Map<Path, LocalFile> filesToUpload = new HashMap<>(); + Random rand = new Random(); + for (int i = 0; i < numUploads; i++) { + Path file = fs.getPath("/file" + i); + OutputStream out = file.getOutputStream(); + int blobSize = rand.nextInt(100) + 1; + byte[] blob = new byte[blobSize]; + rand.nextBytes(blob); + out.write(blob); + out.flush(); + out.close(); + blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob); + filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT)); + } + String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next(); + serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash) { + @Override + public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) { + StreamObserver<WriteRequest> delegate = super.write(response); + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest value) { + if (value.getResourceName().contains(hashOfBlobThatShouldFail)) { + response.onError(Status.CANCELLED.asException()); + } else { + delegate.onNext(value); + } + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); + } + + @Override + public void onCompleted() { + delegate.onCompleted(); + } + }; + } + }); + + RemoteRetrier retrier = + new RemoteRetrier( + () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); + ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel); + ByteStreamUploader uploader = + new ByteStreamUploader("instance", refCntChannel, null, 3, retrier); + ByteStreamBuildEventArtifactUploader artifactUploader = + new ByteStreamBuildEventArtifactUploader( + uploader, "localhost", withEmptyMetadata, "instance"); + + try { + artifactUploader.upload(filesToUpload).get(); + fail("exception expected."); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(RetryException.class); + assertThat(Status.fromThrowable(e).getCode()).isEqualTo(Status.CANCELLED.getCode()); + } + + artifactUploader.shutdown(); + + assertThat(uploader.refCnt()).isEqualTo(0); + assertThat(refCntChannel.isShutdown()).isTrue(); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 7b2dc7aefb..06ad3914ed 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -34,8 +34,8 @@ import com.google.devtools.remoteexecution.v1test.Digest; import com.google.devtools.remoteexecution.v1test.RequestMetadata; import com.google.protobuf.ByteString; import io.grpc.BindableService; -import io.grpc.Channel; import io.grpc.Context; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerCall; @@ -90,7 +90,7 @@ public class ByteStreamUploaderTest { private static ListeningScheduledExecutorService retryService; private Server server; - private Channel channel; + private ManagedChannel channel; private Context withEmptyMetadata; private Context prevContext; @@ -137,7 +137,8 @@ public class ByteStreamUploaderTest { Context prevContext = withEmptyMetadata.attach(); RemoteRetrier retrier = new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); - ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -193,7 +194,7 @@ public class ByteStreamUploaderTest { } }); - uploader.uploadBlob(chunker); + uploader.uploadBlob(chunker, true); // This test should not have triggered any retries. Mockito.verifyZeroInteractions(mockBackoff); @@ -209,7 +210,8 @@ public class ByteStreamUploaderTest { RemoteRetrier retrier = new RemoteRetrier( () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); - ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); int numUploads = 10; Map<String, byte[]> blobsByHash = new HashMap<>(); @@ -224,70 +226,9 @@ public class ByteStreamUploaderTest { blobsByHash.put(chunker.digest().getHash(), blob); } - Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>()); + serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - serviceRegistry.addService(new ByteStreamImplBase() { - @Override - public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) { - return new StreamObserver<WriteRequest>() { - - private String digestHash; - private byte[] receivedData; - private long nextOffset; - - @Override - public void onNext(WriteRequest writeRequest) { - if (nextOffset == 0) { - String resourceName = writeRequest.getResourceName(); - assertThat(resourceName).isNotEmpty(); - - String[] components = resourceName.split("/"); - assertThat(components).hasLength(6); - digestHash = components[4]; - assertThat(blobsByHash).containsKey(digestHash); - receivedData = new byte[Integer.parseInt(components[5])]; - } - assertThat(digestHash).isNotNull(); - // An upload for a given blob has a 10% chance to fail once during its lifetime. - // This is to exercise the retry mechanism a bit. - boolean shouldFail = - rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash); - if (shouldFail) { - uploadsFailedOnce.add(digestHash); - response.onError(Status.INTERNAL.asException()); - return; - } - - ByteString data = writeRequest.getData(); - System.arraycopy( - data.toByteArray(), 0, receivedData, (int) nextOffset, data.size()); - nextOffset += data.size(); - - boolean lastWrite = nextOffset == receivedData.length; - assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite); - } - - @Override - public void onError(Throwable throwable) { - fail("onError should never be called."); - } - - @Override - public void onCompleted() { - byte[] expectedBlob = blobsByHash.get(digestHash); - assertThat(receivedData).isEqualTo(expectedBlob); - - WriteResponse writeResponse = - WriteResponse.newBuilder().setCommittedSize(receivedData.length).build(); - - response.onNext(writeResponse); - response.onCompleted(); - } - }; - } - }); - - uploader.uploadBlobs(builders); + uploader.uploadBlobs(builders, true); blockUntilInternalStateConsistent(uploader); @@ -302,7 +243,8 @@ public class ByteStreamUploaderTest { RemoteRetrier retrier = new RemoteRetrier( () -> new FixedBackoff(5, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); - ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"); List<Chunker> builders = new ArrayList<>(toUpload.size()); @@ -372,7 +314,7 @@ public class ByteStreamUploaderTest { "build-req-id", "command-id", DIGEST_UTIL.asActionKey(chunker.digest())); ctx.call( () -> { - uploads.add(uploader.uploadBlobAsync(chunker)); + uploads.add(uploader.uploadBlobAsync(chunker, true)); return null; }); } @@ -393,7 +335,8 @@ public class ByteStreamUploaderTest { Context prevContext = withEmptyMetadata.attach(); RemoteRetrier retrier = new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); - ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); byte[] blob = new byte[CHUNK_SIZE * 10]; Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL); @@ -435,8 +378,8 @@ public class ByteStreamUploaderTest { } }); - Future<?> upload1 = uploader.uploadBlobAsync(chunker); - Future<?> upload2 = uploader.uploadBlobAsync(chunker); + Future<?> upload1 = uploader.uploadBlobAsync(chunker, true); + Future<?> upload2 = uploader.uploadBlobAsync(chunker, true); blocker.countDown(); @@ -455,7 +398,8 @@ public class ByteStreamUploaderTest { RemoteRetrier retrier = new RemoteRetrier( () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); - ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); byte[] blob = new byte[CHUNK_SIZE]; Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL); @@ -469,7 +413,7 @@ public class ByteStreamUploaderTest { }); try { - uploader.uploadBlob(chunker); + uploader.uploadBlob(chunker, true); fail("Should have thrown an exception."); } catch (RetryException e) { assertThat(e.getAttempts()).isEqualTo(2); @@ -485,7 +429,8 @@ public class ByteStreamUploaderTest { RemoteRetrier retrier = new RemoteRetrier( () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); - ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); CountDownLatch cancellations = new CountDownLatch(2); @@ -520,8 +465,8 @@ public class ByteStreamUploaderTest { byte[] blob2 = new byte[CHUNK_SIZE + 1]; Chunker chunker2 = new Chunker(blob2, CHUNK_SIZE, DIGEST_UTIL); - ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1); - ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2); + ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1, true); + ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2, true); assertThat(uploader.uploadsInProgress()).isTrue(); @@ -545,7 +490,8 @@ public class ByteStreamUploaderTest { RemoteRetrier retrier = new RemoteRetrier( () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); - ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); serviceRegistry.addService(new ByteStreamImplBase() { @Override @@ -564,7 +510,7 @@ public class ByteStreamUploaderTest { byte[] blob = new byte[1]; Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL); try { - uploader.uploadBlob(chunker); + uploader.uploadBlob(chunker, true); fail("Should have thrown an exception."); } catch (RetryException e) { assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class); @@ -579,7 +525,8 @@ public class ByteStreamUploaderTest { RemoteRetrier retrier = new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = - new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier); + new ByteStreamUploader(/* instanceName */ null, + new ReferenceCountedChannel(channel), null, 3, retrier); serviceRegistry.addService(new ByteStreamImplBase() { @Override @@ -608,7 +555,7 @@ public class ByteStreamUploaderTest { byte[] blob = new byte[1]; Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL); - uploader.uploadBlob(chunker); + uploader.uploadBlob(chunker, true); withEmptyMetadata.detach(prevContext); } @@ -623,7 +570,8 @@ public class ByteStreamUploaderTest { retryService, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = - new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier); + new ByteStreamUploader(/* instanceName */ null, + new ReferenceCountedChannel(channel), null, 3, retrier); AtomicInteger numCalls = new AtomicInteger(); @@ -640,7 +588,7 @@ public class ByteStreamUploaderTest { Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL); try { - uploader.uploadBlob(chunker); + uploader.uploadBlob(chunker, true); fail("Should have thrown an exception."); } catch (RetryException e) { assertThat(numCalls.get()).isEqualTo(1); @@ -649,6 +597,67 @@ public class ByteStreamUploaderTest { withEmptyMetadata.detach(prevContext); } + @Test + public void deduplicationOfUploadsShouldWork() throws Exception { + Context prevContext = withEmptyMetadata.attach(); + RemoteRetrier retrier = + new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS); + ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, + new ReferenceCountedChannel(channel), null, 3, retrier); + + byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; + new Random().nextBytes(blob); + + Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL); + + AtomicInteger numUploads = new AtomicInteger(); + serviceRegistry.addService(new ByteStreamImplBase() { + @Override + public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) { + numUploads.incrementAndGet(); + return new StreamObserver<WriteRequest>() { + + long nextOffset = 0; + + @Override + public void onNext(WriteRequest writeRequest) { + nextOffset += writeRequest.getData().size(); + boolean lastWrite = blob.length == nextOffset; + assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite); + } + + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } + + @Override + public void onCompleted() { + assertThat(nextOffset).isEqualTo(blob.length); + + WriteResponse response = + WriteResponse.newBuilder().setCommittedSize(nextOffset).build(); + streamObserver.onNext(response); + streamObserver.onCompleted(); + } + }; + } + }); + + uploader.uploadBlob(chunker, true); + // This should not trigger an upload. + uploader.uploadBlob(chunker, false); + + assertThat(numUploads.get()).isEqualTo(1); + + // This test should not have triggered any retries. + Mockito.verifyZeroInteractions(mockBackoff); + + blockUntilInternalStateConsistent(uploader); + + withEmptyMetadata.detach(prevContext); + } + private static class NoopStreamObserver implements StreamObserver<WriteRequest> { @Override public void onNext(WriteRequest writeRequest) { @@ -663,7 +672,7 @@ public class ByteStreamUploaderTest { } } - private static class FixedBackoff implements Retrier.Backoff { + static class FixedBackoff implements Retrier.Backoff { private final int maxRetries; private final int delayMillis; @@ -690,6 +699,80 @@ public class ByteStreamUploaderTest { } } + /** + * An byte stream service where an upload for a given blob may or may not fail on the first + * attempt but is guaranteed to succeed on the second try. + */ + static class MaybeFailOnceUploadService extends ByteStreamImplBase { + + private final Map<String, byte[]> blobsByHash; + private final Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>()); + private final Random rand = new Random(); + + MaybeFailOnceUploadService(Map<String, byte[]> blobsByHash) { + this.blobsByHash = blobsByHash; + } + + @Override + public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) { + return new StreamObserver<WriteRequest>() { + + private String digestHash; + private byte[] receivedData; + private long nextOffset; + + @Override + public void onNext(WriteRequest writeRequest) { + if (nextOffset == 0) { + String resourceName = writeRequest.getResourceName(); + assertThat(resourceName).isNotEmpty(); + + String[] components = resourceName.split("/"); + assertThat(components).hasLength(6); + digestHash = components[4]; + assertThat(blobsByHash).containsKey(digestHash); + receivedData = new byte[Integer.parseInt(components[5])]; + } + assertThat(digestHash).isNotNull(); + // An upload for a given blob has a 10% chance to fail once during its lifetime. + // This is to exercise the retry mechanism a bit. + boolean shouldFail = + rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash); + if (shouldFail) { + uploadsFailedOnce.add(digestHash); + response.onError(Status.INTERNAL.asException()); + return; + } + + ByteString data = writeRequest.getData(); + System.arraycopy( + data.toByteArray(), 0, receivedData, (int) nextOffset, data.size()); + nextOffset += data.size(); + + boolean lastWrite = nextOffset == receivedData.length; + assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite); + } + + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } + + @Override + public void onCompleted() { + byte[] expectedBlob = blobsByHash.get(digestHash); + assertThat(receivedData).isEqualTo(expectedBlob); + + WriteResponse writeResponse = + WriteResponse.newBuilder().setCommittedSize(receivedData.length).build(); + + response.onNext(writeResponse); + response.onCompleted(); + } + }; + } + } + private void blockUntilInternalStateConsistent(ByteStreamUploader uploader) throws Exception { // Poll until all upload futures have been removed from the internal hash map. The polling is // necessary, as listeners are executed after Future.get() calls are notified about completion. diff --git a/src/test/java/com/google/devtools/build/lib/remote/CasPathConverterTest.java b/src/test/java/com/google/devtools/build/lib/remote/CasPathConverterTest.java deleted file mode 100644 index 5c6dcf2627..0000000000 --- a/src/test/java/com/google/devtools/build/lib/remote/CasPathConverterTest.java +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.devtools.build.lib.remote.RemoteModule.CasPathConverter; -import com.google.devtools.build.lib.remote.util.DigestUtil; -import com.google.devtools.build.lib.vfs.DigestHashFunction; -import com.google.devtools.build.lib.vfs.FileSystem; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; -import com.google.devtools.common.options.Options; -import com.google.devtools.common.options.OptionsParser; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link CasPathConverter}. */ -@RunWith(JUnit4.class) -public class CasPathConverterTest { - private final FileSystem fs = new InMemoryFileSystem(); - private final CasPathConverter converter = new CasPathConverter(); - - @Test - public void noOptionsShouldntCrash() { - converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256); - assertThat(converter.apply(fs.getPath("/foo"))).isEqualTo("file:///foo"); - } - - @Test - public void noDigestUtilShouldntCrash() { - converter.options = Options.getDefaults(RemoteOptions.class); - assertThat(converter.apply(fs.getPath("/foo"))).isEqualTo("file:///foo"); - } - - @Test - public void disabledRemote() { - converter.options = Options.getDefaults(RemoteOptions.class); - converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256); - assertThat(converter.apply(fs.getPath("/foo"))).isEqualTo("file:///foo"); - } - - @Test - public void enabledRemoteExecutorNoRemoteInstance() throws Exception { - OptionsParser parser = OptionsParser.newOptionsParser(RemoteOptions.class); - parser.parse("--remote_cache=machine"); - converter.options = parser.getOptions(RemoteOptions.class); - converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256); - Path path = fs.getPath("/foo"); - FileSystemUtils.writeContentAsLatin1(path, "foobar"); - assertThat(converter.apply(fs.getPath("/foo"))) - .isEqualTo("bytestream://machine/blobs/3858f62230ac3c915f300c664312c63f/6"); - } - - @Test - public void enabledRemoteExecutorWithRemoteInstance() throws Exception { - OptionsParser parser = OptionsParser.newOptionsParser(RemoteOptions.class); - parser.parse("--remote_cache=machine", "--remote_instance_name=projects/bazel"); - converter.options = parser.getOptions(RemoteOptions.class); - converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256); - Path path = fs.getPath("/foo"); - FileSystemUtils.writeContentAsLatin1(path, "foobar"); - assertThat(converter.apply(fs.getPath("/foo"))) - .isEqualTo("bytestream://machine/projects/bazel/blobs/3858f62230ac3c915f300c664312c63f/6"); - } -} diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java index 4fc5869276..3c61fd149d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java @@ -64,7 +64,6 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors; import io.grpc.Context; import io.grpc.MethodDescriptor; import io.grpc.Server; @@ -177,9 +176,9 @@ public class GrpcRemoteCacheTest { Scratch scratch = new Scratch(); scratch.file(authTlsOptions.googleCredentials, new JacksonFactory().toString(json)); - CallCredentials creds = null; + CallCredentials creds; try (InputStream in = scratch.resolve(authTlsOptions.googleCredentials).getInputStream()) { - GoogleAuthUtils.newCallCredentials(in, authTlsOptions.googleAuthScopes); + creds = GoogleAuthUtils.newCallCredentials(in, authTlsOptions.googleAuthScopes); } RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); RemoteRetrier retrier = @@ -188,14 +187,18 @@ public class GrpcRemoteCacheTest { RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService, Retrier.ALLOW_ALL_CALLS); - return new GrpcRemoteCache( - ClientInterceptors.intercept( - InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), - ImmutableList.of(new CallCredentialsInterceptor(creds))), + ReferenceCountedChannel channel = + new ReferenceCountedChannel(InProcessChannelBuilder.forName(fakeServerName).directExecutor() + .intercept(new CallCredentialsInterceptor(creds)).build()); + ByteStreamUploader uploader = + new ByteStreamUploader(remoteOptions.remoteInstanceName, channel.retain(), creds, + remoteOptions.remoteTimeout, retrier); + return new GrpcRemoteCache(channel.retain(), creds, remoteOptions, retrier, - DIGEST_UTIL); + DIGEST_UTIL, + uploader); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java index 849259f4a2..93d1cdd447 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -80,7 +80,6 @@ import com.google.watcher.v1.Request; import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; import io.grpc.BindableService; import io.grpc.CallCredentials; -import io.grpc.Channel; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerCall; @@ -258,13 +257,17 @@ public class GrpcRemoteExecutionClientTest { RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService, Retrier.ALLOW_ALL_CALLS); - Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(); + ReferenceCountedChannel channel = + new ReferenceCountedChannel(InProcessChannelBuilder.forName(fakeServerName).directExecutor().build()); GrpcRemoteExecutor executor = - new GrpcRemoteExecutor(channel, null, remoteOptions.remoteTimeout, retrier); + new GrpcRemoteExecutor(channel.retain(), null, remoteOptions.remoteTimeout, retrier); CallCredentials creds = GoogleAuthUtils.newCallCredentials(Options.getDefaults(AuthAndTLSOptions.class)); + ByteStreamUploader uploader = + new ByteStreamUploader(remoteOptions.remoteInstanceName, channel.retain(), creds, + remoteOptions.remoteTimeout, retrier); GrpcRemoteCache remoteCache = - new GrpcRemoteCache(channel, creds, remoteOptions, retrier, DIGEST_UTIL); + new GrpcRemoteCache(channel.retain(), creds, remoteOptions, retrier, DIGEST_UTIL, uploader); client = new RemoteSpawnRunner( execRoot, @@ -281,6 +284,7 @@ public class GrpcRemoteExecutionClientTest { DIGEST_UTIL, logDir); inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz"); + channel.release(); } @After |