diff options
Diffstat (limited to 'src')
15 files changed, 1107 insertions, 91 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java index 8d8c6d1e0e..f32d238243 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java +++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnInputExpander.java @@ -196,12 +196,24 @@ public class SpawnInputExpander { Spawn spawn, ArtifactExpander artifactExpander, ActionInputFileCache actionInputFileCache, FilesetActionContext filesetContext) throws IOException { + return getInputMapping( + spawn, artifactExpander, actionInputFileCache, filesetContext.getWorkspaceName()); + } + + /** + * Convert the inputs of the given spawn to a map from exec-root relative paths to action inputs. + * In some cases, this generates empty files, for which it uses {@code null}. + */ + public SortedMap<PathFragment, ActionInput> getInputMapping( + Spawn spawn, ArtifactExpander artifactExpander, ActionInputFileCache actionInputFileCache, + String workspaceName) + throws IOException { TreeMap<PathFragment, ActionInput> inputMap = new TreeMap<>(); addInputs(inputMap, spawn, artifactExpander); addRunfilesToInputs( inputMap, spawn.getRunfilesSupplier(), actionInputFileCache); for (Artifact manifest : spawn.getFilesetManifests()) { - parseFilesetManifest(inputMap, manifest, filesetContext.getWorkspaceName()); + parseFilesetManifest(inputMap, manifest, workspaceName); } return inputMap; } diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnResult.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnResult.java new file mode 100644 index 0000000000..3951e72d05 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnResult.java @@ -0,0 +1,79 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.exec; + +import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; + +/** + * The result of a spawn execution. + */ +public interface SpawnResult { + /** + * Returns whether the spawn was actually run, regardless of the exit code. Returns false if there + * were network errors, missing local files, errors setting up sandboxing, etc. + */ + boolean setupSuccess(); + + /** + * The exit code of the subprocess. + */ + int exitCode(); + + /** + * Basic implementation of {@link SpawnResult}. + */ + @Immutable @ThreadSafe + public static final class SimpleSpawnResult implements SpawnResult { + private final boolean setupSuccess; + private final int exitCode; + + SimpleSpawnResult(Builder builder) { + this.setupSuccess = builder.setupSuccess; + this.exitCode = builder.exitCode; + } + + @Override + public boolean setupSuccess() { + return setupSuccess; + } + + @Override + public int exitCode() { + return exitCode; + } + } + + /** + * Builder class for {@link SpawnResult}. + */ + public static final class Builder { + private boolean setupSuccess; + private int exitCode; + + public SpawnResult build() { + return new SimpleSpawnResult(this); + } + + public Builder setSetupSuccess(boolean setupSuccess) { + this.setupSuccess = setupSuccess; + return this; + } + + public Builder setExitCode(int exitCode) { + this.exitCode = exitCode; + return this; + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java index 1076933fa0..2d493e12ea 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java @@ -22,10 +22,7 @@ import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; @@ -73,15 +70,14 @@ import java.util.concurrent.atomic.AtomicReference; /** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */ @ThreadSafe -public final class GrpcActionCache implements RemoteActionCache { +public class GrpcActionCache implements RemoteActionCache { + private static final int MAX_MEMORY_KBYTES = 512 * 1024; /** Channel over which to send gRPC CAS queries. */ - private final ManagedChannel channel; - + private final GrpcCasInterface casIface; + private final GrpcExecutionCacheInterface iface; private final RemoteOptions options; - private static final int MAX_MEMORY_KBYTES = 512 * 1024; - /** Reads from multiple sequential inputs and chunks the data into BlobChunks. */ static interface BlobChunkIterator { boolean hasNext(); @@ -204,7 +200,7 @@ public final class GrpcActionCache implements RemoteActionCache { chunk.setOffset(offset); } if (bytesLeft > 0) { - byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)]; + byte[] blob = new byte[(int) Math.min(bytesLeft, options.grpcMaxChunkSizeBytes)]; currentStream.read(blob); chunk.setData(ByteString.copyFrom(blob)); bytesLeft -= blob.length; @@ -294,36 +290,34 @@ public final class GrpcActionCache implements RemoteActionCache { } } + public GrpcActionCache( + RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) { + this.options = options; + this.casIface = casIface; + this.iface = iface; + } + @VisibleForTesting public GrpcActionCache(ManagedChannel channel, RemoteOptions options) { this.options = options; - this.channel = channel; + this.casIface = GrpcInterfaces.casInterface(options.grpcTimeoutSeconds, channel); + this.iface = GrpcInterfaces.executionCacheInterface(options.grpcTimeoutSeconds, channel); } public GrpcActionCache(RemoteOptions options) throws InvalidConfigurationException { - this(RemoteUtils.createChannel(options.remoteCache), options); + this(RemoteUtils.createChannelLegacy(options.remoteCache), options); } public static boolean isRemoteCacheOptions(RemoteOptions options) { return options.remoteCache != null; } - private CasServiceBlockingStub getBlockingStub() { - return CasServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); - } - - private CasServiceStub getStub() { - return CasServiceGrpc.newStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); - } - private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) { CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests); if (request.getDigestCount() == 0) { return ImmutableSet.of(); } - CasStatus status = getBlockingStub().lookup(request.build()).getStatus(); + CasStatus status = casIface.lookup(request.build()).getStatus(); if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) { // TODO(olaola): here and below, add basic retry logic on transient errors! throw new RuntimeException(status.getErrorDetail()); @@ -350,7 +344,7 @@ public final class GrpcActionCache implements RemoteActionCache { if (!treeNodes.isEmpty()) { CasUploadTreeMetadataRequest.Builder metaRequest = CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes); - CasUploadTreeMetadataReply reply = getBlockingStub().uploadTreeMetadata(metaRequest.build()); + CasUploadTreeMetadataReply reply = casIface.uploadTreeMetadata(metaRequest.build()); if (!reply.getStatus().getSucceeded()) { throw new RuntimeException(reply.getStatus().getErrorDetail()); } @@ -389,6 +383,9 @@ public final class GrpcActionCache implements RemoteActionCache { @Override public void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException { + if (result.getOutputList().isEmpty()) { + return; + } // Send all the file requests in a single synchronous batch. // TODO(olaola): profile to maybe replace with separate concurrent requests. CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder(); @@ -410,7 +407,7 @@ public final class GrpcActionCache implements RemoteActionCache { downloadTree(output.getDigest(), path); } } - Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build()); + Iterator<CasDownloadReply> replies = casIface.downloadBlob(request.build()); Set<ContentDigest> results = new HashSet<>(); while (replies.hasNext()) { results.add(createFileFromStream(metadataMap, replies)); @@ -521,24 +518,6 @@ public final class GrpcActionCache implements RemoteActionCache { return digest; } - /** - * Download a blob keyed by the given digest and write it to the specified path. Set the - * executable parameter to the specified value. - */ - @Override - public void downloadFileContents(ContentDigest digest, Path dest, boolean executable) - throws IOException, CacheNotFoundException { - // Send all the file requests in a single synchronous batch. - // TODO(olaola): profile to maybe replace with separate concurrent requests. - CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder().addDigest(digest); - Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build()); - FileMetadata fileMetadata = - FileMetadata.newBuilder().setDigest(digest).setExecutable(executable).build(); - Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap = new HashMap<>(); - metadataMap.put(digest, Pair.of(dest, fileMetadata)); - createFileFromStream(metadataMap, replies); - } - static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> { private final CountDownLatch finishLatch; private final AtomicReference<RuntimeException> exception; @@ -579,7 +558,6 @@ public final class GrpcActionCache implements RemoteActionCache { int currentBatchBytes = 0; int batchedInputs = 0; int batches = 0; - CasServiceStub stub = getStub(); try { while (blobs.hasNext()) { BlobChunk chunk = blobs.next(); @@ -596,7 +574,7 @@ public final class GrpcActionCache implements RemoteActionCache { } batches++; responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception); - requestObserver = stub.uploadBlob(responseObserver); + requestObserver = casIface.uploadBlobAsync(responseObserver); } batchedInputs++; } @@ -682,7 +660,7 @@ public final class GrpcActionCache implements RemoteActionCache { Map<ContentDigest, byte[]> results = new HashMap<>(); int digestCount = request.getDigestCount(); if (digestCount > 0) { - replies = getBlockingStub().downloadBlob(request.build()); + replies = casIface.downloadBlob(request.build()); while (digestCount-- > 0) { Preconditions.checkArgument(replies.hasNext()); CasDownloadReply reply = replies.next(); @@ -733,12 +711,9 @@ public final class GrpcActionCache implements RemoteActionCache { /** Returns a cached result for a given Action digest, or null if not found in cache. */ @Override public ActionResult getCachedActionResult(ActionKey actionKey) { - ExecutionCacheServiceBlockingStub stub = - ExecutionCacheServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); ExecutionCacheRequest request = ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build(); - ExecutionCacheReply reply = stub.getCachedResult(request); + ExecutionCacheReply reply = iface.getCachedResult(request); ExecutionCacheStatus status = reply.getStatus(); if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) { @@ -751,15 +726,12 @@ public final class GrpcActionCache implements RemoteActionCache { @Override public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException { - ExecutionCacheServiceBlockingStub stub = - ExecutionCacheServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); ExecutionCacheSetRequest request = ExecutionCacheSetRequest.newBuilder() .setActionDigest(actionKey.getDigest()) .setResult(result) .build(); - ExecutionCacheSetReply reply = stub.setCachedResult(request); + ExecutionCacheSetReply reply = iface.setCachedResult(request); ExecutionCacheStatus status = reply.getStatus(); if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) { throw new RuntimeException(status.getErrorDetail()); diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java new file mode 100644 index 0000000000..529ff9c3db --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java @@ -0,0 +1,43 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; +import io.grpc.stub.StreamObserver; +import java.util.Iterator; + +/** + * An abstraction layer between the remote execution client and gRPC to support unit testing. This + * interface covers the CAS RPC methods, see {@link CasServiceBlockingStub} and + * {@link CasServiceStub}. + */ +public interface GrpcCasInterface { + CasLookupReply lookup(CasLookupRequest request); + CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request); + CasDownloadTreeMetadataReply downloadTreeMetadata(CasDownloadTreeMetadataRequest request); + Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request); + StreamObserver<CasUploadBlobRequest> uploadBlobAsync( + StreamObserver<CasUploadBlobReply> responseObserver); +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java new file mode 100644 index 0000000000..375c2ab359 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java @@ -0,0 +1,29 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; + +/** + * An abstraction layer between the remote execution client and gRPC to support unit testing. This + * interface covers the execution cache RPC methods, see {@link ExecutionCacheServiceBlockingStub}. + */ +public interface GrpcExecutionCacheInterface { + ExecutionCacheReply getCachedResult(ExecutionCacheRequest request); + ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request); +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java new file mode 100644 index 0000000000..fd44792b4a --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java @@ -0,0 +1,27 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import java.util.Iterator; + +/** + * An abstraction layer between the remote execution client and gRPC to support unit testing. This + * interface covers the remote execution RPC methods, see {@link ExecuteServiceBlockingStub}. + */ +public interface GrpcExecutionInterface { + Iterator<ExecuteReply> execute(ExecuteRequest request); +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java new file mode 100644 index 0000000000..73ab0372d2 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java @@ -0,0 +1,129 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; +import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; +import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +/** + * Implementations of the gRPC interfaces that actually talk to gRPC. + */ +public class GrpcInterfaces { + /** + * Create a {@link GrpcCasInterface} instance that actually talks to gRPC. + */ + public static GrpcCasInterface casInterface( + final int grpcTimeoutSeconds, final ManagedChannel channel) { + return new GrpcCasInterface() { + private CasServiceBlockingStub getCasServiceBlockingStub() { + return CasServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); + } + + private CasServiceStub getCasServiceStub() { + return CasServiceGrpc.newStub(channel) + .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); + } + + @Override + public CasLookupReply lookup(CasLookupRequest request) { + return getCasServiceBlockingStub().lookup(request); + } + + @Override + public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) { + return getCasServiceBlockingStub().uploadTreeMetadata(request); + } + + @Override + public CasDownloadTreeMetadataReply downloadTreeMetadata( + CasDownloadTreeMetadataRequest request) { + return getCasServiceBlockingStub().downloadTreeMetadata(request); + } + + @Override + public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) { + return getCasServiceBlockingStub().downloadBlob(request); + } + + @Override + public StreamObserver<CasUploadBlobRequest> uploadBlobAsync( + StreamObserver<CasUploadBlobReply> responseObserver) { + return getCasServiceStub().uploadBlob(responseObserver); + } + }; + } + + /** + * Create a {@link GrpcCasInterface} instance that actually talks to gRPC. + */ + public static GrpcExecutionCacheInterface executionCacheInterface( + final int grpcTimeoutSeconds, final ManagedChannel channel) { + return new GrpcExecutionCacheInterface() { + private ExecutionCacheServiceBlockingStub getExecutionCacheServiceBlockingStub() { + return ExecutionCacheServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); + } + + @Override + public ExecutionCacheReply getCachedResult(ExecutionCacheRequest request) { + return getExecutionCacheServiceBlockingStub().getCachedResult(request); + } + + @Override + public ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request) { + return getExecutionCacheServiceBlockingStub().setCachedResult(request); + } + }; + } + + /** + * Create a {@link GrpcExecutionInterface} instance that actually talks to gRPC. + */ + public static GrpcExecutionInterface executionInterface( + final int grpcTimeoutSeconds, final ManagedChannel channel) { + return new GrpcExecutionInterface() { + @Override + public Iterator<ExecuteReply> execute(ExecuteRequest request) { + ExecuteServiceBlockingStub stub = + ExecuteServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter( + grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS); + return stub.execute(request); + } + }; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index 3635ab7def..d3f8cbbfe7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -14,38 +14,41 @@ package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; import io.grpc.ManagedChannel; import java.util.Iterator; -import java.util.concurrent.TimeUnit; /** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ @ThreadSafe -public class RemoteWorkExecutor { - /** Channel over which to send work to run remotely. */ - private final ManagedChannel channel; - private final RemoteOptions options; +public class GrpcRemoteExecutor extends GrpcActionCache { + public static boolean isRemoteExecutionOptions(RemoteOptions options) { + return options.remoteWorker != null; + } - public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException { - this.options = options; - channel = RemoteUtils.createChannel(options.remoteWorker); + private final GrpcExecutionInterface executionIface; + + public GrpcRemoteExecutor( + RemoteOptions options, + GrpcCasInterface casIface, + GrpcExecutionCacheInterface cacheIface, + GrpcExecutionInterface executionIface) { + super(options, casIface, cacheIface); + this.executionIface = executionIface; } - public static boolean isRemoteExecutionOptions(RemoteOptions options) { - return options.remoteWorker != null; + public GrpcRemoteExecutor(ManagedChannel channel, RemoteOptions options) { + super( + options, + GrpcInterfaces.casInterface(options.grpcTimeoutSeconds, channel), + GrpcInterfaces.executionCacheInterface(options.grpcTimeoutSeconds, channel)); + this.executionIface = GrpcInterfaces.executionInterface(options.grpcTimeoutSeconds, channel); } public ExecuteReply executeRemotely(ExecuteRequest request) { - ExecuteServiceBlockingStub stub = - ExecuteServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter( - options.grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS); - Iterator<ExecuteReply> replies = stub.execute(request); + Iterator<ExecuteReply> replies = executionIface.execute(request); ExecuteReply reply = null; while (replies.hasNext()) { reply = replies.next(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java index 6dff9b6288..44a63c4d40 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java @@ -80,13 +80,6 @@ interface RemoteActionCache { ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException, InterruptedException; - /** - * Download a blob keyed by the given digest and write it to the specified path. Set the - * executable parameter to the specified value. - */ - void downloadFileContents(ContentDigest digest, Path dest, boolean executable) - throws IOException, CacheNotFoundException; - /** Upload the given blobs to the cache, and return their digests. */ ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException; diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java new file mode 100644 index 0000000000..3e70003f32 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -0,0 +1,220 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.eventbus.EventBus; +import com.google.devtools.build.lib.actions.ActionExecutionMetadata; +import com.google.devtools.build.lib.actions.ActionInput; +import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.devtools.build.lib.actions.ActionStatusMessage; +import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander; +import com.google.devtools.build.lib.actions.Spawn; +import com.google.devtools.build.lib.exec.SpawnInputExpander; +import com.google.devtools.build.lib.exec.SpawnResult; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.Action; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.Command; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import com.google.devtools.build.lib.remote.RemoteProtocol.Platform; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.protobuf.TextFormat; +import com.google.protobuf.TextFormat.ParseException; +import io.grpc.ManagedChannel; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeSet; + +/** + * A client for the remote execution service. + */ +final class RemoteSpawnRunner { + private final EventBus eventBus; + private final Path execRoot; + private final RemoteOptions options; + // TODO(olaola): This will be set on a per-action basis instead. + private final Platform platform; + private final String workspaceName; + private final SpawnInputExpander spawnInputExpander = new SpawnInputExpander(/*strict=*/false); + + private final GrpcRemoteExecutor executor; + + RemoteSpawnRunner( + Path execRoot, + EventBus eventBus, + String workspaceName, + RemoteOptions options, + GrpcRemoteExecutor executor) { + this.execRoot = execRoot; + this.eventBus = eventBus; + this.workspaceName = workspaceName; + this.options = options; + if (options.experimentalRemotePlatformOverride != null) { + Platform.Builder platformBuilder = Platform.newBuilder(); + try { + TextFormat.getParser().merge(options.experimentalRemotePlatformOverride, platformBuilder); + } catch (ParseException e) { + throw new RuntimeException("Failed to parse --experimental_remote_platform_override", e); + } + platform = platformBuilder.build(); + } else { + platform = null; + } + this.executor = executor; + } + + RemoteSpawnRunner( + Path execRoot, + EventBus eventBus, + String workspaceName, + RemoteOptions options) { + this(execRoot, eventBus, workspaceName, options, connect(options)); + } + + private static GrpcRemoteExecutor connect(RemoteOptions options) { + Preconditions.checkArgument(GrpcRemoteExecutor.isRemoteExecutionOptions(options)); + ManagedChannel channel; + try { + channel = RemoteUtils.createChannel(options.remoteWorker); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + return new GrpcRemoteExecutor(channel, options); + } + + public SpawnResult exec( + Spawn spawn, + // TODO(ulfjack): Change this back to FileOutErr. + OutErr outErr, + ActionInputFileCache actionInputFileCache, + ArtifactExpander artifactExpander, + float timeout) throws InterruptedException, IOException { + ActionExecutionMetadata owner = spawn.getResourceOwner(); + if (owner.getOwner() != null) { + eventBus.post(ActionStatusMessage.runningStrategy(owner, "remote")); + } + + try { + // Temporary hack: the TreeNodeRepository should be created and maintained upstream! + TreeNodeRepository repository = + new TreeNodeRepository(execRoot, actionInputFileCache); + SortedMap<PathFragment, ActionInput> inputMap = + spawnInputExpander.getInputMapping( + spawn, + artifactExpander, + actionInputFileCache, + workspaceName); + TreeNode inputRoot = repository.buildFromActionInputs(inputMap); + repository.computeMerkleDigests(inputRoot); + Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment()); + Action action = + buildAction( + spawn.getOutputFiles(), + ContentDigests.computeDigest(command), + repository.getMerkleDigest(inputRoot)); + + ActionKey actionKey = ContentDigests.computeActionKey(action); + ActionResult result = + this.options.remoteAcceptCached ? executor.getCachedActionResult(actionKey) : null; + if (result == null) { + // Cache miss or we don't accept cache hits. + // Upload the command and all the inputs into the remote cache. + executor.uploadBlob(command.toByteArray()); + // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests! + executor.uploadTree(repository, execRoot, inputRoot); + // TODO(olaola): set BuildInfo and input total bytes as well. + ExecuteRequest.Builder request = + ExecuteRequest.newBuilder() + .setAction(action) + .setAcceptCached(this.options.remoteAcceptCached) + .setTotalInputFileCount(inputMap.size()) + .setTimeoutMillis((int) (1000 * timeout)); + ExecuteReply reply = executor.executeRemotely(request.build()); + ExecutionStatus status = reply.getStatus(); + + if (!status.getSucceeded() + && (status.getError() != ExecutionStatus.ErrorCode.EXEC_FAILED)) { + return new SpawnResult.Builder() + .setSetupSuccess(false) + .setExitCode(-1) + .build(); + } + + result = reply.getResult(); + } + + // TODO(ulfjack): Download stdout, stderr, and the output files in a single call. + passRemoteOutErr(executor, result, outErr); + executor.downloadAllResults(result, execRoot); + return new SpawnResult.Builder() + .setSetupSuccess(true) + .setExitCode(result.getReturnCode()) + .build(); + } catch (StatusRuntimeException e) { + throw new IOException(e); + } catch (CacheNotFoundException e) { + throw new IOException(e); + } + } + + private Action buildAction( + Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) { + Action.Builder action = Action.newBuilder(); + action.setCommandDigest(command); + action.setInputRootDigest(inputRoot); + // Somewhat ugly: we rely on the stable order of outputs here for remote action caching. + for (ActionInput output : outputs) { + action.addOutputPath(output.getExecPathString()); + } + if (platform != null) { + action.setPlatform(platform); + } + return action.build(); + } + + private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) { + Command.Builder command = Command.newBuilder(); + command.addAllArgv(arguments); + // Sorting the environment pairs by variable name. + TreeSet<String> variables = new TreeSet<>(environment.keySet()); + for (String var : variables) { + command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var)); + } + return command.build(); + } + + private static void passRemoteOutErr( + RemoteActionCache cache, ActionResult result, OutErr outErr) throws CacheNotFoundException { + ImmutableList<byte[]> streams = + cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest())); + outErr.printOut(new String(streams.get(0), UTF_8)); + outErr.printErr(new String(streams.get(1), UTF_8)); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index ee6f2a2518..df91471a5a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -199,7 +199,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext { EventHandler eventHandler = executor.getEventHandler(); RemoteActionCache actionCache = null; - RemoteWorkExecutor workExecutor = null; + GrpcRemoteExecutor workExecutor = null; if (spawn.isRemotable()) { // Initialize remote cache and execution handlers. We use separate handlers for every // action to enable server-side parallelism (need a different gRPC channel per action). @@ -211,8 +211,9 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } // Otherwise actionCache remains null and remote caching/execution are disabled. - if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) { - workExecutor = new RemoteWorkExecutor(options); + if (actionCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(options)) { + workExecutor = new GrpcRemoteExecutor( + RemoteUtils.createChannelLegacy(options.remoteWorker), options); } } catch (InvalidConfigurationException e) { eventHandler.handle(Event.warn(e.toString())); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java index 9747e3ef11..89ecf43b2e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java @@ -24,19 +24,24 @@ import java.net.URISyntaxException; /** Helper methods for gRPC calls */ @ThreadSafe public final class RemoteUtils { - public static ManagedChannel createChannel(String hostAndPort) + public static ManagedChannel createChannelLegacy(String hostAndPort) throws InvalidConfigurationException { try { - URI uri = new URI("dummy://" + hostAndPort); - if (uri.getHost() == null || uri.getPort() == -1) { - throw new URISyntaxException("Invalid host or port.", ""); - } - return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) - .usePlaintext(true) - .build(); + return createChannel(hostAndPort); } catch (URISyntaxException e) { throw new InvalidConfigurationException( "Invalid argument for the address of remote cache server: " + hostAndPort); } } + + public static ManagedChannel createChannel(String hostAndPort) + throws URISyntaxException { + URI uri = new URI("dummy://" + hostAndPort); + if (uri.getHost() == null || uri.getPort() == -1) { + throw new URISyntaxException("Invalid host or port.", ""); + } + return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) + .usePlaintext(true) + .build(); + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index 6d7e218773..6eb90fa4a1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -130,8 +130,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } } - @Override - public void downloadFileContents(ContentDigest digest, Path dest, boolean executable) + private void downloadFileContents(ContentDigest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException { // This unconditionally downloads the whole file into memory first! byte[] contents = downloadBlob(digest); diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD index 0223f7e618..f43bd9d56d 100644 --- a/src/test/java/com/google/devtools/build/lib/BUILD +++ b/src/test/java/com/google/devtools/build/lib/BUILD @@ -1060,6 +1060,8 @@ java_test( ":test_runner", ":testutil", "//src/main/java/com/google/devtools/build/lib:build-base", + "//src/main/java/com/google/devtools/build/lib:inmemoryfs", + "//src/main/java/com/google/devtools/build/lib:io", "//src/main/java/com/google/devtools/build/lib:preconditions", "//src/main/java/com/google/devtools/build/lib:vfs", "//src/main/java/com/google/devtools/build/lib/actions", diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java new file mode 100644 index 0000000000..f22dc735ce --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -0,0 +1,502 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.eventbus.EventBus; +import com.google.common.hash.HashCode; +import com.google.devtools.build.lib.actions.ActionAnalysisMetadata; +import com.google.devtools.build.lib.actions.ActionExecutionContext; +import com.google.devtools.build.lib.actions.ActionExecutionMetadata; +import com.google.devtools.build.lib.actions.ActionInput; +import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.devtools.build.lib.actions.ActionInputHelper; +import com.google.devtools.build.lib.actions.ActionOwner; +import com.google.devtools.build.lib.actions.Artifact; +import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander; +import com.google.devtools.build.lib.actions.ResourceSet; +import com.google.devtools.build.lib.actions.RunfilesSupplier; +import com.google.devtools.build.lib.actions.SimpleSpawn; +import com.google.devtools.build.lib.exec.SpawnResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.vfs.FileSystem; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; +import com.google.devtools.common.options.Options; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Tests for {@link RemoteSpawnRunner} in combination with {@link GrpcRemoteExecutor}. */ +@RunWith(JUnit4.class) +public class GrpcRemoteExecutionClientTest { + private static class MockOwner implements ActionExecutionMetadata { + private final String mnemonic; + private final String progressMessage; + + MockOwner(String mnemonic, String progressMessage) { + this.mnemonic = mnemonic; + this.progressMessage = progressMessage; + } + + @Override + public ActionOwner getOwner() { + return mock(ActionOwner.class); + } + + @Override + public String getMnemonic() { + return mnemonic; + } + + @Override + public String getProgressMessage() { + return progressMessage; + } + + @Override + public boolean inputsDiscovered() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean discoversInputs() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterable<Artifact> getTools() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterable<Artifact> getInputs() { + throw new UnsupportedOperationException(); + } + + @Override + public RunfilesSupplier getRunfilesSupplier() { + throw new UnsupportedOperationException(); + } + + @Override + public ImmutableSet<Artifact> getOutputs() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterable<String> getClientEnvironmentVariables() { + throw new UnsupportedOperationException(); + } + + @Override + public Artifact getPrimaryInput() { + throw new UnsupportedOperationException(); + } + + @Override + public Artifact getPrimaryOutput() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterable<Artifact> getMandatoryInputs() { + throw new UnsupportedOperationException(); + } + + @Override + public String getKey() { + throw new UnsupportedOperationException(); + } + + @Override + public String describeKey() { + throw new UnsupportedOperationException(); + } + + @Override + public String prettyPrint() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterable<Artifact> getInputFilesForExtraAction( + ActionExecutionContext actionExecutionContext) { + return ImmutableList.<Artifact>of(); + } + + @Override + public ImmutableSet<Artifact> getMandatoryOutputs() { + throw new UnsupportedOperationException(); + } + + @Override + public MiddlemanType getActionType() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean shouldReportPathPrefixConflict(ActionAnalysisMetadata action) { + throw new UnsupportedOperationException(); + } + } + + private static final class FakeCas implements GrpcCasInterface { + private final Map<ByteString, ByteString> content = new HashMap<>(); + + public ContentDigest put(byte[] data) { + ContentDigest digest = ContentDigests.computeDigest(data); + ByteString key = digest.getDigest(); + ByteString value = ByteString.copyFrom(data); + content.put(key, value); + return digest; + } + + @Override + public CasLookupReply lookup(CasLookupRequest request) { + CasStatus.Builder result = CasStatus.newBuilder(); + for (ContentDigest digest : request.getDigestList()) { + ByteString key = digest.getDigest(); + if (!content.containsKey(key)) { + result.addMissingDigest(digest); + } + } + if (result.getMissingDigestCount() != 0) { + result.setError(CasStatus.ErrorCode.MISSING_DIGEST); + } else { + result.setSucceeded(true); + } + return CasLookupReply.newBuilder().setStatus(result).build(); + } + + @Override + public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) { + return CasUploadTreeMetadataReply.newBuilder() + .setStatus(CasStatus.newBuilder().setSucceeded(true)) + .build(); + } + + @Override + public CasDownloadTreeMetadataReply downloadTreeMetadata( + CasDownloadTreeMetadataRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) { + List<CasDownloadReply> result = new ArrayList<>(); + for (ContentDigest digest : request.getDigestList()) { + CasDownloadReply.Builder builder = CasDownloadReply.newBuilder(); + ByteString item = content.get(digest.getDigest()); + if (item != null) { + builder.setStatus(CasStatus.newBuilder().setSucceeded(true)); + builder.setData(BlobChunk.newBuilder().setData(item).setDigest(digest)); + } else { + throw new IllegalStateException(); + } + result.add(builder.build()); + } + return result.iterator(); + } + + @Override + public StreamObserver<CasUploadBlobRequest> uploadBlobAsync( + final StreamObserver<CasUploadBlobReply> responseObserver) { + return new StreamObserver<CasUploadBlobRequest>() { + private ContentDigest digest; + private ByteArrayOutputStream current; + + @Override + public void onNext(CasUploadBlobRequest value) { + BlobChunk chunk = value.getData(); + if (chunk.hasDigest()) { + Preconditions.checkState(digest == null); + digest = chunk.getDigest(); + current = new ByteArrayOutputStream(); + } + try { + current.write(chunk.getData().toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + responseObserver.onNext( + CasUploadBlobReply.newBuilder() + .setStatus(CasStatus.newBuilder().setSucceeded(true)) + .build()); + } + + @Override + public void onError(Throwable t) { + throw new RuntimeException(t); + } + + @Override + public void onCompleted() { + ContentDigest check = ContentDigests.computeDigest(current.toByteArray()); + Preconditions.checkState(check.equals(digest), "%s != %s", digest, check); + ByteString key = digest.getDigest(); + ByteString value = ByteString.copyFrom(current.toByteArray()); + digest = null; + current = null; + content.put(key, value); + responseObserver.onCompleted(); + } + }; + } + } + + private static final class FakeActionInputFileCache implements ActionInputFileCache { + private final Path execRoot; + private final BiMap<ActionInput, ByteString> cas = HashBiMap.create(); + + FakeActionInputFileCache(Path execRoot) { + this.execRoot = execRoot; + } + + void setDigest(ActionInput input, ByteString digest) { + cas.put(input, digest); + } + + @Override + @Nullable + public byte[] getDigest(ActionInput input) throws IOException { + return Preconditions.checkNotNull(cas.get(input), input).toByteArray(); + } + + @Override + public boolean isFile(Artifact input) { + return execRoot.getRelative(input.getExecPath()).isFile(); + } + + @Override + public long getSizeInBytes(ActionInput input) throws IOException { + return execRoot.getRelative(input.getExecPath()).getFileSize(); + } + + @Override + public boolean contentsAvailableLocally(ByteString digest) { + throw new UnsupportedOperationException(); + } + + @Override + @Nullable + public ActionInput getInputFromDigest(ByteString hexDigest) { + HashCode code = + HashCode.fromString(new String(hexDigest.toByteArray(), StandardCharsets.UTF_8)); + ByteString digest = ByteString.copyFrom(code.asBytes()); + return Preconditions.checkNotNull(cas.inverse().get(digest)); + } + + @Override + public Path getInputPath(ActionInput input) { + throw new UnsupportedOperationException(); + } + } + + private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() { + @Override + public void expand(Artifact artifact, Collection<? super Artifact> output) { + output.add(artifact); + } + }; + + private FileSystem fs; + private Path execRoot; + private EventBus eventBus; + private SimpleSpawn simpleSpawn; + private FakeActionInputFileCache fakeFileCache; + + @Before + public final void setUp() throws Exception { + fs = new InMemoryFileSystem(); + execRoot = fs.getPath("/exec/root"); + FileSystemUtils.createDirectoryAndParents(execRoot); + eventBus = new EventBus(); + fakeFileCache = new FakeActionInputFileCache(execRoot); + simpleSpawn = new SimpleSpawn( + new MockOwner("Mnemonic", "Progress Message"), + ImmutableList.of("/bin/echo", "Hi!"), + ImmutableMap.of("VARIABLE", "value"), + /*executionInfo=*/ImmutableMap.<String, String>of(), + /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")), + /*outputs=*/ImmutableList.<ActionInput>of(), + ResourceSet.ZERO + ); + } + + private void scratch(ActionInput input, String content) throws IOException { + Path inputFile = execRoot.getRelative(input.getExecPath()); + FileSystemUtils.writeContentAsLatin1(inputFile, content); + fakeFileCache.setDigest( + simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest())); + } + + @Test + public void cacheHit() throws Exception { + GrpcCasInterface casIface = Mockito.mock(GrpcCasInterface.class); + GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class); + GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class); + RemoteOptions options = Options.getDefaults(RemoteOptions.class); + GrpcRemoteExecutor executor = + new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface); + RemoteSpawnRunner client = + new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor); + + scratch(simpleSpawn.getInputFiles().get(0), "xyz"); + + ExecutionCacheReply reply = ExecutionCacheReply.newBuilder() + .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true)) + .setResult(ActionResult.newBuilder().setReturnCode(0)) + .build(); + when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + OutErr outErr = OutErr.create(out, err); + SpawnResult result = + client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1); + verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class)); + assertThat(result.setupSuccess()).isTrue(); + assertThat(result.exitCode()).isEqualTo(0); + assertThat(out.toByteArray()).isEmpty(); + assertThat(err.toByteArray()).isEmpty(); + } + + @Test + public void cacheHitWithOutput() throws Exception { + FakeCas casIface = new FakeCas(); + GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class); + GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class); + RemoteOptions options = Options.getDefaults(RemoteOptions.class); + GrpcRemoteExecutor executor = + new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface); + RemoteSpawnRunner client = + new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor); + + scratch(simpleSpawn.getInputFiles().get(0), "xyz"); + byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8); + byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8); + ContentDigest stdOutDigest = casIface.put(cacheStdOut); + ContentDigest stdErrDigest = casIface.put(cacheStdErr); + + ExecutionCacheReply reply = ExecutionCacheReply.newBuilder() + .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true)) + .setResult(ActionResult.newBuilder() + .setReturnCode(0) + .setStdoutDigest(stdOutDigest) + .setStderrDigest(stdErrDigest)) + .build(); + when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + OutErr outErr = OutErr.create(out, err); + SpawnResult result = + client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1); + verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class)); + assertThat(result.setupSuccess()).isTrue(); + assertThat(result.exitCode()).isEqualTo(0); + assertThat(out.toByteArray()).isEqualTo(cacheStdOut); + assertThat(err.toByteArray()).isEqualTo(cacheStdErr); + } + + @Test + public void remotelyExecute() throws Exception { + FakeCas casIface = new FakeCas(); + GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class); + GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class); + RemoteOptions options = Options.getDefaults(RemoteOptions.class); + GrpcRemoteExecutor executor = + new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface); + RemoteSpawnRunner client = + new RemoteSpawnRunner(execRoot, eventBus, "workspace", options, executor); + + scratch(simpleSpawn.getInputFiles().get(0), "xyz"); + byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8); + byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8); + ContentDigest stdOutDigest = casIface.put(cacheStdOut); + ContentDigest stdErrDigest = casIface.put(cacheStdErr); + + ExecutionCacheReply reply = ExecutionCacheReply.newBuilder() + .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true)) + .build(); + when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply); + + when(executionIface.execute(any(ExecuteRequest.class))).thenReturn(ImmutableList.of( + ExecuteReply.newBuilder() + .setStatus(ExecutionStatus.newBuilder().setSucceeded(true)) + .setResult(ActionResult.newBuilder() + .setReturnCode(0) + .setStdoutDigest(stdOutDigest) + .setStderrDigest(stdErrDigest)) + .build()).iterator()); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + OutErr outErr = OutErr.create(out, err); + SpawnResult result = + client.exec(simpleSpawn, outErr, fakeFileCache, SIMPLE_ARTIFACT_EXPANDER, /*timeout=*/-1); + verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class)); + assertThat(result.setupSuccess()).isTrue(); + assertThat(result.exitCode()).isEqualTo(0); + assertThat(out.toByteArray()).isEqualTo(cacheStdOut); + assertThat(err.toByteArray()).isEqualTo(cacheStdErr); + } +} |