aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java7
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java21
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java44
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java19
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java11
5 files changed, 86 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 4ee108e2e7..aa8f9b529a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -39,6 +39,7 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.Status;
+import io.grpc.Status.Code;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.ArrayList;
@@ -352,10 +353,10 @@ final class ByteStreamUploader {
@Override
public void onClose(Status status, Metadata trailers) {
- if (!status.isOk()) {
- listener.failure(status);
- } else {
+ if (status.isOk() || Code.ALREADY_EXISTS.equals(status.getCode())) {
listener.success();
+ } else {
+ listener.failure(status);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 1c97f21f60..c637a8dfff 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -145,15 +145,15 @@ public class GrpcRemoteCache implements RemoteActionCache {
ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root));
// Only upload data that was missing from the cache.
- ArrayList<ActionInput> actionInputs = new ArrayList<>();
- ArrayList<Directory> treeNodes = new ArrayList<>();
- repository.getDataFromDigests(missingDigests, actionInputs, treeNodes);
+ ArrayList<ActionInput> missingActionInputs = new ArrayList<>();
+ ArrayList<Directory> missingTreeNodes = new ArrayList<>();
+ repository.getDataFromDigests(missingDigests, missingActionInputs, missingTreeNodes);
- if (!treeNodes.isEmpty()) {
+ if (!missingTreeNodes.isEmpty()) {
// TODO(olaola): split this into multiple requests if total size is > 10MB.
BatchUpdateBlobsRequest.Builder treeBlobRequest =
BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName);
- for (Directory d : treeNodes) {
+ for (Directory d : missingTreeNodes) {
byte[] data = d.toByteArray();
treeBlobRequest
.addRequestsBuilder()
@@ -173,17 +173,12 @@ public class GrpcRemoteCache implements RemoteActionCache {
});
}
uploadBlob(command.toByteArray());
- if (!actionInputs.isEmpty()) {
+ if (!missingActionInputs.isEmpty()) {
List<Chunker> inputsToUpload = new ArrayList<>();
ActionInputFileCache inputFileCache = repository.getInputFileCache();
- for (ActionInput actionInput : actionInputs) {
- Digest digest = Digests.getDigestFromInputCache(actionInput, inputFileCache);
- if (missingDigests.contains(digest)) {
- Chunker chunker = new Chunker(actionInput, inputFileCache, execRoot);
- inputsToUpload.add(chunker);
- }
+ for (ActionInput actionInput : missingActionInputs) {
+ inputsToUpload.add(new Chunker(actionInput, inputFileCache, execRoot));
}
-
uploader.uploadBlobs(inputsToUpload);
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index 81b63fa5f3..c43a09f690 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -59,6 +59,7 @@ import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -284,6 +285,49 @@ public class GrpcRemoteCacheTest {
assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
}
+ @Test
+ public void testUploadBlobRemoteAlreadyExists() throws Exception {
+ final GrpcRemoteCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
+ responseObserver.onCompleted();
+ }
+ });
+ final AtomicBoolean sentError = new AtomicBoolean(false);
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(
+ final StreamObserver<WriteResponse> responseObserver) {
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest request) {
+ responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
+ sentError.set(true);
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
+ }
+ };
+ }
+ });
+ assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
+ assertThat(sentError.get()).isTrue();
+ }
+
static class TestChunkedRequestObserver implements StreamObserver<WriteRequest> {
private final StreamObserver<WriteResponse> responseObserver;
private final String contents;
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
index acc6685bf2..50f88578de 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
@@ -132,6 +132,25 @@ final class ByteStreamServer extends ByteStreamImplBase {
return;
}
+ if (offset == 0) {
+ try {
+ if (cache.containsKey(digest)) {
+ responseObserver.onError(StatusUtils.alreadyExistsError(digest));
+ closed = true;
+ return;
+ }
+ } catch (InterruptedException e) {
+ responseObserver.onError(StatusUtils.interruptedError(digest));
+ Thread.currentThread().interrupt();
+ closed = true;
+ return;
+ } catch (IOException e) {
+ responseObserver.onError(StatusUtils.internalError(e));
+ closed = true;
+ return;
+ }
+ }
+
if (request.getWriteOffset() != offset) {
responseObserver.onError(
StatusUtils.invalidArgumentError(
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
index 494ee58453..be606d2bf4 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
@@ -51,6 +51,17 @@ final class StatusUtils {
.build();
}
+ static StatusException alreadyExistsError(Digest digest) {
+ return StatusProto.toStatusException(alreadyExistsStatus(digest));
+ }
+
+ static com.google.rpc.Status alreadyExistsStatus(Digest digest) {
+ return Status.newBuilder()
+ .setCode(Code.ALREADY_EXISTS.getNumber())
+ .setMessage("Digest already uploaded:" + digest)
+ .build();
+ }
+
static StatusException interruptedError(Digest digest) {
return StatusProto.toStatusException(interruptedStatus(digest));
}