aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java14
-rw-r--r--src/main/java/com/google/devtools/build/lib/exec/SpawnResult.java79
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java78
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java43
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java27
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java129
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java (renamed from src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java)37
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java7
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java220
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java7
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java21
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java3
-rw-r--r--src/test/java/com/google/devtools/build/lib/BUILD2
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java502
15 files changed, 1107 insertions, 91 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java
index 8d8c6d1e0e..f32d238243 100644
--- a/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java
+++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java
@@ -196,12 +196,24 @@ public class SpawnInputExpander {
Spawn spawn, ArtifactExpander artifactExpander, ActionInputFileCache actionInputFileCache,
FilesetActionContext filesetContext)
throws IOException {
+ return getInputMapping(
+ spawn, artifactExpander, actionInputFileCache, filesetContext.getWorkspaceName());
+ }
+
+ /**
+ * Convert the inputs of the given spawn to a map from exec-root relative paths to action inputs.
+ * In some cases, this generates empty files, for which it uses {@code null}.
+ */
+ public SortedMap<PathFragment, ActionInput> getInputMapping(
+ Spawn spawn, ArtifactExpander artifactExpander, ActionInputFileCache actionInputFileCache,
+ String workspaceName)
+ throws IOException {
TreeMap<PathFragment, ActionInput> inputMap = new TreeMap<>();
addInputs(inputMap, spawn, artifactExpander);
addRunfilesToInputs(
inputMap, spawn.getRunfilesSupplier(), actionInputFileCache);
for (Artifact manifest : spawn.getFilesetManifests()) {
- parseFilesetManifest(inputMap, manifest, filesetContext.getWorkspaceName());
+ parseFilesetManifest(inputMap, manifest, workspaceName);
}
return inputMap;
}
diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnResult.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnResult.java
new file mode 100644
index 0000000000..3951e72d05
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnResult.java
@@ -0,0 +1,79 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.exec;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+
+/**
+ * The result of a spawn execution.
+ */
+public interface SpawnResult {
+ /**
+ * Returns whether the spawn was actually run, regardless of the exit code. Returns false if there
+ * were network errors, missing local files, errors setting up sandboxing, etc.
+ */
+ boolean setupSuccess();
+
+ /**
+ * The exit code of the subprocess.
+ */
+ int exitCode();
+
+ /**
+ * Basic implementation of {@link SpawnResult}.
+ */
+ @Immutable @ThreadSafe
+ public static final class SimpleSpawnResult implements SpawnResult {
+ private final boolean setupSuccess;
+ private final int exitCode;
+
+ SimpleSpawnResult(Builder builder) {
+ this.setupSuccess = builder.setupSuccess;
+ this.exitCode = builder.exitCode;
+ }
+
+ @Override
+ public boolean setupSuccess() {
+ return setupSuccess;
+ }
+
+ @Override
+ public int exitCode() {
+ return exitCode;
+ }
+ }
+
+ /**
+ * Builder class for {@link SpawnResult}.
+ */
+ public static final class Builder {
+ private boolean setupSuccess;
+ private int exitCode;
+
+ public SpawnResult build() {
+ return new SimpleSpawnResult(this);
+ }
+
+ public Builder setSetupSuccess(boolean setupSuccess) {
+ this.setupSuccess = setupSuccess;
+ return this;
+ }
+
+ public Builder setExitCode(int exitCode) {
+ this.exitCode = exitCode;
+ return this;
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index 1076933fa0..2d493e12ea 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -22,10 +22,7 @@ import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
@@ -73,15 +70,14 @@ import java.util.concurrent.atomic.AtomicReference;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
-public final class GrpcActionCache implements RemoteActionCache {
+public class GrpcActionCache implements RemoteActionCache {
+ private static final int MAX_MEMORY_KBYTES = 512 * 1024;
/** Channel over which to send gRPC CAS queries. */
- private final ManagedChannel channel;
-
+ private final GrpcCasInterface casIface;
+ private final GrpcExecutionCacheInterface iface;
private final RemoteOptions options;
- private static final int MAX_MEMORY_KBYTES = 512 * 1024;
-
/** Reads from multiple sequential inputs and chunks the data into BlobChunks. */
static interface BlobChunkIterator {
boolean hasNext();
@@ -204,7 +200,7 @@ public final class GrpcActionCache implements RemoteActionCache {
chunk.setOffset(offset);
}
if (bytesLeft > 0) {
- byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)];
+ byte[] blob = new byte[(int) Math.min(bytesLeft, options.grpcMaxChunkSizeBytes)];
currentStream.read(blob);
chunk.setData(ByteString.copyFrom(blob));
bytesLeft -= blob.length;
@@ -294,36 +290,34 @@ public final class GrpcActionCache implements RemoteActionCache {
}
}
+ public GrpcActionCache(
+ RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) {
+ this.options = options;
+ this.casIface = casIface;
+ this.iface = iface;
+ }
+
@VisibleForTesting
public GrpcActionCache(ManagedChannel channel, RemoteOptions options) {
this.options = options;
- this.channel = channel;
+ this.casIface = GrpcInterfaces.casInterface(options.grpcTimeoutSeconds, channel);
+ this.iface = GrpcInterfaces.executionCacheInterface(options.grpcTimeoutSeconds, channel);
}
public GrpcActionCache(RemoteOptions options) throws InvalidConfigurationException {
- this(RemoteUtils.createChannel(options.remoteCache), options);
+ this(RemoteUtils.createChannelLegacy(options.remoteCache), options);
}
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
- private CasServiceBlockingStub getBlockingStub() {
- return CasServiceGrpc.newBlockingStub(channel)
- .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- private CasServiceStub getStub() {
- return CasServiceGrpc.newStub(channel)
- .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) {
CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests);
if (request.getDigestCount() == 0) {
return ImmutableSet.of();
}
- CasStatus status = getBlockingStub().lookup(request.build()).getStatus();
+ CasStatus status = casIface.lookup(request.build()).getStatus();
if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) {
// TODO(olaola): here and below, add basic retry logic on transient errors!
throw new RuntimeException(status.getErrorDetail());
@@ -350,7 +344,7 @@ public final class GrpcActionCache implements RemoteActionCache {
if (!treeNodes.isEmpty()) {
CasUploadTreeMetadataRequest.Builder metaRequest =
CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes);
- CasUploadTreeMetadataReply reply = getBlockingStub().uploadTreeMetadata(metaRequest.build());
+ CasUploadTreeMetadataReply reply = casIface.uploadTreeMetadata(metaRequest.build());
if (!reply.getStatus().getSucceeded()) {
throw new RuntimeException(reply.getStatus().getErrorDetail());
}
@@ -389,6 +383,9 @@ public final class GrpcActionCache implements RemoteActionCache {
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
+ if (result.getOutputList().isEmpty()) {
+ return;
+ }
// Send all the file requests in a single synchronous batch.
// TODO(olaola): profile to maybe replace with separate concurrent requests.
CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
@@ -410,7 +407,7 @@ public final class GrpcActionCache implements RemoteActionCache {
downloadTree(output.getDigest(), path);
}
}
- Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build());
+ Iterator<CasDownloadReply> replies = casIface.downloadBlob(request.build());
Set<ContentDigest> results = new HashSet<>();
while (replies.hasNext()) {
results.add(createFileFromStream(metadataMap, replies));
@@ -521,24 +518,6 @@ public final class GrpcActionCache implements RemoteActionCache {
return digest;
}
- /**
- * Download a blob keyed by the given digest and write it to the specified path. Set the
- * executable parameter to the specified value.
- */
- @Override
- public void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
- throws IOException, CacheNotFoundException {
- // Send all the file requests in a single synchronous batch.
- // TODO(olaola): profile to maybe replace with separate concurrent requests.
- CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder().addDigest(digest);
- Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build());
- FileMetadata fileMetadata =
- FileMetadata.newBuilder().setDigest(digest).setExecutable(executable).build();
- Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap = new HashMap<>();
- metadataMap.put(digest, Pair.of(dest, fileMetadata));
- createFileFromStream(metadataMap, replies);
- }
-
static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> {
private final CountDownLatch finishLatch;
private final AtomicReference<RuntimeException> exception;
@@ -579,7 +558,6 @@ public final class GrpcActionCache implements RemoteActionCache {
int currentBatchBytes = 0;
int batchedInputs = 0;
int batches = 0;
- CasServiceStub stub = getStub();
try {
while (blobs.hasNext()) {
BlobChunk chunk = blobs.next();
@@ -596,7 +574,7 @@ public final class GrpcActionCache implements RemoteActionCache {
}
batches++;
responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
- requestObserver = stub.uploadBlob(responseObserver);
+ requestObserver = casIface.uploadBlobAsync(responseObserver);
}
batchedInputs++;
}
@@ -682,7 +660,7 @@ public final class GrpcActionCache implements RemoteActionCache {
Map<ContentDigest, byte[]> results = new HashMap<>();
int digestCount = request.getDigestCount();
if (digestCount > 0) {
- replies = getBlockingStub().downloadBlob(request.build());
+ replies = casIface.downloadBlob(request.build());
while (digestCount-- > 0) {
Preconditions.checkArgument(replies.hasNext());
CasDownloadReply reply = replies.next();
@@ -733,12 +711,9 @@ public final class GrpcActionCache implements RemoteActionCache {
/** Returns a cached result for a given Action digest, or null if not found in cache. */
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
- ExecutionCacheServiceBlockingStub stub =
- ExecutionCacheServiceGrpc.newBlockingStub(channel)
- .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
ExecutionCacheRequest request =
ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
- ExecutionCacheReply reply = stub.getCachedResult(request);
+ ExecutionCacheReply reply = iface.getCachedResult(request);
ExecutionCacheStatus status = reply.getStatus();
if (!status.getSucceeded()
&& status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) {
@@ -751,15 +726,12 @@ public final class GrpcActionCache implements RemoteActionCache {
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
- ExecutionCacheServiceBlockingStub stub =
- ExecutionCacheServiceGrpc.newBlockingStub(channel)
- .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
ExecutionCacheSetRequest request =
ExecutionCacheSetRequest.newBuilder()
.setActionDigest(actionKey.getDigest())
.setResult(result)
.build();
- ExecutionCacheSetReply reply = stub.setCachedResult(request);
+ ExecutionCacheSetReply reply = iface.setCachedResult(request);
ExecutionCacheStatus status = reply.getStatus();
if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) {
throw new RuntimeException(status.getErrorDetail());
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
new file mode 100644
index 0000000000..529ff9c3db
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
@@ -0,0 +1,43 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
+import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
+import io.grpc.stub.StreamObserver;
+import java.util.Iterator;
+
+/**
+ * An abstraction layer between the remote execution client and gRPC to support unit testing. This
+ * interface covers the CAS RPC methods, see {@link CasServiceBlockingStub} and
+ * {@link CasServiceStub}.
+ */
+public interface GrpcCasInterface {
+ CasLookupReply lookup(CasLookupRequest request);
+ CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request);
+ CasDownloadTreeMetadataReply downloadTreeMetadata(CasDownloadTreeMetadataRequest request);
+ Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request);
+ StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
+ StreamObserver<CasUploadBlobReply> responseObserver);
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
new file mode 100644
index 0000000000..375c2ab359
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
@@ -0,0 +1,29 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
+
+/**
+ * An abstraction layer between the remote execution client and gRPC to support unit testing. This
+ * interface covers the execution cache RPC methods, see {@link ExecutionCacheServiceBlockingStub}.
+ */
+public interface GrpcExecutionCacheInterface {
+ ExecutionCacheReply getCachedResult(ExecutionCacheRequest request);
+ ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request);
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
new file mode 100644
index 0000000000..fd44792b4a
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
@@ -0,0 +1,27 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import java.util.Iterator;
+
+/**
+ * An abstraction layer between the remote execution client and gRPC to support unit testing. This
+ * interface covers the remote execution RPC methods, see {@link ExecuteServiceBlockingStub}.
+ */
+public interface GrpcExecutionInterface {
+ Iterator<ExecuteReply> execute(ExecuteRequest request);
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
new file mode 100644
index 0000000000..73ab0372d2
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
@@ -0,0 +1,129 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
+import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
+import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
+import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementations of the gRPC interfaces that actually talk to gRPC.
+ */
+public class GrpcInterfaces {
+ /**
+ * Create a {@link GrpcCasInterface} instance that actually talks to gRPC.
+ */
+ public static GrpcCasInterface casInterface(
+ final int grpcTimeoutSeconds, final ManagedChannel channel) {
+ return new GrpcCasInterface() {
+ private CasServiceBlockingStub getCasServiceBlockingStub() {
+ return CasServiceGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ }
+
+ private CasServiceStub getCasServiceStub() {
+ return CasServiceGrpc.newStub(channel)
+ .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CasLookupReply lookup(CasLookupRequest request) {
+ return getCasServiceBlockingStub().lookup(request);
+ }
+
+ @Override
+ public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) {
+ return getCasServiceBlockingStub().uploadTreeMetadata(request);
+ }
+
+ @Override
+ public CasDownloadTreeMetadataReply downloadTreeMetadata(
+ CasDownloadTreeMetadataRequest request) {
+ return getCasServiceBlockingStub().downloadTreeMetadata(request);
+ }
+
+ @Override
+ public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) {
+ return getCasServiceBlockingStub().downloadBlob(request);
+ }
+
+ @Override
+ public StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
+ StreamObserver<CasUploadBlobReply> responseObserver) {
+ return getCasServiceStub().uploadBlob(responseObserver);
+ }
+ };
+ }
+
+ /**
+ * Create a {@link GrpcCasInterface} instance that actually talks to gRPC.
+ */
+ public static GrpcExecutionCacheInterface executionCacheInterface(
+ final int grpcTimeoutSeconds, final ManagedChannel channel) {
+ return new GrpcExecutionCacheInterface() {
+ private ExecutionCacheServiceBlockingStub getExecutionCacheServiceBlockingStub() {
+ return ExecutionCacheServiceGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public ExecutionCacheReply getCachedResult(ExecutionCacheRequest request) {
+ return getExecutionCacheServiceBlockingStub().getCachedResult(request);
+ }
+
+ @Override
+ public ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request) {
+ return getExecutionCacheServiceBlockingStub().setCachedResult(request);
+ }
+ };
+ }
+
+ /**
+ * Create a {@link GrpcExecutionInterface} instance that actually talks to gRPC.
+ */
+ public static GrpcExecutionInterface executionInterface(
+ final int grpcTimeoutSeconds, final ManagedChannel channel) {
+ return new GrpcExecutionInterface() {
+ @Override
+ public Iterator<ExecuteReply> execute(ExecuteRequest request) {
+ ExecuteServiceBlockingStub stub =
+ ExecuteServiceGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(
+ grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
+ return stub.execute(request);
+ }
+ };
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index 3635ab7def..d3f8cbbfe7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -14,38 +14,41 @@
package com.google.devtools.build.lib.remote;
-import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
import io.grpc.ManagedChannel;
import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */
@ThreadSafe
-public class RemoteWorkExecutor {
- /** Channel over which to send work to run remotely. */
- private final ManagedChannel channel;
- private final RemoteOptions options;
+public class GrpcRemoteExecutor extends GrpcActionCache {
+ public static boolean isRemoteExecutionOptions(RemoteOptions options) {
+ return options.remoteWorker != null;
+ }
- public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException {
- this.options = options;
- channel = RemoteUtils.createChannel(options.remoteWorker);
+ private final GrpcExecutionInterface executionIface;
+
+ public GrpcRemoteExecutor(
+ RemoteOptions options,
+ GrpcCasInterface casIface,
+ GrpcExecutionCacheInterface cacheIface,
+ GrpcExecutionInterface executionIface) {
+ super(options, casIface, cacheIface);
+ this.executionIface = executionIface;
}
- public static boolean isRemoteExecutionOptions(RemoteOptions options) {
- return options.remoteWorker != null;
+ public GrpcRemoteExecutor(ManagedChannel channel, RemoteOptions options) {
+ super(
+ options,
+ GrpcInterfaces.casInterface(options.grpcTimeoutSeconds, channel),
+ GrpcInterfaces.executionCacheInterface(options.grpcTimeoutSeconds, channel));
+ this.executionIface = GrpcInterfaces.executionInterface(options.grpcTimeoutSeconds, channel);
}
public ExecuteReply executeRemotely(ExecuteRequest request) {
- ExecuteServiceBlockingStub stub =
- ExecuteServiceGrpc.newBlockingStub(channel)
- .withDeadlineAfter(
- options.grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
- Iterator<ExecuteReply> replies = stub.execute(request);
+ Iterator<ExecuteReply> replies = executionIface.execute(request);
ExecuteReply reply = null;
while (replies.hasNext()) {
reply = replies.next();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
index 6dff9b6288..44a63c4d40 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
@@ -80,13 +80,6 @@ interface RemoteActionCache {
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException;
- /**
- * Download a blob keyed by the given digest and write it to the specified path. Set the
- * executable parameter to the specified value.
- */
- void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
- throws IOException, CacheNotFoundException;
-
/** Upload the given blobs to the cache, and return their digests. */
ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
new file mode 100644
index 0000000000..3e70003f32
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -0,0 +1,220 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.eventbus.EventBus;
+import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.devtools.build.lib.actions.ActionStatusMessage;
+import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander;
+import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.exec.SpawnInputExpander;
+import com.google.devtools.build.lib.exec.SpawnResult;
+import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeSet;
+
+/**
+ * A client for the remote execution service.
+ */
+final class RemoteSpawnRunner {
+ private final EventBus eventBus;
+ private final Path execRoot;
+ private final RemoteOptions options;
+ // TODO(olaola): This will be set on a per-action basis instead.
+ private final Platform platform;
+ private final String workspaceName;
+ private final SpawnInputExpander spawnInputExpander = new SpawnInputExpander(/*strict=*/false);
+
+ private final GrpcRemoteExecutor executor;
+
+ RemoteSpawnRunner(
+ Path execRoot,
+ EventBus eventBus,
+ String workspaceName,
+ RemoteOptions options,
+ GrpcRemoteExecutor executor) {
+ this.execRoot = execRoot;
+ this.eventBus = eventBus;
+ this.workspaceName = workspaceName;
+ this.options = options;
+ if (options.experimentalRemotePlatformOverride != null) {
+ Platform.Builder platformBuilder = Platform.newBuilder();
+ try {
+ TextFormat.getParser().merge(options.experimentalRemotePlatformOverride, platformBuilder);
+ } catch (ParseException e) {
+ throw new RuntimeException("Failed to parse --experimental_remote_platform_override", e);
+ }
+ platform = platformBuilder.build();
+ } else {
+ platform = null;
+ }
+ this.executor = executor;
+ }
+
+ RemoteSpawnRunner(
+ Path execRoot,
+ EventBus eventBus,
+ String workspaceName,
+ RemoteOptions options) {
+ this(execRoot, eventBus, workspaceName, options, connect(options));
+ }
+
+ private static GrpcRemoteExecutor connect(RemoteOptions options) {
+ Preconditions.checkArgument(GrpcRemoteExecutor.isRemoteExecutionOptions(options));
+ ManagedChannel channel;
+ try {
+ channel = RemoteUtils.createChannel(options.remoteWorker);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ return new GrpcRemoteExecutor(channel, options);
+ }
+
+ public SpawnResult exec(
+ Spawn spawn,
+ // TODO(ulfjack): Change this back to FileOutErr.
+ OutErr outErr,
+ ActionInputFileCache actionInputFileCache,
+ ArtifactExpander artifactExpander,
+ float timeout) throws InterruptedException, IOException {
+ ActionExecutionMetadata owner = spawn.getResourceOwner();
+ if (owner.getOwner() != null) {
+ eventBus.post(ActionStatusMessage.runningStrategy(owner, "remote"));
+ }
+
+ try {
+ // Temporary hack: the TreeNodeRepository should be created and maintained upstream!
+ TreeNodeRepository repository =
+ new TreeNodeRepository(execRoot, actionInputFileCache);
+ SortedMap<PathFragment, ActionInput> inputMap =
+ spawnInputExpander.getInputMapping(
+ spawn,
+ artifactExpander,
+ actionInputFileCache,
+ workspaceName);
+ TreeNode inputRoot = repository.buildFromActionInputs(inputMap);
+ repository.computeMerkleDigests(inputRoot);
+ Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment());
+ Action action =
+ buildAction(
+ spawn.getOutputFiles(),
+ ContentDigests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot));
+
+ ActionKey actionKey = ContentDigests.computeActionKey(action);
+ ActionResult result =
+ this.options.remoteAcceptCached ? executor.getCachedActionResult(actionKey) : null;
+ if (result == null) {
+ // Cache miss or we don't accept cache hits.
+ // Upload the command and all the inputs into the remote cache.
+ executor.uploadBlob(command.toByteArray());
+ // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
+ executor.uploadTree(repository, execRoot, inputRoot);
+ // TODO(olaola): set BuildInfo and input total bytes as well.
+ ExecuteRequest.Builder request =
+ ExecuteRequest.newBuilder()
+ .setAction(action)
+ .setAcceptCached(this.options.remoteAcceptCached)
+ .setTotalInputFileCount(inputMap.size())
+ .setTimeoutMillis((int) (1000 * timeout));
+ ExecuteReply reply = executor.executeRemotely(request.build());
+ ExecutionStatus status = reply.getStatus();
+
+ if (!status.getSucceeded()
+ && (status.getError() != ExecutionStatus.ErrorCode.EXEC_FAILED)) {
+ return new SpawnResult.Builder()
+ .setSetupSuccess(false)
+ .setExitCode(-1)
+ .build();
+ }
+
+ result = reply.getResult();
+ }
+
+ // TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
+ passRemoteOutErr(executor, result, outErr);
+ executor.downloadAllResults(result, execRoot);
+ return new SpawnResult.Builder()
+ .setSetupSuccess(true)
+ .setExitCode(result.getReturnCode())
+ .build();
+ } catch (StatusRuntimeException e) {
+ throw new IOException(e);
+ } catch (CacheNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Action buildAction(
+ Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Action.Builder action = Action.newBuilder();
+ action.setCommandDigest(command);
+ action.setInputRootDigest(inputRoot);
+ // Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
+ for (ActionInput output : outputs) {
+ action.addOutputPath(output.getExecPathString());
+ }
+ if (platform != null) {
+ action.setPlatform(platform);
+ }
+ return action.build();
+ }
+
+ private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
+ Command.Builder command = Command.newBuilder();
+ command.addAllArgv(arguments);
+ // Sorting the environment pairs by variable name.
+ TreeSet<String> variables = new TreeSet<>(environment.keySet());
+ for (String var : variables) {
+ command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ }
+ return command.build();
+ }
+
+ private static void passRemoteOutErr(
+ RemoteActionCache cache, ActionResult result, OutErr outErr) throws CacheNotFoundException {
+ ImmutableList<byte[]> streams =
+ cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
+ outErr.printOut(new String(streams.get(0), UTF_8));
+ outErr.printErr(new String(streams.get(1), UTF_8));
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index ee6f2a2518..df91471a5a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -199,7 +199,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
EventHandler eventHandler = executor.getEventHandler();
RemoteActionCache actionCache = null;
- RemoteWorkExecutor workExecutor = null;
+ GrpcRemoteExecutor workExecutor = null;
if (spawn.isRemotable()) {
// Initialize remote cache and execution handlers. We use separate handlers for every
// action to enable server-side parallelism (need a different gRPC channel per action).
@@ -211,8 +211,9 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
// Otherwise actionCache remains null and remote caching/execution are disabled.
- if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) {
- workExecutor = new RemoteWorkExecutor(options);
+ if (actionCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(options)) {
+ workExecutor = new GrpcRemoteExecutor(
+ RemoteUtils.createChannelLegacy(options.remoteWorker), options);
}
} catch (InvalidConfigurationException e) {
eventHandler.handle(Event.warn(e.toString()));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
index 9747e3ef11..89ecf43b2e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
@@ -24,19 +24,24 @@ import java.net.URISyntaxException;
/** Helper methods for gRPC calls */
@ThreadSafe
public final class RemoteUtils {
- public static ManagedChannel createChannel(String hostAndPort)
+ public static ManagedChannel createChannelLegacy(String hostAndPort)
throws InvalidConfigurationException {
try {
- URI uri = new URI("dummy://" + hostAndPort);
- if (uri.getHost() == null || uri.getPort() == -1) {
- throw new URISyntaxException("Invalid host or port.", "");
- }
- return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort())
- .usePlaintext(true)
- .build();
+ return createChannel(hostAndPort);
} catch (URISyntaxException e) {
throw new InvalidConfigurationException(
"Invalid argument for the address of remote cache server: " + hostAndPort);
}
}
+
+ public static ManagedChannel createChannel(String hostAndPort)
+ throws URISyntaxException {
+ URI uri = new URI("dummy://" + hostAndPort);
+ if (uri.getHost() == null || uri.getPort() == -1) {
+ throw new URISyntaxException("Invalid host or port.", "");
+ }
+ return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort())
+ .usePlaintext(true)
+ .build();
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 6d7e218773..6eb90fa4a1 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -130,8 +130,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
}
- @Override
- public void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
+ private void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
throws IOException, CacheNotFoundException {
// This unconditionally downloads the whole file into memory first!
byte[] contents = downloadBlob(digest);
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 0223f7e618..f43bd9d56d 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -1060,6 +1060,8 @@ java_test(
":test_runner",
":testutil",
"//src/main/java/com/google/devtools/build/lib:build-base",
+ "//src/main/java/com/google/devtools/build/lib:inmemoryfs",
+ "//src/main/java/com/google/devtools/build/lib:io",
"//src/main/java/com/google/devtools/build/lib:preconditions",
"//src/main/java/com/google/devtools/build/lib:vfs",
"//src/main/java/com/google/devtools/build/lib/actions",
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
new file mode 100644
index 0000000000..f22dc735ce
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -0,0 +1,502 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.EventBus;
+import com.google.common.hash.HashCode;
+import com.google.devtools.build.lib.actions.ActionAnalysisMetadata;
+import com.google.devtools.build.lib.actions.ActionExecutionContext;
+import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
+import com.google.devtools.build.lib.actions.ActionOwner;
+import com.google.devtools.build.lib.actions.Artifact;
+import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander;
+import com.google.devtools.build.lib.actions.ResourceSet;
+import com.google.devtools.build.lib.actions.RunfilesSupplier;
+import com.google.devtools.build.lib.actions.SimpleSpawn;
+import com.google.devtools.build.lib.exec.SpawnResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import com.google.devtools.common.options.Options;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Tests for {@link RemoteSpawnRunner} in combination with {@link GrpcRemoteExecutor}. */
+@RunWith(JUnit4.class)
+public class GrpcRemoteExecutionClientTest {
+ private static class MockOwner implements ActionExecutionMetadata {
+ private final String mnemonic;
+ private final String progressMessage;
+
+ MockOwner(String mnemonic, String progressMessage) {
+ this.mnemonic = mnemonic;
+ this.progressMessage = progressMessage;
+ }
+
+ @Override
+ public ActionOwner getOwner() {
+ return mock(ActionOwner.class);
+ }
+
+ @Override
+ public String getMnemonic() {
+ return mnemonic;
+ }
+
+ @Override
+ public String getProgressMessage() {
+ return progressMessage;
+ }
+
+ @Override
+ public boolean inputsDiscovered() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean discoversInputs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterable<Artifact> getTools() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterable<Artifact> getInputs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RunfilesSupplier getRunfilesSupplier() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ImmutableSet<Artifact> getOutputs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterable<String> getClientEnvironmentVariables() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Artifact getPrimaryInput() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Artifact getPrimaryOutput() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterable<Artifact> getMandatoryInputs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getKey() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String describeKey() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String prettyPrint() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterable<Artifact> getInputFilesForExtraAction(
+ ActionExecutionContext actionExecutionContext) {
+ return ImmutableList.<Artifact>of();
+ }
+
+ @Override
+ public ImmutableSet<Artifact> getMandatoryOutputs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MiddlemanType getActionType() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean shouldReportPathPrefixConflict(ActionAnalysisMetadata action) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static final class FakeCas implements GrpcCasInterface {
+ private final Map<ByteString, ByteString> content = new HashMap<>();
+
+ public ContentDigest put(byte[] data) {
+ ContentDigest digest = ContentDigests.computeDigest(data);
+ ByteString key = digest.getDigest();
+ ByteString value = ByteString.copyFrom(data);
+ content.put(key, value);
+ return digest;
+ }
+
+ @Override
+ public CasLookupReply lookup(CasLookupRequest request) {
+ CasStatus.Builder result = CasStatus.newBuilder();
+ for (ContentDigest digest : request.getDigestList()) {
+ ByteString key = digest.getDigest();
+ if (!content.containsKey(key)) {
+ result.addMissingDigest(digest);
+ }
+ }
+ if (result.getMissingDigestCount() != 0) {
+ result.setError(CasStatus.ErrorCode.MISSING_DIGEST);
+ } else {
+ result.setSucceeded(true);
+ }
+ return CasLookupReply.newBuilder().setStatus(result).build();
+ }
+
+ @Override
+ public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) {
+ return CasUploadTreeMetadataReply.newBuilder()
+ .setStatus(CasStatus.newBuilder().setSucceeded(true))
+ .build();
+ }
+
+ @Override
+ public CasDownloadTreeMetadataReply downloadTreeMetadata(
+ CasDownloadTreeMetadataRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) {
+ List<CasDownloadReply> result = new ArrayList<>();
+ for (ContentDigest digest : request.getDigestList()) {
+ CasDownloadReply.Builder builder = CasDownloadReply.newBuilder();
+ ByteString item = content.get(digest.getDigest());
+ if (item != null) {
+ builder.setStatus(CasStatus.newBuilder().setSucceeded(true));
+ builder.setData(BlobChunk.newBuilder().setData(item).setDigest(digest));
+ } else {
+ throw new IllegalStateException();
+ }
+ result.add(builder.build());
+ }
+ return result.iterator();
+ }
+
+ @Override
+ public StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
+ final StreamObserver<CasUploadBlobReply> responseObserver) {
+ return new StreamObserver<CasUploadBlobRequest>() {
+ private ContentDigest digest;
+ private ByteArrayOutputStream current;
+
+ @Override
+ public void onNext(CasUploadBlobRequest value) {
+ BlobChunk chunk = value.getData();
+ if (chunk.hasDigest()) {
+ Preconditions.checkState(digest == null);
+ digest = chunk.getDigest();
+ current = new ByteArrayOutputStream();
+ }
+ try {
+ current.write(chunk.getData().toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ responseObserver.onNext(
+ CasUploadBlobReply.newBuilder()
+ .setStatus(CasStatus.newBuilder().setSucceeded(true))
+ .build());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ throw new RuntimeException(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ ContentDigest check = ContentDigests.computeDigest(current.toByteArray());
+ Preconditions.checkState(check.equals(digest), "%s != %s", digest, check);
+ ByteString key = digest.getDigest();
+ ByteString value = ByteString.copyFrom(current.toByteArray());
+ digest = null;
+ current = null;
+ content.put(key, value);
+ responseObserver.onCompleted();
+ }
+ };
+ }
+ }
+
+ private static final class FakeActionInputFileCache implements ActionInputFileCache {
+ private final Path execRoot;
+ private final BiMap<ActionInput, ByteString> cas = HashBiMap.create();
+
+ FakeActionInputFileCache(Path execRoot) {
+ this.execRoot = execRoot;
+ }
+
+ void setDigest(ActionInput input, ByteString digest) {
+ cas.put(input, digest);
+ }
+
+ @Override
+ @Nullable
+ public byte[] getDigest(ActionInput input) throws IOException {
+ return Preconditions.checkNotNull(cas.get(input), input).toByteArray();
+ }
+
+ @Override
+ public boolean isFile(Artifact input) {
+ return execRoot.getRelative(input.getExecPath()).isFile();
+ }
+
+ @Override
+ public long getSizeInBytes(ActionInput input) throws IOException {
+ return execRoot.getRelative(input.getExecPath()).getFileSize();
+ }
+
+ @Override
+ public boolean contentsAvailableLocally(ByteString digest) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ @Nullable
+ public ActionInput getInputFromDigest(ByteString hexDigest) {
+ HashCode code =
+ HashCode.fromString(new String(hexDigest.toByteArray(), StandardCharsets.UTF_8));
+ ByteString digest = ByteString.copyFrom(code.asBytes());
+ return Preconditions.checkNotNull(cas.inverse().get(digest));
+ }
+
+ @Override
+ public Path getInputPath(ActionInput input) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() {
+ @Override
+ public void expand(Artifact artifact, Collection<? super Artifact> output) {
+ output.add(artifact);
+ }
+ };
+
+ private FileSystem fs;
+ private Path execRoot;
+ private EventBus eventBus;
+ private SimpleSpawn simpleSpawn;
+ private FakeActionInputFileCache fakeFileCache;
+
+ @Before
+ public final void setUp() throws Exception {
+ fs = new InMemoryFileSystem();
+ execRoot = fs.getPath("/exec/root");
+ FileSystemUtils.createDirectoryAndParents(execRoot);
+ eventBus = new EventBus();
+ fakeFileCache = new FakeActionInputFileCache(execRoot);
+ simpleSpawn = new SimpleSpawn(
+ new MockOwner("Mnemonic", "Progress Message"),
+ ImmutableList.of("/bin/echo", "Hi!"),
+ ImmutableMap.of("VARIABLE", "value"),
+ /*executionInfo=*/ImmutableMap.<String, String>of(),
+ /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")),
+ /*outputs=*/ImmutableList.<ActionInput>of(),
+ ResourceSet.ZERO
+ );
+ }
+
+ private void scratch(ActionInput input, String content) throws IOException {
+ Path inputFile = execRoot.getRelative(input.getExecPath());
+ FileSystemUtils.writeContentAsLatin1(inputFile, content);
+ fakeFileCache.setDigest(
+ simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest()));
+ }
+
+ @Test
+ public void cacheHit() throws Exception {
+ GrpcCasInterface casIface = Mockito.mock(GrpcCasInterface.class);
+ GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
+ GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ GrpcRemoteExecutor executor =
+ new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
+ RemoteSpawnRunner client =
+ new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor);
+
+ scratch(simpleSpawn.getInputFiles().get(0), "xyz");
+
+ ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
+ .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
+ .setResult(ActionResult.newBuilder().setReturnCode(0))
+ .build();
+ when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ OutErr outErr = OutErr.create(out, err);
+ SpawnResult result =
+ client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1);
+ verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(out.toByteArray()).isEmpty();
+ assertThat(err.toByteArray()).isEmpty();
+ }
+
+ @Test
+ public void cacheHitWithOutput() throws Exception {
+ FakeCas casIface = new FakeCas();
+ GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
+ GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ GrpcRemoteExecutor executor =
+ new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
+ RemoteSpawnRunner client =
+ new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor);
+
+ scratch(simpleSpawn.getInputFiles().get(0), "xyz");
+ byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
+ byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
+ ContentDigest stdOutDigest = casIface.put(cacheStdOut);
+ ContentDigest stdErrDigest = casIface.put(cacheStdErr);
+
+ ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
+ .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
+ .setResult(ActionResult.newBuilder()
+ .setReturnCode(0)
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest))
+ .build();
+ when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ OutErr outErr = OutErr.create(out, err);
+ SpawnResult result =
+ client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1);
+ verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(out.toByteArray()).isEqualTo(cacheStdOut);
+ assertThat(err.toByteArray()).isEqualTo(cacheStdErr);
+ }
+
+ @Test
+ public void remotelyExecute() throws Exception {
+ FakeCas casIface = new FakeCas();
+ GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
+ GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ GrpcRemoteExecutor executor =
+ new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
+ RemoteSpawnRunner client =
+ new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor);
+
+ scratch(simpleSpawn.getInputFiles().get(0), "xyz");
+ byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
+ byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
+ ContentDigest stdOutDigest = casIface.put(cacheStdOut);
+ ContentDigest stdErrDigest = casIface.put(cacheStdErr);
+
+ ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
+ .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
+ .build();
+ when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
+
+ when(executionIface.execute(any(ExecuteRequest.class))).thenReturn(ImmutableList.of(
+ ExecuteReply.newBuilder()
+ .setStatus(ExecutionStatus.newBuilder().setSucceeded(true))
+ .setResult(ActionResult.newBuilder()
+ .setReturnCode(0)
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest))
+ .build()).iterator());
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ OutErr outErr = OutErr.create(out, err);
+ SpawnResult result =
+ client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1);
+ verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(out.toByteArray()).isEqualTo(cacheStdOut);
+ assertThat(err.toByteArray()).isEqualTo(cacheStdErr);
+ }
+}