diff options
5 files changed, 86 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 4ee108e2e7..aa8f9b529a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -39,6 +39,7 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusException; import java.io.IOException; import java.util.ArrayList; @@ -352,10 +353,10 @@ final class ByteStreamUploader { @Override public void onClose(Status status, Metadata trailers) { - if (!status.isOk()) { - listener.failure(status); - } else { + if (status.isOk() || Code.ALREADY_EXISTS.equals(status.getCode())) { listener.success(); + } else { + listener.failure(status); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java index 1c97f21f60..c637a8dfff 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java @@ -145,15 +145,15 @@ public class GrpcRemoteCache implements RemoteActionCache { ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root)); // Only upload data that was missing from the cache. - ArrayList<ActionInput> actionInputs = new ArrayList<>(); - ArrayList<Directory> treeNodes = new ArrayList<>(); - repository.getDataFromDigests(missingDigests, actionInputs, treeNodes); + ArrayList<ActionInput> missingActionInputs = new ArrayList<>(); + ArrayList<Directory> missingTreeNodes = new ArrayList<>(); + repository.getDataFromDigests(missingDigests, missingActionInputs, missingTreeNodes); - if (!treeNodes.isEmpty()) { + if (!missingTreeNodes.isEmpty()) { // TODO(olaola): split this into multiple requests if total size is > 10MB. BatchUpdateBlobsRequest.Builder treeBlobRequest = BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName); - for (Directory d : treeNodes) { + for (Directory d : missingTreeNodes) { byte[] data = d.toByteArray(); treeBlobRequest .addRequestsBuilder() @@ -173,17 +173,12 @@ public class GrpcRemoteCache implements RemoteActionCache { }); } uploadBlob(command.toByteArray()); - if (!actionInputs.isEmpty()) { + if (!missingActionInputs.isEmpty()) { List<Chunker> inputsToUpload = new ArrayList<>(); ActionInputFileCache inputFileCache = repository.getInputFileCache(); - for (ActionInput actionInput : actionInputs) { - Digest digest = Digests.getDigestFromInputCache(actionInput, inputFileCache); - if (missingDigests.contains(digest)) { - Chunker chunker = new Chunker(actionInput, inputFileCache, execRoot); - inputsToUpload.add(chunker); - } + for (ActionInput actionInput : missingActionInputs) { + inputsToUpload.add(new Chunker(actionInput, inputFileCache, execRoot)); } - uploader.uploadBlobs(inputsToUpload); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java index 81b63fa5f3..c43a09f690 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java @@ -59,6 +59,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -284,6 +285,49 @@ public class GrpcRemoteCacheTest { assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest); } + @Test + public void testUploadBlobRemoteAlreadyExists() throws Exception { + final GrpcRemoteCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + responseObserver.onNext( + FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build()); + responseObserver.onCompleted(); + } + }); + final AtomicBoolean sentError = new AtomicBoolean(false); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver<WriteRequest> write( + final StreamObserver<WriteResponse> responseObserver) { + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) { + responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException()); + sentError.set(true); + } + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable t) { + fail("An error occurred: " + t); + } + }; + } + }); + assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest); + assertThat(sentError.get()).isTrue(); + } + static class TestChunkedRequestObserver implements StreamObserver<WriteRequest> { private final StreamObserver<WriteResponse> responseObserver; private final String contents; diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java index acc6685bf2..50f88578de 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java @@ -132,6 +132,25 @@ final class ByteStreamServer extends ByteStreamImplBase { return; } + if (offset == 0) { + try { + if (cache.containsKey(digest)) { + responseObserver.onError(StatusUtils.alreadyExistsError(digest)); + closed = true; + return; + } + } catch (InterruptedException e) { + responseObserver.onError(StatusUtils.interruptedError(digest)); + Thread.currentThread().interrupt(); + closed = true; + return; + } catch (IOException e) { + responseObserver.onError(StatusUtils.internalError(e)); + closed = true; + return; + } + } + if (request.getWriteOffset() != offset) { responseObserver.onError( StatusUtils.invalidArgumentError( diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java index 494ee58453..be606d2bf4 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java @@ -51,6 +51,17 @@ final class StatusUtils { .build(); } + static StatusException alreadyExistsError(Digest digest) { + return StatusProto.toStatusException(alreadyExistsStatus(digest)); + } + + static com.google.rpc.Status alreadyExistsStatus(Digest digest) { + return Status.newBuilder() + .setCode(Code.ALREADY_EXISTS.getNumber()) + .setMessage("Digest already uploaded:" + digest) + .build(); + } + static StatusException interruptedError(Digest digest) { return StatusProto.toStatusException(interruptedStatus(digest)); } |