aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java71
1 files changed, 47 insertions, 24 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
index d44bfcf12e..2cbe3e6575 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -15,8 +15,10 @@ package com.google.devtools.build.lib.remote;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
@@ -25,11 +27,12 @@ import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.Context;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -38,6 +41,8 @@ import javax.annotation.Nullable;
*/
class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader {
+ private final ListeningExecutorService uploadExecutor =
+ MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
private final Context ctx;
private final ByteStreamUploader uploader;
private final String remoteServerInstanceName;
@@ -63,28 +68,35 @@ class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader
}
List<ListenableFuture<PathDigestPair>> uploads = new ArrayList<>(files.size());
- Context prevCtx = ctx.attach();
- try {
for (Path file : files.keySet()) {
- DigestUtil digestUtil = new DigestUtil(file.getFileSystem().getDigestFunction());
- Digest digest = digestUtil.compute(file);
- Chunker chunker = Chunker.builder(digestUtil).setInput(digest, file).build();
- ListenableFuture<PathDigestPair> upload =
- Futures.transform(
- uploader.uploadBlobAsync(chunker, /*forceUpload=*/false),
- unused -> new PathDigestPair(file, digest),
- MoreExecutors.directExecutor());
- uploads.add(upload);
+ ListenableFuture<Boolean> isDirectoryFuture = uploadExecutor.submit(() -> file.isDirectory());
+ ListenableFuture<PathDigestPair> digestFuture =
+ Futures.transformAsync(
+ isDirectoryFuture,
+ isDirectory -> {
+ if (isDirectory) {
+ return Futures.immediateFuture(new PathDigestPair(file, null));
+ }
+ DigestUtil digestUtil = new DigestUtil(file.getFileSystem().getDigestFunction());
+ Digest digest = digestUtil.compute(file);
+ Chunker chunker = Chunker.builder(digestUtil).setInput(digest, file).build();
+ final ListenableFuture<Void> upload;
+ Context prevCtx = ctx.attach();
+ try {
+ upload = uploader.uploadBlobAsync(chunker, /*forceUpload=*/ false);
+ } finally {
+ ctx.detach(prevCtx);
+ }
+ return Futures.transform(
+ upload, unused -> new PathDigestPair(file, digest), uploadExecutor);
+ },
+ MoreExecutors.directExecutor());
+ uploads.add(digestFuture);
}
-
- return Futures.transform(Futures.allAsList(uploads),
- (uploadsDone) -> new PathConverterImpl(remoteServerInstanceName, uploadsDone),
- MoreExecutors.directExecutor());
- } catch (IOException e) {
- return Futures.immediateFailedFuture(e);
- } finally {
- ctx.detach(prevCtx);
- }
+ return Futures.transform(
+ Futures.allAsList(uploads),
+ pathDigestPairs -> new PathConverterImpl(remoteServerInstanceName, pathDigestPairs),
+ MoreExecutors.directExecutor());
}
@Override
@@ -99,15 +111,23 @@ class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader
private final String remoteServerInstanceName;
private final Map<Path, Digest> pathToDigest;
+ private final Set<Path> skippedPaths;
- PathConverterImpl(String remoteServerInstanceName,
- List<PathDigestPair> uploads) {
+ PathConverterImpl(String remoteServerInstanceName, List<PathDigestPair> uploads) {
Preconditions.checkNotNull(uploads);
this.remoteServerInstanceName = remoteServerInstanceName;
pathToDigest = new HashMap<>(uploads.size());
+ ImmutableSet.Builder<Path> skippedPaths = ImmutableSet.builder();
for (PathDigestPair pair : uploads) {
- pathToDigest.put(pair.getPath(), pair.getDigest());
+ Path path = pair.getPath();
+ Digest digest = pair.getDigest();
+ if (digest != null) {
+ pathToDigest.put(path, digest);
+ } else {
+ skippedPaths.add(path);
+ }
}
+ this.skippedPaths = skippedPaths.build();
}
@Override
@@ -115,6 +135,9 @@ class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader
Preconditions.checkNotNull(path);
Digest digest = pathToDigest.get(path);
if (digest == null) {
+ if (skippedPaths.contains(path)) {
+ return null;
+ }
// It's a programming error to reference a file that has not been uploaded.
throw new IllegalStateException(
String.format("Illegal file reference: '%s'", path.getPathString()));