diff options
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java | 37 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java | 98 |
2 files changed, 80 insertions, 55 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java index 6a6e0bee9a..1c97f21f60 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java @@ -58,8 +58,10 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -320,9 +322,7 @@ public class GrpcRemoteCache implements RemoteActionCache { void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result) throws IOException, InterruptedException { - ArrayList<Digest> digests = new ArrayList<>(); - ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests); - List<Chunker> filesToUpload = new ArrayList<>(digestsToUpload.size()); + Map<Digest, Path> digestToFile = new HashMap<>(); for (Path file : files) { if (!file.exists()) { // We ignore requested results that have not been generated by the action. @@ -335,28 +335,29 @@ public class GrpcRemoteCache implements RemoteActionCache { } Digest digest = Digests.computeDigest(file); - digests.add(digest); + // TODO(olaola): inline small results here. + result + .addOutputFilesBuilder() + .setPath(file.relativeTo(execRoot).getPathString()) + .setDigest(digest) + .setIsExecutable(file.isExecutable()); + digestToFile.put(digest, file); + } - if (digestsToUpload.contains(digest)) { - Chunker chunker = new Chunker(file); - filesToUpload.add(chunker); + ImmutableSet<Digest> digestsToUpload = getMissingDigests(digestToFile.keySet()); + List<Chunker> filesToUpload = new ArrayList<>(); + for (Digest digest : digestsToUpload) { + Path file = digestToFile.get(digest); + if (file == null) { + String message = "FindMissingBlobs call returned an unknown digest: " + digest; + throw new IOException(message); } + filesToUpload.add(new Chunker(file)); } - if (!filesToUpload.isEmpty()) { uploader.uploadBlobs(filesToUpload); } - int index = 0; - for (Path file : files) { - // Add to protobuf. - // TODO(olaola): inline small results here. - result - .addOutputFilesBuilder() - .setPath(file.relativeTo(execRoot).getPathString()) - .setDigest(digests.get(index++)) - .setIsExecutable(file.isExecutable()); - } // TODO(olaola): inline small stdout/stderr here. if (outErr.getErrorPath().exists()) { Digest stderr = uploadFileContents(outErr.getErrorPath()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java index d0d1d0bd57..81b63fa5f3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java @@ -347,30 +347,6 @@ public class GrpcRemoteCacheTest { }; } - private Answer<StreamObserver<WriteRequest>> blobChunkedWriteAnswerError() { - return new Answer<StreamObserver<WriteRequest>>() { - @Override - @SuppressWarnings("unchecked") - public StreamObserver<WriteRequest> answer(final InvocationOnMock invocation) { - return new StreamObserver<WriteRequest>() { - @Override - public void onNext(WriteRequest request) { - ((StreamObserver<WriteResponse>) invocation.getArguments()[0]) - .onError(Status.UNAVAILABLE.asRuntimeException()); - } - - @Override - public void onCompleted() {} - - @Override - public void onError(Throwable t) { - fail("An unexpected client-side error occurred: " + t); - } - }; - } - }; - } - @Test public void testUploadBlobMultipleChunks() throws Exception { final Digest digest = Digests.computeDigestUtf8("abcdef"); @@ -395,6 +371,8 @@ public class GrpcRemoteCacheTest { .thenAnswer(blobChunkedWriteAnswer("abcdef", chunkSize)); assertThat(client.uploadBlob("abcdef".getBytes(UTF_8))).isEqualTo(digest); } + Mockito.verify(mockByteStreamImpl, Mockito.times(6)) + .write(Mockito.<StreamObserver<WriteResponse>>anyObject()); } @Test @@ -413,12 +391,7 @@ public class GrpcRemoteCacheTest { public void findMissingBlobs( FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) { - assertThat(request) - .isEqualTo( - FindMissingBlobsRequest.newBuilder() - .addBlobDigests(fooDigest) - .addBlobDigests(barDigest) - .build()); + assertThat(request.getBlobDigestsList()).containsExactly(fooDigest, barDigest); // Nothing is missing. responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); responseObserver.onCompleted(); @@ -460,7 +433,13 @@ public class GrpcRemoteCacheTest { FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) { if (numErrors-- <= 0) { - responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + // Everything is missing. + responseObserver.onNext( + FindMissingBlobsResponse.newBuilder() + .addMissingBlobDigests(fooDigest) + .addMissingBlobDigests(barDigest) + .addMissingBlobDigests(bazDigest) + .build()); responseObserver.onCompleted(); } else { responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); @@ -496,14 +475,59 @@ public class GrpcRemoteCacheTest { ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); serviceRegistry.addService(mockByteStreamImpl); when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject())) - .thenAnswer(blobChunkedWriteAnswerError()) // Error out for foo. - .thenAnswer(blobChunkedWriteAnswer("x", 1)) // Upload bar successfully. - .thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz. - .thenAnswer(blobChunkedWriteAnswer("xyz", 3)) // Retry foo successfully. - .thenAnswer(blobChunkedWriteAnswerError()) // Error out for baz again. - .thenAnswer(blobChunkedWriteAnswer("z", 1)); // Retry baz successfully. + .thenAnswer( + new Answer<StreamObserver<WriteRequest>>() { + private int numErrors = 4; + @Override + @SuppressWarnings("unchecked") + public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) { + StreamObserver<WriteResponse> responseObserver = + (StreamObserver<WriteResponse>) invocation.getArguments()[0]; + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) { + numErrors--; + if (numErrors >= 0) { + responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); + return; + } + assertThat(request.getFinishWrite()).isTrue(); + String resourceName = request.getResourceName(); + String dataStr = request.getData().toStringUtf8(); + int size = 0; + if (resourceName.contains(fooDigest.getHash())) { + assertThat(dataStr).isEqualTo("xyz"); + size = 3; + } else if (resourceName.contains(barDigest.getHash())) { + assertThat(dataStr).isEqualTo("x"); + size = 1; + } else if (resourceName.contains(bazDigest.getHash())) { + assertThat(dataStr).isEqualTo("z"); + size = 1; + } else { + fail("Unexpected resource name in upload: " + resourceName); + } + responseObserver.onNext( + WriteResponse.newBuilder().setCommittedSize(size).build()); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + fail("An error occurred: " + t); + } + }; + } + }); client.upload(actionKey, execRoot, ImmutableList.<Path>of(fooFile, barFile, bazFile), outErr); + // 4 times for the errors, 3 times for the successful uploads. + Mockito.verify(mockByteStreamImpl, Mockito.times(7)) + .write(Mockito.<StreamObserver<WriteResponse>>anyObject()); } @Test |