diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote')
11 files changed, 511 insertions, 90 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java index 1076933fa0..2d493e12ea 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java @@ -22,10 +22,7 @@ import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; @@ -73,15 +70,14 @@ import java.util.concurrent.atomic.AtomicReference; /** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */ @ThreadSafe -public final class GrpcActionCache implements RemoteActionCache { +public class GrpcActionCache implements RemoteActionCache { + private static final int MAX_MEMORY_KBYTES = 512 * 1024; /** Channel over which to send gRPC CAS queries. */ - private final ManagedChannel channel; - + private final GrpcCasInterface casIface; + private final GrpcExecutionCacheInterface iface; private final RemoteOptions options; - private static final int MAX_MEMORY_KBYTES = 512 * 1024; - /** Reads from multiple sequential inputs and chunks the data into BlobChunks. */ static interface BlobChunkIterator { boolean hasNext(); @@ -204,7 +200,7 @@ public final class GrpcActionCache implements RemoteActionCache { chunk.setOffset(offset); } if (bytesLeft > 0) { - byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)]; + byte[] blob = new byte[(int) Math.min(bytesLeft, options.grpcMaxChunkSizeBytes)]; currentStream.read(blob); chunk.setData(ByteString.copyFrom(blob)); bytesLeft -= blob.length; @@ -294,36 +290,34 @@ public final class GrpcActionCache implements RemoteActionCache { } } + public GrpcActionCache( + RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) { + this.options = options; + this.casIface = casIface; + this.iface = iface; + } + @VisibleForTesting public GrpcActionCache(ManagedChannel channel, RemoteOptions options) { this.options = options; - this.channel = channel; + this.casIface = GrpcInterfaces.casInterface(options.grpcTimeoutSeconds, channel); + this.iface = GrpcInterfaces.executionCacheInterface(options.grpcTimeoutSeconds, channel); } public GrpcActionCache(RemoteOptions options) throws InvalidConfigurationException { - this(RemoteUtils.createChannel(options.remoteCache), options); + this(RemoteUtils.createChannelLegacy(options.remoteCache), options); } public static boolean isRemoteCacheOptions(RemoteOptions options) { return options.remoteCache != null; } - private CasServiceBlockingStub getBlockingStub() { - return CasServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); - } - - private CasServiceStub getStub() { - return CasServiceGrpc.newStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); - } - private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) { CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests); if (request.getDigestCount() == 0) { return ImmutableSet.of(); } - CasStatus status = getBlockingStub().lookup(request.build()).getStatus(); + CasStatus status = casIface.lookup(request.build()).getStatus(); if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) { // TODO(olaola): here and below, add basic retry logic on transient errors! throw new RuntimeException(status.getErrorDetail()); @@ -350,7 +344,7 @@ public final class GrpcActionCache implements RemoteActionCache { if (!treeNodes.isEmpty()) { CasUploadTreeMetadataRequest.Builder metaRequest = CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes); - CasUploadTreeMetadataReply reply = getBlockingStub().uploadTreeMetadata(metaRequest.build()); + CasUploadTreeMetadataReply reply = casIface.uploadTreeMetadata(metaRequest.build()); if (!reply.getStatus().getSucceeded()) { throw new RuntimeException(reply.getStatus().getErrorDetail()); } @@ -389,6 +383,9 @@ public final class GrpcActionCache implements RemoteActionCache { @Override public void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException { + if (result.getOutputList().isEmpty()) { + return; + } // Send all the file requests in a single synchronous batch. // TODO(olaola): profile to maybe replace with separate concurrent requests. CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder(); @@ -410,7 +407,7 @@ public final class GrpcActionCache implements RemoteActionCache { downloadTree(output.getDigest(), path); } } - Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build()); + Iterator<CasDownloadReply> replies = casIface.downloadBlob(request.build()); Set<ContentDigest> results = new HashSet<>(); while (replies.hasNext()) { results.add(createFileFromStream(metadataMap, replies)); @@ -521,24 +518,6 @@ public final class GrpcActionCache implements RemoteActionCache { return digest; } - /** - * Download a blob keyed by the given digest and write it to the specified path. Set the - * executable parameter to the specified value. - */ - @Override - public void downloadFileContents(ContentDigest digest, Path dest, boolean executable) - throws IOException, CacheNotFoundException { - // Send all the file requests in a single synchronous batch. - // TODO(olaola): profile to maybe replace with separate concurrent requests. - CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder().addDigest(digest); - Iterator<CasDownloadReply> replies = getBlockingStub().downloadBlob(request.build()); - FileMetadata fileMetadata = - FileMetadata.newBuilder().setDigest(digest).setExecutable(executable).build(); - Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap = new HashMap<>(); - metadataMap.put(digest, Pair.of(dest, fileMetadata)); - createFileFromStream(metadataMap, replies); - } - static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> { private final CountDownLatch finishLatch; private final AtomicReference<RuntimeException> exception; @@ -579,7 +558,6 @@ public final class GrpcActionCache implements RemoteActionCache { int currentBatchBytes = 0; int batchedInputs = 0; int batches = 0; - CasServiceStub stub = getStub(); try { while (blobs.hasNext()) { BlobChunk chunk = blobs.next(); @@ -596,7 +574,7 @@ public final class GrpcActionCache implements RemoteActionCache { } batches++; responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception); - requestObserver = stub.uploadBlob(responseObserver); + requestObserver = casIface.uploadBlobAsync(responseObserver); } batchedInputs++; } @@ -682,7 +660,7 @@ public final class GrpcActionCache implements RemoteActionCache { Map<ContentDigest, byte[]> results = new HashMap<>(); int digestCount = request.getDigestCount(); if (digestCount > 0) { - replies = getBlockingStub().downloadBlob(request.build()); + replies = casIface.downloadBlob(request.build()); while (digestCount-- > 0) { Preconditions.checkArgument(replies.hasNext()); CasDownloadReply reply = replies.next(); @@ -733,12 +711,9 @@ public final class GrpcActionCache implements RemoteActionCache { /** Returns a cached result for a given Action digest, or null if not found in cache. */ @Override public ActionResult getCachedActionResult(ActionKey actionKey) { - ExecutionCacheServiceBlockingStub stub = - ExecutionCacheServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); ExecutionCacheRequest request = ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build(); - ExecutionCacheReply reply = stub.getCachedResult(request); + ExecutionCacheReply reply = iface.getCachedResult(request); ExecutionCacheStatus status = reply.getStatus(); if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) { @@ -751,15 +726,12 @@ public final class GrpcActionCache implements RemoteActionCache { @Override public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException { - ExecutionCacheServiceBlockingStub stub = - ExecutionCacheServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS); ExecutionCacheSetRequest request = ExecutionCacheSetRequest.newBuilder() .setActionDigest(actionKey.getDigest()) .setResult(result) .build(); - ExecutionCacheSetReply reply = stub.setCachedResult(request); + ExecutionCacheSetReply reply = iface.setCachedResult(request); ExecutionCacheStatus status = reply.getStatus(); if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) { throw new RuntimeException(status.getErrorDetail()); diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java new file mode 100644 index 0000000000..529ff9c3db --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java @@ -0,0 +1,43 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; +import io.grpc.stub.StreamObserver; +import java.util.Iterator; + +/** + * An abstraction layer between the remote execution client and gRPC to support unit testing. This + * interface covers the CAS RPC methods, see {@link CasServiceBlockingStub} and + * {@link CasServiceStub}. + */ +public interface GrpcCasInterface { + CasLookupReply lookup(CasLookupRequest request); + CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request); + CasDownloadTreeMetadataReply downloadTreeMetadata(CasDownloadTreeMetadataRequest request); + Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request); + StreamObserver<CasUploadBlobRequest> uploadBlobAsync( + StreamObserver<CasUploadBlobReply> responseObserver); +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java new file mode 100644 index 0000000000..375c2ab359 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java @@ -0,0 +1,29 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; + +/** + * An abstraction layer between the remote execution client and gRPC to support unit testing. This + * interface covers the execution cache RPC methods, see {@link ExecutionCacheServiceBlockingStub}. + */ +public interface GrpcExecutionCacheInterface { + ExecutionCacheReply getCachedResult(ExecutionCacheRequest request); + ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request); +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java new file mode 100644 index 0000000000..fd44792b4a --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java @@ -0,0 +1,27 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import java.util.Iterator; + +/** + * An abstraction layer between the remote execution client and gRPC to support unit testing. This + * interface covers the remote execution RPC methods, see {@link ExecuteServiceBlockingStub}. + */ +public interface GrpcExecutionInterface { + Iterator<ExecuteReply> execute(ExecuteRequest request); +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java new file mode 100644 index 0000000000..73ab0372d2 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java @@ -0,0 +1,129 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; +import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; +import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; +import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +/** + * Implementations of the gRPC interfaces that actually talk to gRPC. + */ +public class GrpcInterfaces { + /** + * Create a {@link GrpcCasInterface} instance that actually talks to gRPC. + */ + public static GrpcCasInterface casInterface( + final int grpcTimeoutSeconds, final ManagedChannel channel) { + return new GrpcCasInterface() { + private CasServiceBlockingStub getCasServiceBlockingStub() { + return CasServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); + } + + private CasServiceStub getCasServiceStub() { + return CasServiceGrpc.newStub(channel) + .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); + } + + @Override + public CasLookupReply lookup(CasLookupRequest request) { + return getCasServiceBlockingStub().lookup(request); + } + + @Override + public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) { + return getCasServiceBlockingStub().uploadTreeMetadata(request); + } + + @Override + public CasDownloadTreeMetadataReply downloadTreeMetadata( + CasDownloadTreeMetadataRequest request) { + return getCasServiceBlockingStub().downloadTreeMetadata(request); + } + + @Override + public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) { + return getCasServiceBlockingStub().downloadBlob(request); + } + + @Override + public StreamObserver<CasUploadBlobRequest> uploadBlobAsync( + StreamObserver<CasUploadBlobReply> responseObserver) { + return getCasServiceStub().uploadBlob(responseObserver); + } + }; + } + + /** + * Create a {@link GrpcCasInterface} instance that actually talks to gRPC. + */ + public static GrpcExecutionCacheInterface executionCacheInterface( + final int grpcTimeoutSeconds, final ManagedChannel channel) { + return new GrpcExecutionCacheInterface() { + private ExecutionCacheServiceBlockingStub getExecutionCacheServiceBlockingStub() { + return ExecutionCacheServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); + } + + @Override + public ExecutionCacheReply getCachedResult(ExecutionCacheRequest request) { + return getExecutionCacheServiceBlockingStub().getCachedResult(request); + } + + @Override + public ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request) { + return getExecutionCacheServiceBlockingStub().setCachedResult(request); + } + }; + } + + /** + * Create a {@link GrpcExecutionInterface} instance that actually talks to gRPC. + */ + public static GrpcExecutionInterface executionInterface( + final int grpcTimeoutSeconds, final ManagedChannel channel) { + return new GrpcExecutionInterface() { + @Override + public Iterator<ExecuteReply> execute(ExecuteRequest request) { + ExecuteServiceBlockingStub stub = + ExecuteServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter( + grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS); + return stub.execute(request); + } + }; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index 3635ab7def..d3f8cbbfe7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -14,38 +14,41 @@ package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; import io.grpc.ManagedChannel; import java.util.Iterator; -import java.util.concurrent.TimeUnit; /** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ @ThreadSafe -public class RemoteWorkExecutor { - /** Channel over which to send work to run remotely. */ - private final ManagedChannel channel; - private final RemoteOptions options; +public class GrpcRemoteExecutor extends GrpcActionCache { + public static boolean isRemoteExecutionOptions(RemoteOptions options) { + return options.remoteWorker != null; + } - public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException { - this.options = options; - channel = RemoteUtils.createChannel(options.remoteWorker); + private final GrpcExecutionInterface executionIface; + + public GrpcRemoteExecutor( + RemoteOptions options, + GrpcCasInterface casIface, + GrpcExecutionCacheInterface cacheIface, + GrpcExecutionInterface executionIface) { + super(options, casIface, cacheIface); + this.executionIface = executionIface; } - public static boolean isRemoteExecutionOptions(RemoteOptions options) { - return options.remoteWorker != null; + public GrpcRemoteExecutor(ManagedChannel channel, RemoteOptions options) { + super( + options, + GrpcInterfaces.casInterface(options.grpcTimeoutSeconds, channel), + GrpcInterfaces.executionCacheInterface(options.grpcTimeoutSeconds, channel)); + this.executionIface = GrpcInterfaces.executionInterface(options.grpcTimeoutSeconds, channel); } public ExecuteReply executeRemotely(ExecuteRequest request) { - ExecuteServiceBlockingStub stub = - ExecuteServiceGrpc.newBlockingStub(channel) - .withDeadlineAfter( - options.grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS); - Iterator<ExecuteReply> replies = stub.execute(request); + Iterator<ExecuteReply> replies = executionIface.execute(request); ExecuteReply reply = null; while (replies.hasNext()) { reply = replies.next(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java index 6dff9b6288..44a63c4d40 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java @@ -80,13 +80,6 @@ interface RemoteActionCache { ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException, InterruptedException; - /** - * Download a blob keyed by the given digest and write it to the specified path. Set the - * executable parameter to the specified value. - */ - void downloadFileContents(ContentDigest digest, Path dest, boolean executable) - throws IOException, CacheNotFoundException; - /** Upload the given blobs to the cache, and return their digests. */ ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException; diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java new file mode 100644 index 0000000000..3e70003f32 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -0,0 +1,220 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.eventbus.EventBus; +import com.google.devtools.build.lib.actions.ActionExecutionMetadata; +import com.google.devtools.build.lib.actions.ActionInput; +import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.devtools.build.lib.actions.ActionStatusMessage; +import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander; +import com.google.devtools.build.lib.actions.Spawn; +import com.google.devtools.build.lib.exec.SpawnInputExpander; +import com.google.devtools.build.lib.exec.SpawnResult; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.Action; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.Command; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import com.google.devtools.build.lib.remote.RemoteProtocol.Platform; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.protobuf.TextFormat; +import com.google.protobuf.TextFormat.ParseException; +import io.grpc.ManagedChannel; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeSet; + +/** + * A client for the remote execution service. + */ +final class RemoteSpawnRunner { + private final EventBus eventBus; + private final Path execRoot; + private final RemoteOptions options; + // TODO(olaola): This will be set on a per-action basis instead. + private final Platform platform; + private final String workspaceName; + private final SpawnInputExpander spawnInputExpander = new SpawnInputExpander(/*strict=*/false); + + private final GrpcRemoteExecutor executor; + + RemoteSpawnRunner( + Path execRoot, + EventBus eventBus, + String workspaceName, + RemoteOptions options, + GrpcRemoteExecutor executor) { + this.execRoot = execRoot; + this.eventBus = eventBus; + this.workspaceName = workspaceName; + this.options = options; + if (options.experimentalRemotePlatformOverride != null) { + Platform.Builder platformBuilder = Platform.newBuilder(); + try { + TextFormat.getParser().merge(options.experimentalRemotePlatformOverride, platformBuilder); + } catch (ParseException e) { + throw new RuntimeException("Failed to parse --experimental_remote_platform_override", e); + } + platform = platformBuilder.build(); + } else { + platform = null; + } + this.executor = executor; + } + + RemoteSpawnRunner( + Path execRoot, + EventBus eventBus, + String workspaceName, + RemoteOptions options) { + this(execRoot, eventBus, workspaceName, options, connect(options)); + } + + private static GrpcRemoteExecutor connect(RemoteOptions options) { + Preconditions.checkArgument(GrpcRemoteExecutor.isRemoteExecutionOptions(options)); + ManagedChannel channel; + try { + channel = RemoteUtils.createChannel(options.remoteWorker); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + return new GrpcRemoteExecutor(channel, options); + } + + public SpawnResult exec( + Spawn spawn, + // TODO(ulfjack): Change this back to FileOutErr. + OutErr outErr, + ActionInputFileCache actionInputFileCache, + ArtifactExpander artifactExpander, + float timeout) throws InterruptedException, IOException { + ActionExecutionMetadata owner = spawn.getResourceOwner(); + if (owner.getOwner() != null) { + eventBus.post(ActionStatusMessage.runningStrategy(owner, "remote")); + } + + try { + // Temporary hack: the TreeNodeRepository should be created and maintained upstream! + TreeNodeRepository repository = + new TreeNodeRepository(execRoot, actionInputFileCache); + SortedMap<PathFragment, ActionInput> inputMap = + spawnInputExpander.getInputMapping( + spawn, + artifactExpander, + actionInputFileCache, + workspaceName); + TreeNode inputRoot = repository.buildFromActionInputs(inputMap); + repository.computeMerkleDigests(inputRoot); + Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment()); + Action action = + buildAction( + spawn.getOutputFiles(), + ContentDigests.computeDigest(command), + repository.getMerkleDigest(inputRoot)); + + ActionKey actionKey = ContentDigests.computeActionKey(action); + ActionResult result = + this.options.remoteAcceptCached ? executor.getCachedActionResult(actionKey) : null; + if (result == null) { + // Cache miss or we don't accept cache hits. + // Upload the command and all the inputs into the remote cache. + executor.uploadBlob(command.toByteArray()); + // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests! + executor.uploadTree(repository, execRoot, inputRoot); + // TODO(olaola): set BuildInfo and input total bytes as well. + ExecuteRequest.Builder request = + ExecuteRequest.newBuilder() + .setAction(action) + .setAcceptCached(this.options.remoteAcceptCached) + .setTotalInputFileCount(inputMap.size()) + .setTimeoutMillis((int) (1000 * timeout)); + ExecuteReply reply = executor.executeRemotely(request.build()); + ExecutionStatus status = reply.getStatus(); + + if (!status.getSucceeded() + && (status.getError() != ExecutionStatus.ErrorCode.EXEC_FAILED)) { + return new SpawnResult.Builder() + .setSetupSuccess(false) + .setExitCode(-1) + .build(); + } + + result = reply.getResult(); + } + + // TODO(ulfjack): Download stdout, stderr, and the output files in a single call. + passRemoteOutErr(executor, result, outErr); + executor.downloadAllResults(result, execRoot); + return new SpawnResult.Builder() + .setSetupSuccess(true) + .setExitCode(result.getReturnCode()) + .build(); + } catch (StatusRuntimeException e) { + throw new IOException(e); + } catch (CacheNotFoundException e) { + throw new IOException(e); + } + } + + private Action buildAction( + Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) { + Action.Builder action = Action.newBuilder(); + action.setCommandDigest(command); + action.setInputRootDigest(inputRoot); + // Somewhat ugly: we rely on the stable order of outputs here for remote action caching. + for (ActionInput output : outputs) { + action.addOutputPath(output.getExecPathString()); + } + if (platform != null) { + action.setPlatform(platform); + } + return action.build(); + } + + private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) { + Command.Builder command = Command.newBuilder(); + command.addAllArgv(arguments); + // Sorting the environment pairs by variable name. + TreeSet<String> variables = new TreeSet<>(environment.keySet()); + for (String var : variables) { + command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var)); + } + return command.build(); + } + + private static void passRemoteOutErr( + RemoteActionCache cache, ActionResult result, OutErr outErr) throws CacheNotFoundException { + ImmutableList<byte[]> streams = + cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest())); + outErr.printOut(new String(streams.get(0), UTF_8)); + outErr.printErr(new String(streams.get(1), UTF_8)); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index ee6f2a2518..df91471a5a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -199,7 +199,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext { EventHandler eventHandler = executor.getEventHandler(); RemoteActionCache actionCache = null; - RemoteWorkExecutor workExecutor = null; + GrpcRemoteExecutor workExecutor = null; if (spawn.isRemotable()) { // Initialize remote cache and execution handlers. We use separate handlers for every // action to enable server-side parallelism (need a different gRPC channel per action). @@ -211,8 +211,9 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } // Otherwise actionCache remains null and remote caching/execution are disabled. - if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) { - workExecutor = new RemoteWorkExecutor(options); + if (actionCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(options)) { + workExecutor = new GrpcRemoteExecutor( + RemoteUtils.createChannelLegacy(options.remoteWorker), options); } } catch (InvalidConfigurationException e) { eventHandler.handle(Event.warn(e.toString())); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java index 9747e3ef11..89ecf43b2e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java @@ -24,19 +24,24 @@ import java.net.URISyntaxException; /** Helper methods for gRPC calls */ @ThreadSafe public final class RemoteUtils { - public static ManagedChannel createChannel(String hostAndPort) + public static ManagedChannel createChannelLegacy(String hostAndPort) throws InvalidConfigurationException { try { - URI uri = new URI("dummy://" + hostAndPort); - if (uri.getHost() == null || uri.getPort() == -1) { - throw new URISyntaxException("Invalid host or port.", ""); - } - return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) - .usePlaintext(true) - .build(); + return createChannel(hostAndPort); } catch (URISyntaxException e) { throw new InvalidConfigurationException( "Invalid argument for the address of remote cache server: " + hostAndPort); } } + + public static ManagedChannel createChannel(String hostAndPort) + throws URISyntaxException { + URI uri = new URI("dummy://" + hostAndPort); + if (uri.getHost() == null || uri.getPort() == -1) { + throw new URISyntaxException("Invalid host or port.", ""); + } + return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) + .usePlaintext(true) + .build(); + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index 6d7e218773..6eb90fa4a1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -130,8 +130,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } } - @Override - public void downloadFileContents(ContentDigest digest, Path dest, boolean executable) + private void downloadFileContents(ContentDigest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException { // This unconditionally downloads the whole file into memory first! byte[] contents = downloadBlob(digest); |