aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/test/java/com/google
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2018-07-12 04:01:45 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-07-12 04:02:54 -0700
commitb50fe8607a94579e30561a7cfb65315e71a50238 (patch)
tree1e1f7a2d28936561fc336af2d4b315d758a66e36 /src/test/java/com/google
parentef3fed75e478fe63d7a0b9cd61c42d3c5aee509b (diff)
remote: add a ByteStreamBuildEventArtifactUploader
This change allows local files referenced by the BEP/BES protocol to be uploaded to a ByteStream gRPC service. The ByteStreamUploader is now implicitly also used by the BES module which has a different lifecycle than the remote module. We introduce reference counting to ensure that the channel is closed after its no longer needed. This also fixes a bug where we currently leak one socket per remote build until the Bazel server is shut down. RELNOTES: None PiperOrigin-RevId: 204275316
Diffstat (limited to 'src/test/java/com/google')
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java236
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java253
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/CasPathConverterTest.java79
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java19
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java12
5 files changed, 423 insertions, 176 deletions
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
new file mode 100644
index 0000000000..75b46c461e
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -0,0 +1,236 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
+import com.google.devtools.build.lib.buildeventstream.PathConverter;
+import com.google.devtools.build.lib.clock.JavaClock;
+import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.FixedBackoff;
+import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService;
+import com.google.devtools.build.lib.remote.Retrier.RetryException;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
+import com.google.devtools.build.lib.vfs.DigestHashFunction;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import io.grpc.Context;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+
+/** Test for {@link ByteStreamBuildEventArtifactUploader}. */
+@RunWith(JUnit4.class)
+public class ByteStreamBuildEventArtifactUploaderTest {
+
+ private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256);
+
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+ private static ListeningScheduledExecutorService retryService;
+
+ private Server server;
+ private ManagedChannel channel;
+ private Context withEmptyMetadata;
+ private Context prevContext;
+ private final FileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
+
+ @BeforeClass
+ public static void beforeEverything() {
+ retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+ }
+
+ @Before
+ public final void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ String serverName = "Server for " + this.getClass();
+ server =
+ InProcessServerBuilder.forName(serverName)
+ .fallbackHandlerRegistry(serviceRegistry)
+ .build()
+ .start();
+ channel = InProcessChannelBuilder.forName(serverName).build();
+ withEmptyMetadata =
+ TracingMetadataUtils.contextWithMetadata(
+ "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+ // Needs to be repeated in every test that uses the timeout setting, since the tests run
+ // on different threads than the setUp.
+ prevContext = withEmptyMetadata.attach();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // Needs to be repeated in every test that uses the timeout setting, since the tests run
+ // on different threads than the tearDown.
+ withEmptyMetadata.detach(prevContext);
+
+ server.shutdownNow();
+ server.awaitTermination();
+ }
+
+ @AfterClass
+ public static void afterEverything() {
+ retryService.shutdownNow();
+ }
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void uploadsShouldWork() throws Exception {
+ int numUploads = 2;
+ Map<String, byte[]> blobsByHash = new HashMap<>();
+ Map<Path, LocalFile> filesToUpload = new HashMap<>();
+ Random rand = new Random();
+ for (int i = 0; i < numUploads; i++) {
+ Path file = fs.getPath("/file" + i);
+ OutputStream out = file.getOutputStream();
+ int blobSize = rand.nextInt(100) + 1;
+ byte[] blob = new byte[blobSize];
+ rand.nextBytes(blob);
+ out.write(blob);
+ out.close();
+ blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
+ filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
+ }
+ serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader("instance", refCntChannel, null, 3, retrier);
+ ByteStreamBuildEventArtifactUploader artifactUploader =
+ new ByteStreamBuildEventArtifactUploader(
+ uploader, "localhost", withEmptyMetadata, "instance");
+
+ PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
+ for (Path file : filesToUpload.keySet()) {
+ String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest());
+ long size = file.getFileSize();
+ String conversion = pathConverter.apply(file);
+ assertThat(conversion)
+ .isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
+ }
+
+ artifactUploader.shutdown();
+
+ assertThat(uploader.refCnt()).isEqualTo(0);
+ assertThat(refCntChannel.isShutdown()).isTrue();
+ }
+
+ @Test
+ public void someUploadsFail() throws Exception {
+ // Test that if one of multiple file uploads fails, the upload future fails and that the
+ // error is propagated correctly.
+
+ int numUploads = 10;
+ Map<String, byte[]> blobsByHash = new HashMap<>();
+ Map<Path, LocalFile> filesToUpload = new HashMap<>();
+ Random rand = new Random();
+ for (int i = 0; i < numUploads; i++) {
+ Path file = fs.getPath("/file" + i);
+ OutputStream out = file.getOutputStream();
+ int blobSize = rand.nextInt(100) + 1;
+ byte[] blob = new byte[blobSize];
+ rand.nextBytes(blob);
+ out.write(blob);
+ out.flush();
+ out.close();
+ blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
+ filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
+ }
+ String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next();
+ serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash) {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ StreamObserver<WriteRequest> delegate = super.write(response);
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest value) {
+ if (value.getResourceName().contains(hashOfBlobThatShouldFail)) {
+ response.onError(Status.CANCELLED.asException());
+ } else {
+ delegate.onNext(value);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ delegate.onError(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ delegate.onCompleted();
+ }
+ };
+ }
+ });
+
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader("instance", refCntChannel, null, 3, retrier);
+ ByteStreamBuildEventArtifactUploader artifactUploader =
+ new ByteStreamBuildEventArtifactUploader(
+ uploader, "localhost", withEmptyMetadata, "instance");
+
+ try {
+ artifactUploader.upload(filesToUpload).get();
+ fail("exception expected.");
+ } catch (ExecutionException e) {
+ assertThat(e.getCause()).isInstanceOf(RetryException.class);
+ assertThat(Status.fromThrowable(e).getCode()).isEqualTo(Status.CANCELLED.getCode());
+ }
+
+ artifactUploader.shutdown();
+
+ assertThat(uploader.refCnt()).isEqualTo(0);
+ assertThat(refCntChannel.isShutdown()).isTrue();
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 7b2dc7aefb..06ad3914ed 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -34,8 +34,8 @@ import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.protobuf.ByteString;
import io.grpc.BindableService;
-import io.grpc.Channel;
import io.grpc.Context;
+import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
@@ -90,7 +90,7 @@ public class ByteStreamUploaderTest {
private static ListeningScheduledExecutorService retryService;
private Server server;
- private Channel channel;
+ private ManagedChannel channel;
private Context withEmptyMetadata;
private Context prevContext;
@@ -137,7 +137,8 @@ public class ByteStreamUploaderTest {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -193,7 +194,7 @@ public class ByteStreamUploaderTest {
}
});
- uploader.uploadBlob(chunker);
+ uploader.uploadBlob(chunker, true);
// This test should not have triggered any retries.
Mockito.verifyZeroInteractions(mockBackoff);
@@ -209,7 +210,8 @@ public class ByteStreamUploaderTest {
RemoteRetrier retrier =
new RemoteRetrier(
() -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
int numUploads = 10;
Map<String, byte[]> blobsByHash = new HashMap<>();
@@ -224,70 +226,9 @@ public class ByteStreamUploaderTest {
blobsByHash.put(chunker.digest().getHash(), blob);
}
- Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>());
+ serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- serviceRegistry.addService(new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
- return new StreamObserver<WriteRequest>() {
-
- private String digestHash;
- private byte[] receivedData;
- private long nextOffset;
-
- @Override
- public void onNext(WriteRequest writeRequest) {
- if (nextOffset == 0) {
- String resourceName = writeRequest.getResourceName();
- assertThat(resourceName).isNotEmpty();
-
- String[] components = resourceName.split("/");
- assertThat(components).hasLength(6);
- digestHash = components[4];
- assertThat(blobsByHash).containsKey(digestHash);
- receivedData = new byte[Integer.parseInt(components[5])];
- }
- assertThat(digestHash).isNotNull();
- // An upload for a given blob has a 10% chance to fail once during its lifetime.
- // This is to exercise the retry mechanism a bit.
- boolean shouldFail =
- rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash);
- if (shouldFail) {
- uploadsFailedOnce.add(digestHash);
- response.onError(Status.INTERNAL.asException());
- return;
- }
-
- ByteString data = writeRequest.getData();
- System.arraycopy(
- data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
- nextOffset += data.size();
-
- boolean lastWrite = nextOffset == receivedData.length;
- assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
- }
-
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- byte[] expectedBlob = blobsByHash.get(digestHash);
- assertThat(receivedData).isEqualTo(expectedBlob);
-
- WriteResponse writeResponse =
- WriteResponse.newBuilder().setCommittedSize(receivedData.length).build();
-
- response.onNext(writeResponse);
- response.onCompleted();
- }
- };
- }
- });
-
- uploader.uploadBlobs(builders);
+ uploader.uploadBlobs(builders, true);
blockUntilInternalStateConsistent(uploader);
@@ -302,7 +243,8 @@ public class ByteStreamUploaderTest {
RemoteRetrier retrier =
new RemoteRetrier(
() -> new FixedBackoff(5, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
List<Chunker> builders = new ArrayList<>(toUpload.size());
@@ -372,7 +314,7 @@ public class ByteStreamUploaderTest {
"build-req-id", "command-id", DIGEST_UTIL.asActionKey(chunker.digest()));
ctx.call(
() -> {
- uploads.add(uploader.uploadBlobAsync(chunker));
+ uploads.add(uploader.uploadBlobAsync(chunker, true));
return null;
});
}
@@ -393,7 +335,8 @@ public class ByteStreamUploaderTest {
Context prevContext = withEmptyMetadata.attach();
RemoteRetrier retrier =
new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -435,8 +378,8 @@ public class ByteStreamUploaderTest {
}
});
- Future<?> upload1 = uploader.uploadBlobAsync(chunker);
- Future<?> upload2 = uploader.uploadBlobAsync(chunker);
+ Future<?> upload1 = uploader.uploadBlobAsync(chunker, true);
+ Future<?> upload2 = uploader.uploadBlobAsync(chunker, true);
blocker.countDown();
@@ -455,7 +398,8 @@ public class ByteStreamUploaderTest {
RemoteRetrier retrier =
new RemoteRetrier(
() -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -469,7 +413,7 @@ public class ByteStreamUploaderTest {
});
try {
- uploader.uploadBlob(chunker);
+ uploader.uploadBlob(chunker, true);
fail("Should have thrown an exception.");
} catch (RetryException e) {
assertThat(e.getAttempts()).isEqualTo(2);
@@ -485,7 +429,8 @@ public class ByteStreamUploaderTest {
RemoteRetrier retrier =
new RemoteRetrier(
() -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
CountDownLatch cancellations = new CountDownLatch(2);
@@ -520,8 +465,8 @@ public class ByteStreamUploaderTest {
byte[] blob2 = new byte[CHUNK_SIZE + 1];
Chunker chunker2 = new Chunker(blob2, CHUNK_SIZE, DIGEST_UTIL);
- ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1);
- ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2);
+ ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1, true);
+ ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2, true);
assertThat(uploader.uploadsInProgress()).isTrue();
@@ -545,7 +490,8 @@ public class ByteStreamUploaderTest {
RemoteRetrier retrier =
new RemoteRetrier(
() -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
- ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -564,7 +510,7 @@ public class ByteStreamUploaderTest {
byte[] blob = new byte[1];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
try {
- uploader.uploadBlob(chunker);
+ uploader.uploadBlob(chunker, true);
fail("Should have thrown an exception.");
} catch (RetryException e) {
assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -579,7 +525,8 @@ public class ByteStreamUploaderTest {
RemoteRetrier retrier =
new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
+ new ByteStreamUploader(/* instanceName */ null,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -608,7 +555,7 @@ public class ByteStreamUploaderTest {
byte[] blob = new byte[1];
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
- uploader.uploadBlob(chunker);
+ uploader.uploadBlob(chunker, true);
withEmptyMetadata.detach(prevContext);
}
@@ -623,7 +570,8 @@ public class ByteStreamUploaderTest {
retryService,
Retrier.ALLOW_ALL_CALLS);
ByteStreamUploader uploader =
- new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
+ new ByteStreamUploader(/* instanceName */ null,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
AtomicInteger numCalls = new AtomicInteger();
@@ -640,7 +588,7 @@ public class ByteStreamUploaderTest {
Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
try {
- uploader.uploadBlob(chunker);
+ uploader.uploadBlob(chunker, true);
fail("Should have thrown an exception.");
} catch (RetryException e) {
assertThat(numCalls.get()).isEqualTo(1);
@@ -649,6 +597,67 @@ public class ByteStreamUploaderTest {
withEmptyMetadata.detach(prevContext);
}
+ @Test
+ public void deduplicationOfUploadsShouldWork() throws Exception {
+ Context prevContext = withEmptyMetadata.attach();
+ RemoteRetrier retrier =
+ new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+ ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+ new ReferenceCountedChannel(channel), null, 3, retrier);
+
+ byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+ new Random().nextBytes(blob);
+
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
+
+ AtomicInteger numUploads = new AtomicInteger();
+ serviceRegistry.addService(new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ numUploads.incrementAndGet();
+ return new StreamObserver<WriteRequest>() {
+
+ long nextOffset = 0;
+
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ nextOffset += writeRequest.getData().size();
+ boolean lastWrite = blob.length == nextOffset;
+ assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ assertThat(nextOffset).isEqualTo(blob.length);
+
+ WriteResponse response =
+ WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+ streamObserver.onNext(response);
+ streamObserver.onCompleted();
+ }
+ };
+ }
+ });
+
+ uploader.uploadBlob(chunker, true);
+ // This should not trigger an upload.
+ uploader.uploadBlob(chunker, false);
+
+ assertThat(numUploads.get()).isEqualTo(1);
+
+ // This test should not have triggered any retries.
+ Mockito.verifyZeroInteractions(mockBackoff);
+
+ blockUntilInternalStateConsistent(uploader);
+
+ withEmptyMetadata.detach(prevContext);
+ }
+
private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
@Override
public void onNext(WriteRequest writeRequest) {
@@ -663,7 +672,7 @@ public class ByteStreamUploaderTest {
}
}
- private static class FixedBackoff implements Retrier.Backoff {
+ static class FixedBackoff implements Retrier.Backoff {
private final int maxRetries;
private final int delayMillis;
@@ -690,6 +699,80 @@ public class ByteStreamUploaderTest {
}
}
+ /**
+ * An byte stream service where an upload for a given blob may or may not fail on the first
+ * attempt but is guaranteed to succeed on the second try.
+ */
+ static class MaybeFailOnceUploadService extends ByteStreamImplBase {
+
+ private final Map<String, byte[]> blobsByHash;
+ private final Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>());
+ private final Random rand = new Random();
+
+ MaybeFailOnceUploadService(Map<String, byte[]> blobsByHash) {
+ this.blobsByHash = blobsByHash;
+ }
+
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ return new StreamObserver<WriteRequest>() {
+
+ private String digestHash;
+ private byte[] receivedData;
+ private long nextOffset;
+
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ if (nextOffset == 0) {
+ String resourceName = writeRequest.getResourceName();
+ assertThat(resourceName).isNotEmpty();
+
+ String[] components = resourceName.split("/");
+ assertThat(components).hasLength(6);
+ digestHash = components[4];
+ assertThat(blobsByHash).containsKey(digestHash);
+ receivedData = new byte[Integer.parseInt(components[5])];
+ }
+ assertThat(digestHash).isNotNull();
+ // An upload for a given blob has a 10% chance to fail once during its lifetime.
+ // This is to exercise the retry mechanism a bit.
+ boolean shouldFail =
+ rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash);
+ if (shouldFail) {
+ uploadsFailedOnce.add(digestHash);
+ response.onError(Status.INTERNAL.asException());
+ return;
+ }
+
+ ByteString data = writeRequest.getData();
+ System.arraycopy(
+ data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
+ nextOffset += data.size();
+
+ boolean lastWrite = nextOffset == receivedData.length;
+ assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ byte[] expectedBlob = blobsByHash.get(digestHash);
+ assertThat(receivedData).isEqualTo(expectedBlob);
+
+ WriteResponse writeResponse =
+ WriteResponse.newBuilder().setCommittedSize(receivedData.length).build();
+
+ response.onNext(writeResponse);
+ response.onCompleted();
+ }
+ };
+ }
+ }
+
private void blockUntilInternalStateConsistent(ByteStreamUploader uploader) throws Exception {
// Poll until all upload futures have been removed from the internal hash map. The polling is
// necessary, as listeners are executed after Future.get() calls are notified about completion.
diff --git a/src/test/java/com/google/devtools/build/lib/remote/CasPathConverterTest.java b/src/test/java/com/google/devtools/build/lib/remote/CasPathConverterTest.java
deleted file mode 100644
index 5c6dcf2627..0000000000
--- a/src/test/java/com/google/devtools/build/lib/remote/CasPathConverterTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.devtools.build.lib.remote.RemoteModule.CasPathConverter;
-import com.google.devtools.build.lib.remote.util.DigestUtil;
-import com.google.devtools.build.lib.vfs.DigestHashFunction;
-import com.google.devtools.build.lib.vfs.FileSystem;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
-import com.google.devtools.common.options.Options;
-import com.google.devtools.common.options.OptionsParser;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link CasPathConverter}. */
-@RunWith(JUnit4.class)
-public class CasPathConverterTest {
- private final FileSystem fs = new InMemoryFileSystem();
- private final CasPathConverter converter = new CasPathConverter();
-
- @Test
- public void noOptionsShouldntCrash() {
- converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256);
- assertThat(converter.apply(fs.getPath("/foo"))).isEqualTo("file:///foo");
- }
-
- @Test
- public void noDigestUtilShouldntCrash() {
- converter.options = Options.getDefaults(RemoteOptions.class);
- assertThat(converter.apply(fs.getPath("/foo"))).isEqualTo("file:///foo");
- }
-
- @Test
- public void disabledRemote() {
- converter.options = Options.getDefaults(RemoteOptions.class);
- converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256);
- assertThat(converter.apply(fs.getPath("/foo"))).isEqualTo("file:///foo");
- }
-
- @Test
- public void enabledRemoteExecutorNoRemoteInstance() throws Exception {
- OptionsParser parser = OptionsParser.newOptionsParser(RemoteOptions.class);
- parser.parse("--remote_cache=machine");
- converter.options = parser.getOptions(RemoteOptions.class);
- converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256);
- Path path = fs.getPath("/foo");
- FileSystemUtils.writeContentAsLatin1(path, "foobar");
- assertThat(converter.apply(fs.getPath("/foo")))
- .isEqualTo("bytestream://machine/blobs/3858f62230ac3c915f300c664312c63f/6");
- }
-
- @Test
- public void enabledRemoteExecutorWithRemoteInstance() throws Exception {
- OptionsParser parser = OptionsParser.newOptionsParser(RemoteOptions.class);
- parser.parse("--remote_cache=machine", "--remote_instance_name=projects/bazel");
- converter.options = parser.getOptions(RemoteOptions.class);
- converter.digestUtil = new DigestUtil(DigestHashFunction.SHA256);
- Path path = fs.getPath("/foo");
- FileSystemUtils.writeContentAsLatin1(path, "foobar");
- assertThat(converter.apply(fs.getPath("/foo")))
- .isEqualTo("bytestream://machine/projects/bazel/blobs/3858f62230ac3c915f300c664312c63f/6");
- }
-}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index 4fc5869276..3c61fd149d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -64,7 +64,6 @@ import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
-import io.grpc.ClientInterceptors;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
@@ -177,9 +176,9 @@ public class GrpcRemoteCacheTest {
Scratch scratch = new Scratch();
scratch.file(authTlsOptions.googleCredentials, new JacksonFactory().toString(json));
- CallCredentials creds = null;
+ CallCredentials creds;
try (InputStream in = scratch.resolve(authTlsOptions.googleCredentials).getInputStream()) {
- GoogleAuthUtils.newCallCredentials(in, authTlsOptions.googleAuthScopes);
+ creds = GoogleAuthUtils.newCallCredentials(in, authTlsOptions.googleAuthScopes);
}
RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
RemoteRetrier retrier =
@@ -188,14 +187,18 @@ public class GrpcRemoteCacheTest {
RemoteRetrier.RETRIABLE_GRPC_ERRORS,
retryService,
Retrier.ALLOW_ALL_CALLS);
- return new GrpcRemoteCache(
- ClientInterceptors.intercept(
- InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(),
- ImmutableList.of(new CallCredentialsInterceptor(creds))),
+ ReferenceCountedChannel channel =
+ new ReferenceCountedChannel(InProcessChannelBuilder.forName(fakeServerName).directExecutor()
+ .intercept(new CallCredentialsInterceptor(creds)).build());
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(remoteOptions.remoteInstanceName, channel.retain(), creds,
+ remoteOptions.remoteTimeout, retrier);
+ return new GrpcRemoteCache(channel.retain(),
creds,
remoteOptions,
retrier,
- DIGEST_UTIL);
+ DIGEST_UTIL,
+ uploader);
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index 849259f4a2..93d1cdd447 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -80,7 +80,6 @@ import com.google.watcher.v1.Request;
import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
import io.grpc.BindableService;
import io.grpc.CallCredentials;
-import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
@@ -258,13 +257,17 @@ public class GrpcRemoteExecutionClientTest {
RemoteRetrier.RETRIABLE_GRPC_ERRORS,
retryService,
Retrier.ALLOW_ALL_CALLS);
- Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
+ ReferenceCountedChannel channel =
+ new ReferenceCountedChannel(InProcessChannelBuilder.forName(fakeServerName).directExecutor().build());
GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(channel, null, remoteOptions.remoteTimeout, retrier);
+ new GrpcRemoteExecutor(channel.retain(), null, remoteOptions.remoteTimeout, retrier);
CallCredentials creds =
GoogleAuthUtils.newCallCredentials(Options.getDefaults(AuthAndTLSOptions.class));
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(remoteOptions.remoteInstanceName, channel.retain(), creds,
+ remoteOptions.remoteTimeout, retrier);
GrpcRemoteCache remoteCache =
- new GrpcRemoteCache(channel, creds, remoteOptions, retrier, DIGEST_UTIL);
+ new GrpcRemoteCache(channel.retain(), creds, remoteOptions, retrier, DIGEST_UTIL, uploader);
client =
new RemoteSpawnRunner(
execRoot,
@@ -281,6 +284,7 @@ public class GrpcRemoteExecutionClientTest {
DIGEST_UTIL,
logDir);
inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
+ channel.release();
}
@After