aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java32
1 files changed, 18 insertions, 14 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 48bd4c8845..253ab41d83 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -49,7 +49,6 @@ import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import io.grpc.CallCredentials;
-import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
@@ -61,30 +60,31 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
public class GrpcRemoteCache extends AbstractRemoteActionCache {
private final CallCredentials credentials;
- private final Channel channel;
+ private final ReferenceCountedChannel channel;
private final RemoteRetrier retrier;
private final ByteStreamUploader uploader;
+ private AtomicBoolean closed = new AtomicBoolean();
+
@VisibleForTesting
public GrpcRemoteCache(
- Channel channel,
+ ReferenceCountedChannel channel,
CallCredentials credentials,
RemoteOptions options,
RemoteRetrier retrier,
- DigestUtil digestUtil) {
+ DigestUtil digestUtil,
+ ByteStreamUploader uploader) {
super(options, digestUtil, retrier);
this.credentials = credentials;
this.channel = channel;
this.retrier = retrier;
-
- uploader =
- new ByteStreamUploader(
- options.remoteInstanceName, channel, credentials, options.remoteTimeout, retrier);
+ this.uploader = uploader;
}
private ContentAddressableStorageBlockingStub casBlockingStub() {
@@ -110,7 +110,11 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
@Override
public void close() {
- uploader.shutdown();
+ if (closed.getAndSet(true)) {
+ return;
+ }
+ uploader.release();
+ channel.release();
}
public static boolean isRemoteCacheOptions(RemoteOptions options) {
@@ -168,7 +172,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
toUpload.add(new Chunker(actionInput, inputFileCache, execRoot, digestUtil));
}
}
- uploader.uploadBlobs(toUpload);
+ uploader.uploadBlobs(toUpload, true);
}
@Override
@@ -293,7 +297,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
}
if (!filesToUpload.isEmpty()) {
- uploader.uploadBlobs(filesToUpload);
+ uploader.uploadBlobs(filesToUpload, /*forceUpload=*/true);
}
// TODO(olaola): inline small stdout/stderr here.
@@ -317,7 +321,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
Digest digest = digestUtil.compute(file);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploader.uploadBlob(new Chunker(file));
+ uploader.uploadBlob(new Chunker(file), true);
}
return digest;
}
@@ -333,7 +337,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
Digest digest = DigestUtil.getFromInputCache(input, inputCache);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploader.uploadBlob(new Chunker(input, inputCache, execRoot, digestUtil));
+ uploader.uploadBlob(new Chunker(input, inputCache, execRoot, digestUtil), true);
}
return digest;
}
@@ -342,7 +346,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
Digest digest = digestUtil.compute(blob);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploader.uploadBlob(new Chunker(blob, digestUtil));
+ uploader.uploadBlob(new Chunker(blob, digestUtil), true);
}
return digest;
}