From fc5cbbeaa853800868e8c604233d85f632a16f89 Mon Sep 17 00:00:00 2001 From: ulfjack Date: Tue, 4 Jul 2017 04:44:38 -0400 Subject: Rename GrpcActionCache to GrpcRemoteCache PiperOrigin-RevId: 160872755 --- .../build/lib/remote/GrpcRemoteCacheTest.java | 527 +++++++++++++++++++++ 1 file changed, 527 insertions(+) create mode 100644 src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java (limited to 'src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java') diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java new file mode 100644 index 0000000000..9d65503b48 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java @@ -0,0 +1,527 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +import com.google.api.client.json.GenericJson; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; +import com.google.bytestream.ByteStreamProto.ReadRequest; +import com.google.bytestream.ByteStreamProto.ReadResponse; +import com.google.bytestream.ByteStreamProto.WriteRequest; +import com.google.bytestream.ByteStreamProto.WriteResponse; +import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.actions.ActionInputHelper; +import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.Digests.ActionKey; +import com.google.devtools.build.lib.testutil.Scratch; +import com.google.devtools.build.lib.util.io.FileOutErr; +import com.google.devtools.build.lib.vfs.FileSystem; +import com.google.devtools.build.lib.vfs.FileSystem.HashFunction; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; +import com.google.devtools.common.options.Options; +import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; +import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; +import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest; +import com.google.protobuf.ByteString; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.util.MutableHandlerRegistry; +import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** Tests for {@link GrpcRemoteCache}. */ +@RunWith(JUnit4.class) +public class GrpcRemoteCacheTest { + private FileSystem fs; + private Path execRoot; + private FileOutErr outErr; + private FakeActionInputFileCache fakeFileCache; + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final String fakeServerName = "fake server for " + getClass(); + private Server fakeServer; + + @Before + public final void setUp() throws Exception { + FileSystem.setDigestFunctionForTesting(HashFunction.SHA1); + // Use a mutable service registry for later registering the service impl for each test case. + fakeServer = + InProcessServerBuilder.forName(fakeServerName) + .fallbackHandlerRegistry(serviceRegistry) + .directExecutor() + .build() + .start(); + Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk. + fs = new InMemoryFileSystem(); + execRoot = fs.getPath("/exec/root"); + FileSystemUtils.createDirectoryAndParents(execRoot); + fakeFileCache = new FakeActionInputFileCache(execRoot); + + Path stdout = fs.getPath("/tmp/stdout"); + Path stderr = fs.getPath("/tmp/stderr"); + FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory()); + FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory()); + outErr = new FileOutErr(stdout, stderr); + } + + @After + public void tearDown() throws Exception { + fakeServer.shutdownNow(); + } + + private static class ChannelOptionsInterceptor implements ClientInterceptor { + private final ChannelOptions channelOptions; + + public ChannelOptionsInterceptor(ChannelOptions channelOptions) { + this.channelOptions = channelOptions; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + assertThat(callOptions.getCredentials()).isEqualTo(channelOptions.getCallCredentials()); + // Remove the call credentials to allow testing with dummy ones. + return next.newCall(method, callOptions.withCallCredentials(null)); + } + } + + private GrpcRemoteCache newClient() throws IOException { + AuthAndTLSOptions authTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + authTlsOptions.authEnabled = true; + authTlsOptions.authCredentials = "/exec/root/creds.json"; + authTlsOptions.authScope = "dummy.scope"; + + GenericJson json = new GenericJson(); + json.put("type", "authorized_user"); + json.put("client_id", "some_client"); + json.put("client_secret", "foo"); + json.put("refresh_token", "bar"); + Scratch scratch = new Scratch(); + scratch.file(authTlsOptions.authCredentials, new JacksonFactory().toString(json)); + + ChannelOptions channelOptions = + ChannelOptions.create( + authTlsOptions, scratch.resolve(authTlsOptions.authCredentials).getInputStream()); + return new GrpcRemoteCache( + ClientInterceptors.intercept( + InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), + ImmutableList.of(new ChannelOptionsInterceptor(channelOptions))), + channelOptions, + Options.getDefaults(RemoteOptions.class)); + } + + @Test + public void testDownloadEmptyBlob() throws Exception { + GrpcRemoteCache client = newClient(); + Digest emptyDigest = Digests.computeDigest(new byte[0]); + // Will not call the mock Bytestream interface at all. + assertThat(client.downloadBlob(emptyDigest)).isEmpty(); + } + + @Test + public void testDownloadBlobSingleChunk() throws Exception { + final GrpcRemoteCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public void read(ReadRequest request, StreamObserver responseObserver) { + assertThat(request.getResourceName().contains(digest.getHash())).isTrue(); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abcdefg")).build()); + responseObserver.onCompleted(); + } + }); + assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg"); + } + + @Test + public void testDownloadBlobMultipleChunks() throws Exception { + final GrpcRemoteCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public void read(ReadRequest request, StreamObserver responseObserver) { + assertThat(request.getResourceName().contains(digest.getHash())).isTrue(); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build()); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("g")).build()); + responseObserver.onCompleted(); + } + }); + assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg"); + } + + @Test + public void testDownloadAllResults() throws Exception { + GrpcRemoteCache client = newClient(); + Digest fooDigest = Digests.computeDigestUtf8("foo-contents"); + Digest barDigest = Digests.computeDigestUtf8("bar-contents"); + Digest emptyDigest = Digests.computeDigest(new byte[0]); + serviceRegistry.addService( + new FakeImmutableCacheByteStreamImpl(fooDigest, "foo-contents", barDigest, "bar-contents")); + + ActionResult.Builder result = ActionResult.newBuilder(); + result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); + result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest); + result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true); + client.download(result.build(), execRoot, null); + assertThat(Digests.computeDigest(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); + assertThat(Digests.computeDigest(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest); + assertThat(Digests.computeDigest(execRoot.getRelative("a/bar"))).isEqualTo(barDigest); + assertThat(execRoot.getRelative("a/bar").isExecutable()).isTrue(); + } + + @Test + public void testUploadBlobCacheHitWithRetries() throws Exception { + final GrpcRemoteCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + private int numErrors = 4; + + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver responseObserver) { + if (numErrors-- <= 0) { + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + } + } + }); + assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest); + } + + @Test + public void testUploadBlobSingleChunk() throws Exception { + final GrpcRemoteCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build()); + responseObserver.onCompleted(); + } + }); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write( + final StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(WriteRequest request) { + assertThat(request.getResourceName()).contains(digest.getHash()); + assertThat(request.getFinishWrite()).isTrue(); + assertThat(request.getData().toStringUtf8()).isEqualTo("abcdefg"); + } + + @Override + public void onCompleted() { + responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(7).build()); + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + fail("An error occurred: " + t); + } + }; + } + }); + assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest); + } + + static class TestChunkedRequestObserver implements StreamObserver { + private final StreamObserver responseObserver; + private final String contents; + private Chunker chunker; + + public TestChunkedRequestObserver( + StreamObserver responseObserver, String contents, int chunkSizeBytes) { + this.responseObserver = responseObserver; + this.contents = contents; + try { + chunker = + new Chunker.Builder() + .chunkSize(chunkSizeBytes) + .addInput(contents.getBytes(UTF_8)) + .build(); + } catch (IOException e) { + fail("An error occurred:" + e); + } + } + + @Override + public void onNext(WriteRequest request) { + assertThat(chunker.hasNext()).isTrue(); + try { + Chunker.Chunk chunk = chunker.next(); + Digest digest = chunk.getDigest(); + long offset = chunk.getOffset(); + byte[] data = chunk.getData(); + if (offset == 0) { + assertThat(request.getResourceName()).contains(digest.getHash()); + } else { + assertThat(request.getResourceName()).isEmpty(); + } + assertThat(request.getFinishWrite()) + .isEqualTo(offset + data.length == digest.getSizeBytes()); + assertThat(request.getData().toByteArray()).isEqualTo(data); + } catch (IOException e) { + fail("An error occurred:" + e); + } + } + + @Override + public void onCompleted() { + assertThat(chunker.hasNext()).isFalse(); + responseObserver.onNext( + WriteResponse.newBuilder().setCommittedSize(contents.length()).build()); + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + fail("An error occurred: " + t); + } + } + + private Answer> blobChunkedWriteAnswer( + final String contents, final int chunkSize) { + return new Answer>() { + @Override + @SuppressWarnings("unchecked") + public StreamObserver answer(InvocationOnMock invocation) { + return new TestChunkedRequestObserver( + (StreamObserver) invocation.getArguments()[0], contents, chunkSize); + } + }; + } + + private Answer> blobChunkedWriteAnswerError() { + return new Answer>() { + @Override + @SuppressWarnings("unchecked") + public StreamObserver answer(final InvocationOnMock invocation) { + return new StreamObserver() { + @Override + public void onNext(WriteRequest request) { + ((StreamObserver) invocation.getArguments()[0]) + .onError(Status.UNAVAILABLE.asRuntimeException()); + } + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) { + fail("An unexpected client-side error occurred: " + t); + } + }; + } + }; + } + + @Test + public void testUploadBlobMultipleChunks() throws Exception { + final Digest digest = Digests.computeDigestUtf8("abcdef"); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build()); + responseObserver.onCompleted(); + } + }); + + ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); + serviceRegistry.addService(mockByteStreamImpl); + for (int chunkSize = 1; chunkSize <= 6; ++chunkSize) { + GrpcRemoteCache client = newClient(); + Chunker.setDefaultChunkSizeForTesting(chunkSize); + when(mockByteStreamImpl.write(Mockito.>anyObject())) + .thenAnswer(blobChunkedWriteAnswer("abcdef", chunkSize)); + assertThat(client.uploadBlob("abcdef".getBytes(UTF_8))).isEqualTo(digest); + } + } + + @Test + public void testUploadCacheHits() throws Exception { + final GrpcRemoteCache client = newClient(); + final Digest fooDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); + final Digest barDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x"); + final Path fooFile = execRoot.getRelative("a/foo"); + final Path barFile = execRoot.getRelative("bar"); + barFile.setExecutable(true); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver responseObserver) { + assertThat(request) + .isEqualTo( + FindMissingBlobsRequest.newBuilder() + .addBlobDigests(fooDigest) + .addBlobDigests(barDigest) + .build()); + // Nothing is missing. + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + }); + + ActionResult.Builder result = ActionResult.newBuilder(); + client.upload(execRoot, ImmutableList.of(fooFile, barFile), outErr, result); + ActionResult.Builder expectedResult = ActionResult.newBuilder(); + expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); + expectedResult + .addOutputFilesBuilder() + .setPath("bar") + .setDigest(barDigest) + .setIsExecutable(true); + assertThat(result.build()).isEqualTo(expectedResult.build()); + } + + @Test + public void testUploadCacheMissesWithRetries() throws Exception { + final GrpcRemoteCache client = newClient(); + final Digest fooDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); + final Digest barDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x"); + final Digest bazDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("baz"), "z"); + final Path fooFile = execRoot.getRelative("a/foo"); + final Path barFile = execRoot.getRelative("bar"); + final Path bazFile = execRoot.getRelative("baz"); + ActionKey actionKey = Digests.unsafeActionKeyFromDigest(fooDigest); // Could be any key. + barFile.setExecutable(true); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + private int numErrors = 4; + + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver responseObserver) { + if (numErrors-- <= 0) { + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + } + } + }); + ActionResult.Builder rb = ActionResult.newBuilder(); + rb.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); + rb.addOutputFilesBuilder().setPath("bar").setDigest(barDigest).setIsExecutable(true); + rb.addOutputFilesBuilder().setPath("baz").setDigest(bazDigest); + ActionResult result = rb.build(); + serviceRegistry.addService( + new ActionCacheImplBase() { + private int numErrors = 4; + + @Override + public void updateActionResult( + UpdateActionResultRequest request, StreamObserver responseObserver) { + assertThat(request) + .isEqualTo( + UpdateActionResultRequest.newBuilder() + .setActionDigest(fooDigest) + .setActionResult(result) + .build()); + if (numErrors-- <= 0) { + responseObserver.onNext(result); + responseObserver.onCompleted(); + } else { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + } + } + }); + ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); + serviceRegistry.addService(mockByteStreamImpl); + when(mockByteStreamImpl.write(Mockito.>anyObject())) + .thenAnswer(blobChunkedWriteAnswerError()) // Error out for foo. + .thenAnswer(blobChunkedWriteAnswer("x", 1)) // Upload bar successfully. + .thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz. + .thenAnswer(blobChunkedWriteAnswer("xyz", 3)) // Retry foo successfully. + .thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz again. + .thenAnswer(blobChunkedWriteAnswer("z", 1)); // Retry baz successfully. + + client.upload(actionKey, execRoot, ImmutableList.of(fooFile, barFile, bazFile), outErr); + } + + @Test + public void testGetCachedActionResultWithRetries() throws Exception { + final GrpcRemoteCache client = newClient(); + ActionKey actionKey = Digests.unsafeActionKeyFromDigest(Digests.computeDigestUtf8("key")); + serviceRegistry.addService( + new ActionCacheImplBase() { + private int numErrors = 4; + + @Override + public void getActionResult( + GetActionResultRequest request, StreamObserver responseObserver) { + responseObserver.onError( + (numErrors-- <= 0 ? Status.NOT_FOUND : Status.UNAVAILABLE).asRuntimeException()); + } + }); + assertThat(client.getCachedActionResult(actionKey)).isNull(); + } +} -- cgit v1.2.3