diff options
author | 2018-07-12 04:01:45 -0700 | |
---|---|---|
committer | 2018-07-12 04:02:54 -0700 | |
commit | b50fe8607a94579e30561a7cfb65315e71a50238 (patch) | |
tree | 1e1f7a2d28936561fc336af2d4b315d758a66e36 /src/main/java/com/google/devtools/build/lib/remote | |
parent | ef3fed75e478fe63d7a0b9cd61c42d3c5aee509b (diff) |
remote: add a ByteStreamBuildEventArtifactUploader
This change allows local files referenced by the BEP/BES protocol
to be uploaded to a ByteStream gRPC service.
The ByteStreamUploader is now implicitly also used by the BES
module which has a different lifecycle than the remote module.
We introduce reference counting to ensure that the channel is
closed after its no longer needed. This also fixes a bug where
we currently leak one socket per remote build until the Bazel
server is shut down.
RELNOTES: None
PiperOrigin-RevId: 204275316
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote')
9 files changed, 536 insertions, 117 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java new file mode 100644 index 0000000000..3f3308c5e9 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java @@ -0,0 +1,144 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; +import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; +import com.google.devtools.build.lib.buildeventstream.PathConverter; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.remoteexecution.v1test.Digest; +import io.grpc.Context; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + +/** + * A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}. + */ +class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader { + + private final Context ctx; + private final ByteStreamUploader uploader; + private final String remoteServerInstanceName; + + private final AtomicBoolean shutdown = new AtomicBoolean(); + + ByteStreamBuildEventArtifactUploader( + ByteStreamUploader uploader, String remoteServerName, Context ctx, + @Nullable String remoteInstanceName) { + this.uploader = Preconditions.checkNotNull(uploader); + String remoteServerInstanceName = Preconditions.checkNotNull(remoteServerName); + if (!Strings.isNullOrEmpty(remoteInstanceName)) { + remoteServerInstanceName += "/" + remoteInstanceName; + } + this.ctx = ctx; + this.remoteServerInstanceName = remoteServerInstanceName; + } + + @Override + public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) { + if (files.isEmpty()) { + return Futures.immediateFuture(PathConverter.NO_CONVERSION); + } + List<ListenableFuture<PathDigestPair>> uploads = new ArrayList<>(files.size()); + + Context prevCtx = ctx.attach(); + try { + for (Path file : files.keySet()) { + Chunker chunker = new Chunker(file); + Digest digest = chunker.digest(); + ListenableFuture<PathDigestPair> upload = + Futures.transform( + uploader.uploadBlobAsync(chunker, /*forceUpload=*/false), + unused -> new PathDigestPair(file, digest), + MoreExecutors.directExecutor()); + uploads.add(upload); + } + + return Futures.transform(Futures.allAsList(uploads), + (uploadsDone) -> new PathConverterImpl(remoteServerInstanceName, uploadsDone), + MoreExecutors.directExecutor()); + } catch (IOException e) { + return Futures.immediateFailedFuture(e); + } finally { + ctx.detach(prevCtx); + } + } + + @Override + public void shutdown() { + if (shutdown.getAndSet(true)) { + return; + } + uploader.release(); + } + + private static class PathConverterImpl implements PathConverter { + + private final String remoteServerInstanceName; + private final Map<Path, Digest> pathToDigest; + + PathConverterImpl(String remoteServerInstanceName, + List<PathDigestPair> uploads) { + Preconditions.checkNotNull(uploads); + this.remoteServerInstanceName = remoteServerInstanceName; + pathToDigest = new HashMap<>(uploads.size()); + for (PathDigestPair pair : uploads) { + pathToDigest.put(pair.getPath(), pair.getDigest()); + } + } + + @Override + public String apply(Path path) { + Preconditions.checkNotNull(path); + Digest digest = pathToDigest.get(path); + if (digest == null) { + // It's a programming error to reference a file that has not been uploaded. + throw new IllegalStateException( + String.format("Illegal file reference: '%s'", path.getPathString())); + } + return String.format( + "bytestream://%s/blobs/%s/%d", + remoteServerInstanceName, digest.getHash(), digest.getSizeBytes()); + } + } + + private static class PathDigestPair { + + private final Path path; + private final Digest digest; + + PathDigestPair(Path path, Digest digest) { + this.path = path; + this.digest = digest; + } + + public Path getPath() { + return path; + } + + public Digest getDigest() { + return digest; + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java new file mode 100644 index 0000000000..be43302ad0 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java @@ -0,0 +1,47 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; +import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploaderFactory; +import com.google.devtools.common.options.OptionsProvider; +import io.grpc.Context; +import javax.annotation.Nullable; + +/** + * A factory for {@link ByteStreamBuildEventArtifactUploader}. + */ +class ByteStreamBuildEventArtifactUploaderFactory implements + BuildEventArtifactUploaderFactory { + + private final ByteStreamUploader uploader; + private final String remoteServerName; + private final Context ctx; + private final @Nullable String remoteInstanceName; + + ByteStreamBuildEventArtifactUploaderFactory( + ByteStreamUploader uploader, String remoteServerName, Context ctx, + @Nullable String remoteInstanceName) { + this.uploader = uploader; + this.remoteServerName = remoteServerName; + this.ctx = ctx; + this.remoteInstanceName = remoteInstanceName; + } + + @Override + public BuildEventArtifactUploader create(OptionsProvider options) { + return new ByteStreamBuildEventArtifactUploader(uploader.retain(), remoteServerName, ctx, + remoteInstanceName); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 001ba22713..42129a447a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -26,6 +26,8 @@ import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.base.Throwables; +import com.google.common.hash.HashCode; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -39,11 +41,15 @@ import io.grpc.ClientCall; import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Status; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -55,20 +61,27 @@ import javax.annotation.concurrent.GuardedBy; /** * A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. * - * <p>Users must call {@link #shutdown()} before exiting. + * <p>The uploader supports reference counting to easily be shared between components with + * different lifecyles. After instantiation the reference coune is {@code 1}. + * + * See {@link ReferenceCounted} for more information on reference counting. */ -final class ByteStreamUploader { +class ByteStreamUploader extends AbstractReferenceCounted { private static final Logger logger = Logger.getLogger(ByteStreamUploader.class.getName()); private final String instanceName; - private final Channel channel; + private final ReferenceCountedChannel channel; private final CallCredentials callCredentials; private final long callTimeoutSecs; private final RemoteRetrier retrier; private final Object lock = new Object(); + /** Contains the hash codes of already uploaded blobs. **/ + @GuardedBy("lock") + private final Set<HashCode> uploadedBlobs = new HashSet<>(); + @GuardedBy("lock") private final Map<Digest, ListenableFuture<Void>> uploadsInProgress = new HashMap<>(); @@ -89,7 +102,7 @@ final class ByteStreamUploader { */ public ByteStreamUploader( @Nullable String instanceName, - Channel channel, + ReferenceCountedChannel channel, @Nullable CallCredentials callCredentials, long callTimeoutSecs, RemoteRetrier retrier) { @@ -112,11 +125,15 @@ final class ByteStreamUploader { * <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being * performed. This is transparent to the user of this API. * + * @param chunker the data to upload. + * @param forceUpload if {@code false} the blob is not uploaded if it has previously been + * uploaded, if {@code true} the blob is uploaded. * @throws IOException when reading of the {@link Chunker}s input source fails * @throws RetryException when the upload failed after a retry */ - public void uploadBlob(Chunker chunker) throws IOException, InterruptedException { - uploadBlobs(singletonList(chunker)); + public void uploadBlob(Chunker chunker, boolean forceUpload) throws IOException, + InterruptedException { + uploadBlobs(singletonList(chunker), forceUpload); } /** @@ -131,14 +148,18 @@ final class ByteStreamUploader { * <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being * performed. This is transparent to the user of this API. * + * @param chunkers the data to upload. + * @param forceUpload if {@code false} the blob is not uploaded if it has previously been + * uploaded, if {@code true} the blob is uploaded. * @throws IOException when reading of the {@link Chunker}s input source fails * @throws RetryException when the upload failed after a retry */ - public void uploadBlobs(Iterable<Chunker> chunkers) throws IOException, InterruptedException { + public void uploadBlobs(Iterable<Chunker> chunkers, boolean forceUpload) throws IOException, + InterruptedException { List<ListenableFuture<Void>> uploads = new ArrayList<>(); for (Chunker chunker : chunkers) { - uploads.add(uploadBlobAsync(chunker)); + uploads.add(uploadBlobAsync(chunker, forceUpload)); } try { @@ -162,9 +183,11 @@ final class ByteStreamUploader { * Cancels all running uploads. The method returns immediately and does NOT wait for the uploads * to be cancelled. * - * <p>This method must be the last method called. + * <p>This method should not be called directly, but will be called implicitly when the + * reference count reaches {@code 0}. */ - public void shutdown() { + @VisibleForTesting + void shutdown() { synchronized (lock) { if (isShutdown) { return; @@ -180,13 +203,33 @@ final class ByteStreamUploader { } } - @VisibleForTesting - ListenableFuture<Void> uploadBlobAsync(Chunker chunker) { + /** + * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns + * immediately and one can listen to the returned future for the success/failure of the upload. + * + * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. + * + * <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being + * performed. This is transparent to the user of this API. + * + * @param chunker the data to upload. + * @param forceUpload if {@code false} the blob is not uploaded if it has previously been + * uploaded, if {@code true} the blob is uploaded. + * @throws IOException when reading of the {@link Chunker}s input source fails + * @throws RetryException when the upload failed after a retry + */ + public ListenableFuture<Void> uploadBlobAsync(Chunker chunker, boolean forceUpload) { Digest digest = checkNotNull(chunker.digest()); + HashCode hash = HashCode.fromString(digest.getHash()); synchronized (lock) { checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); + if (!forceUpload && uploadedBlobs.contains(hash)) { + return Futures.immediateFuture(null); + } + ListenableFuture<Void> inProgress = uploadsInProgress.get(digest); if (inProgress != null) { return inProgress; @@ -197,6 +240,7 @@ final class ByteStreamUploader { () -> { synchronized (lock) { uploadsInProgress.remove(digest); + uploadedBlobs.add(hash); } }, MoreExecutors.directExecutor()); @@ -243,6 +287,27 @@ final class ByteStreamUploader { return currUpload; } + @Override + public ByteStreamUploader retain() { + return (ByteStreamUploader) super.retain(); + } + + @Override + public ByteStreamUploader retain(int increment) { + return (ByteStreamUploader) super.retain(increment); + } + + @Override + protected void deallocate() { + shutdown(); + channel.release(); + } + + @Override + public ReferenceCounted touch(Object o) { + return this; + } + private static class AsyncUpload { private final Channel channel; diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java index 48bd4c8845..253ab41d83 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java @@ -49,7 +49,6 @@ import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest; import io.grpc.CallCredentials; -import io.grpc.Channel; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -61,30 +60,31 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */ @ThreadSafe public class GrpcRemoteCache extends AbstractRemoteActionCache { private final CallCredentials credentials; - private final Channel channel; + private final ReferenceCountedChannel channel; private final RemoteRetrier retrier; private final ByteStreamUploader uploader; + private AtomicBoolean closed = new AtomicBoolean(); + @VisibleForTesting public GrpcRemoteCache( - Channel channel, + ReferenceCountedChannel channel, CallCredentials credentials, RemoteOptions options, RemoteRetrier retrier, - DigestUtil digestUtil) { + DigestUtil digestUtil, + ByteStreamUploader uploader) { super(options, digestUtil, retrier); this.credentials = credentials; this.channel = channel; this.retrier = retrier; - - uploader = - new ByteStreamUploader( - options.remoteInstanceName, channel, credentials, options.remoteTimeout, retrier); + this.uploader = uploader; } private ContentAddressableStorageBlockingStub casBlockingStub() { @@ -110,7 +110,11 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache { @Override public void close() { - uploader.shutdown(); + if (closed.getAndSet(true)) { + return; + } + uploader.release(); + channel.release(); } public static boolean isRemoteCacheOptions(RemoteOptions options) { @@ -168,7 +172,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache { toUpload.add(new Chunker(actionInput, inputFileCache, execRoot, digestUtil)); } } - uploader.uploadBlobs(toUpload); + uploader.uploadBlobs(toUpload, true); } @Override @@ -293,7 +297,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache { } if (!filesToUpload.isEmpty()) { - uploader.uploadBlobs(filesToUpload); + uploader.uploadBlobs(filesToUpload, /*forceUpload=*/true); } // TODO(olaola): inline small stdout/stderr here. @@ -317,7 +321,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache { Digest digest = digestUtil.compute(file); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploader.uploadBlob(new Chunker(file)); + uploader.uploadBlob(new Chunker(file), true); } return digest; } @@ -333,7 +337,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache { Digest digest = DigestUtil.getFromInputCache(input, inputCache); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploader.uploadBlob(new Chunker(input, inputCache, execRoot, digestUtil)); + uploader.uploadBlob(new Chunker(input, inputCache, execRoot, digestUtil), true); } return digest; } @@ -342,7 +346,7 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache { Digest digest = digestUtil.compute(blob); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploader.uploadBlob(new Chunker(blob, digestUtil)); + uploader.uploadBlob(new Chunker(blob, digestUtil), true); } return digest; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index c98384c357..858f5748f5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -30,26 +30,29 @@ import com.google.watcher.v1.Request; import com.google.watcher.v1.WatcherGrpc; import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub; import io.grpc.CallCredentials; -import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; /** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ @ThreadSafe class GrpcRemoteExecutor { - private final Channel channel; + private final ManagedChannel channel; private final CallCredentials callCredentials; private final int callTimeoutSecs; private final RemoteRetrier retrier; + private final AtomicBoolean closed = new AtomicBoolean(); + public GrpcRemoteExecutor( - Channel channel, + ManagedChannel channel, @Nullable CallCredentials callCredentials, int callTimeoutSecs, RemoteRetrier retrier) { @@ -73,7 +76,7 @@ class GrpcRemoteExecutor { .withCallCredentials(callCredentials); } - private void handleStatus(Status statusProto, @Nullable ExecuteResponse resp) throws IOException { + private void handleStatus(Status statusProto, @Nullable ExecuteResponse resp) { if (statusProto.getCode() == Code.OK.value()) { return; } @@ -206,4 +209,11 @@ class GrpcRemoteExecutor { }); }); } + + public void close() { + if (closed.getAndSet(true)) { + return; + } + channel.shutdown(); + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java new file mode 100644 index 0000000000..eff9621da1 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java @@ -0,0 +1,125 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; +import java.util.concurrent.TimeUnit; + +/** A wrapper around a {@link io.grpc.ManagedChannel} exposing a reference count. + * When instantiated the reference count is 1. {@link ManagedChannel#shutdown()} will be called + * on the wrapped channel when the reference count reaches 0. + * + * See {@link ReferenceCounted} for more information about reference counting. + */ +class ReferenceCountedChannel extends ManagedChannel implements ReferenceCounted { + + private final ManagedChannel channel; + private final AbstractReferenceCounted referenceCounted = new AbstractReferenceCounted() { + @Override + protected void deallocate() { + channel.shutdown(); + } + + @Override + public ReferenceCounted touch(Object o) { + return this; + } + }; + + public ReferenceCountedChannel(ManagedChannel channel) { + this.channel = channel; + } + + @Override + public ManagedChannel shutdown() { + throw new UnsupportedOperationException("Don't call shutdown() directly, but use release() " + + "instead."); + } + + @Override + public boolean isShutdown() { + return channel.isShutdown(); + } + + @Override + public boolean isTerminated() { + return channel.isTerminated(); + } + + @Override + public ManagedChannel shutdownNow() { + throw new UnsupportedOperationException("Don't call shutdownNow() directly, but use release() " + + "instead."); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException { + return channel.awaitTermination(l, timeUnit); + } + + @Override + public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( + MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { + return channel.<RequestT, ResponseT>newCall(methodDescriptor, callOptions); + } + + @Override + public String authority() { + return channel.authority(); + } + + @Override + public int refCnt() { + return referenceCounted.refCnt(); + } + + @Override + public ReferenceCountedChannel retain() { + referenceCounted.retain(); + return this; + } + + @Override + public ReferenceCountedChannel retain(int increment) { + referenceCounted.retain(increment); + return this; + } + + @Override + public ReferenceCounted touch() { + referenceCounted.touch(); + return this; + } + + @Override + public ReferenceCounted touch(Object hint) { + referenceCounted.touch(hint); + return this; + } + + @Override + public boolean release() { + return referenceCounted.release(); + } + + @Override + public boolean release(int decrement) { + return referenceCounted.release(decrement); + } +}
\ No newline at end of file diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java index 145ae9bd8e..88bc907a1e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java @@ -120,5 +120,8 @@ final class RemoteActionContextProvider extends ActionContextProvider { if (cache != null) { cache.close(); } + if (executor != null) { + executor.close(); + } } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index ca1b2b51bc..7141d35621 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -14,23 +14,21 @@ package com.google.devtools.build.lib.remote; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.authandtls.GoogleAuthUtils; -import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; -import com.google.devtools.build.lib.buildeventstream.PathConverter; +import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploaderFactory; import com.google.devtools.build.lib.buildtool.BuildRequest; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.exec.ExecutorBuilder; import com.google.devtools.build.lib.remote.Retrier.RetryException; import com.google.devtools.build.lib.remote.logging.LoggingInterceptor; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.runtime.BlazeModule; import com.google.devtools.build.lib.runtime.Command; import com.google.devtools.build.lib.runtime.CommandEnvironment; @@ -43,17 +41,19 @@ import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.common.options.OptionsBase; import com.google.devtools.common.options.OptionsProvider; -import com.google.devtools.remoteexecution.v1test.Digest; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.PreconditionFailure; import com.google.rpc.PreconditionFailure.Violation; -import io.grpc.Channel; -import io.grpc.ClientInterceptors; +import io.grpc.CallCredentials; +import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.ManagedChannel; import io.grpc.Status.Code; import io.grpc.protobuf.StatusProto; import java.io.IOException; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Executors; import java.util.function.Predicate; import java.util.logging.Logger; @@ -63,63 +63,16 @@ public final class RemoteModule extends BlazeModule { private static final Logger logger = Logger.getLogger(RemoteModule.class.getName()); private AsynchronousFileOutputStream rpcLogFile; - @VisibleForTesting - static final class CasPathConverter implements PathConverter { - // Not final; unfortunately, the Bazel startup process requires us to create this object before - // we have the options available, so we have to create it first, and then set the options - // afterwards. At the time of this writing, I believe that we aren't using the PathConverter - // before the options are available, so this should be safe. - // TODO(ulfjack): Change the Bazel startup process to make the options available when we create - // the PathConverter. - RemoteOptions options; - DigestUtil digestUtil; - PathConverter fallbackConverter = new FileUriPathConverter(); - - @Override - public String apply(Path path) { - if (options == null || digestUtil == null || !remoteEnabled(options)) { - return fallbackConverter.apply(path); - } - String server = options.remoteCache; - String remoteInstanceName = options.remoteInstanceName; - try { - Digest digest = digestUtil.compute(path); - return remoteInstanceName.isEmpty() - ? String.format( - "bytestream://%s/blobs/%s/%d", server, digest.getHash(), digest.getSizeBytes()) - : String.format( - "bytestream://%s/%s/blobs/%s/%d", - server, remoteInstanceName, digest.getHash(), digest.getSizeBytes()); - } catch (IOException e) { - // TODO(ulfjack): Don't fail silently! - return fallbackConverter.apply(path); - } - } - } - - private final CasPathConverter converter = new CasPathConverter(); private final ListeningScheduledExecutorService retryScheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); private RemoteActionContextProvider actionContextProvider; + private final BuildEventArtifactUploaderFactoryDelegate + buildEventArtifactUploaderFactoryDelegate = new BuildEventArtifactUploaderFactoryDelegate(); + @Override public void serverInit(OptionsProvider startupOptions, ServerBuilder builder) { - builder.addBuildEventArtifactUploaderFactory( - (OptionsProvider options) -> - new BuildEventArtifactUploader() { - - @Override - public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) { - // TODO(ulfjack): Actually hook up upload here. - return Futures.immediateFuture(converter); - } - - @Override - public void shutdown() { - // Intentionally left empty. - } - }, - "remote"); + builder.addBuildEventArtifactUploaderFactory(buildEventArtifactUploaderFactoryDelegate, "remote"); } private static final String VIOLATION_TYPE_MISSING = "MISSING"; @@ -179,8 +132,6 @@ public final class RemoteModule extends BlazeModule { AuthAndTLSOptions authAndTlsOptions = env.getOptions().getOptions(AuthAndTLSOptions.class); DigestHashFunction hashFn = env.getRuntime().getFileSystem().getDigestFunction(); DigestUtil digestUtil = new DigestUtil(hashFn); - converter.options = remoteOptions; - converter.digestUtil = digestUtil; // Quit if no remote options specified. if (remoteOptions == null) { @@ -203,10 +154,10 @@ public final class RemoteModule extends BlazeModule { } try { - LoggingInterceptor logger = null; + List<ClientInterceptor> interceptors = new ArrayList<>(); if (!remoteOptions.experimentalRemoteGrpcLog.isEmpty()) { rpcLogFile = new AsynchronousFileOutputStream(remoteOptions.experimentalRemoteGrpcLog); - logger = new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock()); + interceptors.add(new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock())); } final RemoteRetrier executeRetrier; @@ -231,24 +182,42 @@ public final class RemoteModule extends BlazeModule { } else if (enableGrpcCache || remoteOptions.remoteExecutor != null) { // If a remote executor but no remote cache is specified, assume both at the same target. String target = enableGrpcCache ? remoteOptions.remoteCache : remoteOptions.remoteExecutor; - Channel ch = GoogleAuthUtils.newChannel(target, authAndTlsOptions); - if (logger != null) { - ch = ClientInterceptors.intercept(ch, logger); - } - RemoteRetrier retrier = + ReferenceCountedChannel channel = + new ReferenceCountedChannel( + GoogleAuthUtils.newChannel( + target, + authAndTlsOptions, + interceptors.toArray(new ClientInterceptor[0]))); + RemoteRetrier rpcRetrier = new RemoteRetrier( remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, Retrier.ALLOW_ALL_CALLS); executeRetrier = createExecuteRetrier(remoteOptions, retryScheduler); + CallCredentials credentials = GoogleAuthUtils.newCallCredentials(authAndTlsOptions); + ByteStreamUploader uploader = + new ByteStreamUploader( + remoteOptions.remoteInstanceName, + channel.retain(), + credentials, + remoteOptions.remoteTimeout, + rpcRetrier); cache = new GrpcRemoteCache( - ch, - GoogleAuthUtils.newCallCredentials(authAndTlsOptions), + channel.retain(), + credentials, remoteOptions, - retrier, - digestUtil); + rpcRetrier, + digestUtil, + uploader.retain()); + Context requestContext = + TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, "bes-upload"); + buildEventArtifactUploaderFactoryDelegate.init( + new ByteStreamBuildEventArtifactUploaderFactory( + uploader, target, requestContext, remoteOptions.remoteInstanceName)); + uploader.release(); + channel.release(); } else { executeRetrier = null; cache = null; @@ -256,19 +225,20 @@ public final class RemoteModule extends BlazeModule { final GrpcRemoteExecutor executor; if (remoteOptions.remoteExecutor != null) { - Channel ch = GoogleAuthUtils.newChannel(remoteOptions.remoteExecutor, authAndTlsOptions); + ManagedChannel channel = + GoogleAuthUtils.newChannel( + remoteOptions.remoteExecutor, + authAndTlsOptions, + interceptors.toArray(new ClientInterceptor[0])); RemoteRetrier retrier = new RemoteRetrier( remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, Retrier.ALLOW_ALL_CALLS); - if (logger != null) { - ch = ClientInterceptors.intercept(ch, logger); - } executor = new GrpcRemoteExecutor( - ch, + channel, GoogleAuthUtils.newCallCredentials(authAndTlsOptions), remoteOptions.remoteTimeout, retrier); @@ -297,6 +267,7 @@ public final class RemoteModule extends BlazeModule { rpcLogFile = null; } } + buildEventArtifactUploaderFactoryDelegate.reset(); } @Override @@ -319,7 +290,7 @@ public final class RemoteModule extends BlazeModule { || GrpcRemoteCache.isRemoteCacheOptions(options); } - public static RemoteRetrier createExecuteRetrier( + static RemoteRetrier createExecuteRetrier( RemoteOptions options, ListeningScheduledExecutorService retryService) { return new RemoteRetrier( options.experimentalRemoteRetry @@ -329,4 +300,28 @@ public final class RemoteModule extends BlazeModule { retryService, Retrier.ALLOW_ALL_CALLS); } + + private static class BuildEventArtifactUploaderFactoryDelegate + implements BuildEventArtifactUploaderFactory { + + private volatile BuildEventArtifactUploaderFactory uploaderFactory; + + public void init(BuildEventArtifactUploaderFactory uploaderFactory) { + Preconditions.checkState(this.uploaderFactory == null); + this.uploaderFactory = uploaderFactory; + } + + public void reset() { + this.uploaderFactory = null; + } + + @Override + public BuildEventArtifactUploader create(OptionsProvider options) { + BuildEventArtifactUploaderFactory uploaderFactory0 = this.uploaderFactory; + if (uploaderFactory0 == null) { + return BuildEventArtifactUploader.LOCAL_FILES_UPLOADER; + } + return uploaderFactory0.create(options); + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java index eac9e5aac8..5955d5da5f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/TracingMetadataUtils.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.remote.util; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey; import com.google.devtools.remoteexecution.v1test.RequestMetadata; @@ -51,17 +52,42 @@ public class TracingMetadataUtils { */ public static Context contextWithMetadata( String buildRequestId, String commandId, ActionKey actionKey) { - RequestMetadata metadata = + Preconditions.checkNotNull(buildRequestId); + Preconditions.checkNotNull(commandId); + Preconditions.checkNotNull(actionKey); + RequestMetadata.Builder metadata = RequestMetadata.newBuilder() .setCorrelatedInvocationsId(buildRequestId) - .setToolInvocationId(commandId) - .setActionId(actionKey.getDigest().getHash()) - .setToolDetails( - ToolDetails.newBuilder() - .setToolName("bazel") - .setToolVersion(BlazeVersionInfo.instance().getVersion())) + .setToolInvocationId(commandId); + metadata.setActionId(actionKey.getDigest().getHash()); + metadata.setToolDetails(ToolDetails.newBuilder() + .setToolName("bazel") + .setToolVersion(BlazeVersionInfo.instance().getVersion())) .build(); - return Context.current().withValue(CONTEXT_KEY, metadata); + return Context.current().withValue(CONTEXT_KEY, metadata.build()); + } + + /** + * Returns a new gRPC context derived from the current context, with {@link RequestMetadata} + * accessible by the {@link fromCurrentContext()} method. + * + * <p>The {@link RequestMetadata} is constructed using the provided arguments and the current tool + * version. + */ + public static Context contextWithMetadata( + String buildRequestId, String commandId, String actionId) { + Preconditions.checkNotNull(buildRequestId); + Preconditions.checkNotNull(commandId); + RequestMetadata.Builder metadata = + RequestMetadata.newBuilder() + .setCorrelatedInvocationsId(buildRequestId) + .setToolInvocationId(commandId); + metadata.setActionId(actionId); + metadata.setToolDetails(ToolDetails.newBuilder() + .setToolName("bazel") + .setToolVersion(BlazeVersionInfo.instance().getVersion())) + .build(); + return Context.current().withValue(CONTEXT_KEY, metadata.build()); } /** |