aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java108
1 files changed, 101 insertions, 7 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index 4372932928..1076933fa0 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
@@ -216,6 +217,83 @@ public final class GrpcActionCache implements RemoteActionCache {
}
}
+ final class BlobChunkActionInputIterator implements BlobChunkIterator {
+ private final Iterator<? extends ActionInput> inputIterator;
+ private final ActionInputFileCache inputCache;
+ private final Path execRoot;
+ private InputStream currentStream;
+ private final Set<ContentDigest> digests;
+ private ContentDigest digest;
+ private long bytesLeft;
+
+ public BlobChunkActionInputIterator(
+ Set<ContentDigest> digests,
+ Path execRoot,
+ Iterator<? extends ActionInput> inputIterator,
+ ActionInputFileCache inputCache)
+ throws IOException {
+ this.digests = digests;
+ this.inputIterator = inputIterator;
+ this.inputCache = inputCache;
+ this.execRoot = execRoot;
+ advanceInput();
+ }
+
+ public BlobChunkActionInputIterator(
+ ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException {
+ inputIterator = Iterators.singletonIterator(input);
+ digests = ImmutableSet.of(ContentDigests.getDigestFromInputCache(input, inputCache));
+ this.inputCache = inputCache;
+ this.execRoot = execRoot;
+ advanceInput();
+ }
+
+ private void advanceInput() throws IOException {
+ do {
+ if (inputIterator != null && inputIterator.hasNext()) {
+ ActionInput input = inputIterator.next();
+ digest = ContentDigests.getDigestFromInputCache(input, inputCache);
+ currentStream = execRoot.getRelative(input.getExecPathString()).getInputStream();
+ bytesLeft = digest.getSizeBytes();
+ } else {
+ digest = null;
+ currentStream = null;
+ bytesLeft = 0;
+ }
+ } while (digest != null && !digests.contains(digest));
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentStream != null;
+ }
+
+ @Override
+ public BlobChunk next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ BlobChunk.Builder chunk = BlobChunk.newBuilder();
+ long offset = digest.getSizeBytes() - bytesLeft;
+ if (offset == 0) {
+ chunk.setDigest(digest);
+ } else {
+ chunk.setOffset(offset);
+ }
+ if (bytesLeft > 0) {
+ byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)];
+ currentStream.read(blob);
+ chunk.setData(ByteString.copyFrom(blob));
+ bytesLeft -= blob.length;
+ }
+ if (bytesLeft == 0) {
+ currentStream.close();
+ advanceInput();
+ }
+ return chunk.build();
+ }
+ }
+
@VisibleForTesting
public GrpcActionCache(ManagedChannel channel, RemoteOptions options) {
this.options = options;
@@ -278,11 +356,10 @@ public final class GrpcActionCache implements RemoteActionCache {
}
}
if (!actionInputs.isEmpty()) {
- ArrayList<Path> paths = new ArrayList<>();
- for (ActionInput actionInput : actionInputs) {
- paths.add(execRoot.getRelative(actionInput.getExecPathString()));
- }
- uploadChunks(paths.size(), new BlobChunkFileIterator(missingDigests, paths.iterator()));
+ uploadChunks(
+ actionInputs.size(),
+ new BlobChunkActionInputIterator(
+ missingDigests, execRoot, actionInputs.iterator(), repository.getInputFileCache()));
}
}
@@ -412,8 +489,7 @@ public final class GrpcActionCache implements RemoteActionCache {
/**
* Put the file contents cache if it is not already in it. No-op if the file is already stored in
- * cache. The given path must be a full absolute path. Note: this is horribly inefficient, need to
- * patch through an overload that uses an ActionInputFile cache to compute the digests!
+ * cache. The given path must be a full absolute path.
*
* @return The key for fetching the file contents blob from cache.
*/
@@ -428,6 +504,24 @@ public final class GrpcActionCache implements RemoteActionCache {
}
/**
+ * Put the file contents cache if it is not already in it. No-op if the file is already stored in
+ * cache. The given path must be a full absolute path.
+ *
+ * @return The key for fetching the file contents blob from cache.
+ */
+ @Override
+ public ContentDigest uploadFileContents(
+ ActionInput input, Path execRoot, ActionInputFileCache inputCache)
+ throws IOException, InterruptedException {
+ ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache);
+ ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
+ if (!missing.isEmpty()) {
+ uploadChunks(1, new BlobChunkActionInputIterator(input, execRoot, inputCache));
+ }
+ return digest;
+ }
+
+ /**
* Download a blob keyed by the given digest and write it to the specified path. Set the
* executable parameter to the specified value.
*/