// Copyright 2017 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.remote; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import com.google.bytestream.ByteStreamGrpc; import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.remote.Retrier.RetryException; import com.google.devtools.remoteexecution.v1test.Digest; import io.grpc.CallCredentials; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Status; import 
io.grpc.StatusException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; /** * A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. * *

<p>Users must call {@link #shutdown()} before exiting. */
final class ByteStreamUploader {

  private static final Logger logger = Logger.getLogger(ByteStreamUploader.class.getName());

  private final String instanceName;
  private final Channel channel;
  private final CallCredentials callCredentials;
  private final long callTimeoutSecs;
  private final RemoteRetrier retrier;
  private final ListeningScheduledExecutorService retryService;

  private final Object lock = new Object();

  /** Uploads that have been started and whose future has not completed yet, keyed by digest. */
  @GuardedBy("lock")
  private final Map<Digest, ListenableFuture<Void>> uploadsInProgress = new HashMap<>();

  /** Set once by {@link #shutdown()}; no new uploads are accepted afterwards. */
  @GuardedBy("lock")
  private boolean isShutdown;

  /**
   * Creates a new instance.
   *
   * @param instanceName the instance name to be prepended to resource name of the {@code Write}
   *     call. See the {@code ByteStream} service definition for details
   * @param channel the {@link io.grpc.Channel} to use for calls
   * @param callCredentials the credentials to use for authentication. May be {@code null}, in which
   *     case no authentication is performed
   * @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be
   *     complete. The timeout resets between retries
   * @param retrier the {@link RemoteRetrier} whose backoff strategy to use for retry timings.
   * @param retryService the executor service to schedule retries on. It's the responsibility of the
   *     caller to properly shutdown the service after use. Users should avoid shutting down the
   *     service before {@link #shutdown()} has been called
   * @throws IllegalArgumentException if {@code callTimeoutSecs} is not greater than zero
   */
  public ByteStreamUploader(
      @Nullable String instanceName,
      Channel channel,
      @Nullable CallCredentials callCredentials,
      long callTimeoutSecs,
      RemoteRetrier retrier,
      ListeningScheduledExecutorService retryService) {
    checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");

    this.instanceName = instanceName;
    this.channel = channel;
    this.callCredentials = callCredentials;
    this.callTimeoutSecs = callTimeoutSecs;
    this.retrier = retrier;
    this.retryService = retryService;
  }

  /**
   * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
   * The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
   *
   * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
   * transparent to the user of this API.
   *
   * <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
   * performed. This is transparent to the user of this API.
   *
   * @throws IOException when reading of the {@link Chunker}s input source fails
   * @throws RetryException when the upload failed after a retry
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public void uploadBlob(Chunker chunker) throws IOException, InterruptedException {
    uploadBlobs(singletonList(chunker));
  }

  /**
   * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks
   * until the upload of all BLOBs is complete, or throws an {@link Exception} after the first
   * upload failed. Any other uploads will continue uploading in the background, until they complete
   * or the {@link #shutdown()} method is called. Errors encountered by these uploads are swallowed.
   *
   * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
   * transparent to the user of this API.
   *
   * <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
   * performed. This is transparent to the user of this API.
   *
   * @throws IOException when reading of the {@link Chunker}s input source fails
   * @throws RetryException when the upload failed after a retry
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public void uploadBlobs(Iterable<Chunker> chunkers) throws IOException, InterruptedException {
    // Kick off every upload first so that they run concurrently, then wait for them in order.
    List<ListenableFuture<Void>> uploads = new ArrayList<>();
    for (Chunker chunker : chunkers) {
      uploads.add(uploadBlobAsync(chunker));
    }

    try {
      for (ListenableFuture<Void> upload : uploads) {
        // An InterruptedException thrown here is deliberately left to propagate untouched.
        upload.get();
      }
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RetryException) {
        throw (RetryException) cause;
      }
      if (cause instanceof IOException) {
        // E.g. the chunker failed to reset. Surface it as the declared checked exception
        // instead of hiding it inside a RuntimeException.
        throw (IOException) cause;
      }
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw new RuntimeException(cause);
    }
  }

  /**
   * Cancels all running uploads. The method returns immediately and does NOT wait for the uploads
   * to be cancelled.
   *
   * <p>This method must be the last method called.
   */
  public void shutdown() {
    synchronized (lock) {
      if (isShutdown) {
        return;
      }
      isShutdown = true;
      // Before cancelling, copy the futures to a separate list in order to avoid concurrently
      // iterating over and modifying the map (cancel triggers a listener that removes the entry
      // from the map. the listener is executed in the same thread.).
      List<Future<Void>> uploadsToCancel = new ArrayList<>(uploadsInProgress.values());
      for (Future<Void> upload : uploadsToCancel) {
        upload.cancel(true);
      }
    }
  }

  /**
   * Starts uploading the BLOB provided by {@code chunker} and returns a future for the result. If
   * an upload for the same digest is already in progress, its future is returned instead and no
   * second upload is started.
   *
   * @throws IllegalStateException if {@link #shutdown()} has already been called
   * @throws NullPointerException if the chunker's digest is {@code null}
   */
  @VisibleForTesting
  ListenableFuture<Void> uploadBlobAsync(Chunker chunker) throws IOException {
    Digest digest = checkNotNull(chunker.digest());

    synchronized (lock) {
      checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");

      ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
      if (inProgress != null) {
        return inProgress;
      }

      SettableFuture<Void> uploadResult = SettableFuture.create();
      // Register the upload BEFORE starting it. If the upload completes synchronously (for
      // example because resetting the chunker fails), the removal listener below runs right away;
      // registering afterwards would leave a completed future in the map forever.
      uploadsInProgress.put(digest, uploadResult);
      uploadResult.addListener(
          () -> {
            synchronized (lock) {
              uploadsInProgress.remove(digest);
            }
          },
          MoreExecutors.directExecutor());
      startAsyncUploadWithRetry(chunker, retrier.newBackoff(), uploadResult);
      return uploadResult;
    }
  }

  /** Returns whether at least one upload is currently registered as in progress. */
  @VisibleForTesting
  boolean uploadsInProgress() {
    synchronized (lock) {
      return !uploadsInProgress.isEmpty();
    }
  }

  /**
   * Starts one upload attempt for {@code chunker}. On failure the attempt is either rescheduled on
   * {@code retryService} after the delay given by {@code backoffTimes}, or {@code
   * overallUploadResult} is completed with a {@link RetryException}. On success {@code
   * overallUploadResult} is completed with {@code null}.
   */
  private void startAsyncUploadWithRetry(
      Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture<Void> overallUploadResult) {

    AsyncUpload.Listener listener =
        new AsyncUpload.Listener() {
          @Override
          public void success() {
            overallUploadResult.set(null);
          }

          @Override
          public void failure(Status status) {
            StatusException cause = status.asException();
            long nextDelayMillis = backoffTimes.nextDelayMillis();
            if (nextDelayMillis < 0 || !retrier.isRetriable(cause)) {
              // Out of retries or status not retriable.
              RetryException error =
                  new RetryException(
                      "Out of retries or status not retriable.",
                      backoffTimes.getRetryAttempts(),
                      cause);
              overallUploadResult.setException(error);
            } else {
              retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult);
            }
          }

          private void retryAsyncUpload(
              long nextDelayMillis,
              Chunker chunker,
              Retrier.Backoff backoffTimes,
              SettableFuture<Void> overallUploadResult) {
            try {
              ListenableScheduledFuture<?> schedulingResult =
                  retryService.schedule(
                      Context.current()
                          .wrap(
                              () ->
                                  startAsyncUploadWithRetry(
                                      chunker, backoffTimes, overallUploadResult)),
                      nextDelayMillis,
                      MILLISECONDS);
              // In case the scheduled execution errors, we need to notify the overallUploadResult.
              schedulingResult.addListener(
                  () -> {
                    try {
                      schedulingResult.get();
                    } catch (Exception e) {
                      overallUploadResult.setException(
                          new RetryException(
                              "Scheduled execution errored.", backoffTimes.getRetryAttempts(), e));
                    }
                  },
                  MoreExecutors.directExecutor());
            } catch (RejectedExecutionException e) {
              // May be thrown by .schedule(...) if i.e. the executor is shutdown.
              overallUploadResult.setException(
                  new RetryException("Rejected by executor.", backoffTimes.getRetryAttempts(), e));
            }
          }
        };

    try {
      chunker.reset();
    } catch (IOException e) {
      overallUploadResult.setException(e);
      return;
    }

    // The overall upload may have been cancelled (e.g. by shutdown()) while this attempt was
    // waiting in the retry queue. Don't open a new call that nobody is waiting for.
    if (overallUploadResult.isDone()) {
      return;
    }

    AsyncUpload newUpload =
        new AsyncUpload(channel, callCredentials, callTimeoutSecs, instanceName, chunker, listener);
    newUpload.start();
    // Registered after start() so that a cancellation which has already happened cancels the
    // started call instead of finding no call to cancel.
    overallUploadResult.addListener(
        () -> {
          if (overallUploadResult.isCancelled()) {
            newUpload.cancel();
          }
        },
        MoreExecutors.directExecutor());
  }

  /** A single attempt at uploading all chunks of a {@link Chunker} over one {@code Write} call. */
  private static class AsyncUpload {

    /** Receives the outcome of the attempt, as reported when the call is closed. */
    interface Listener {
      void success();

      void failure(Status status);
    }

    private final Channel channel;
    private final CallCredentials callCredentials;
    private final long callTimeoutSecs;
    private final String instanceName;
    private final Chunker chunker;
    private final Listener listener;

    /** The underlying gRPC call; {@code null} until {@link #start()} has been called. */
    private ClientCall<WriteRequest, WriteResponse> call;

    AsyncUpload(
        Channel channel,
        CallCredentials callCredentials,
        long callTimeoutSecs,
        String instanceName,
        Chunker chunker,
        Listener listener) {
      this.channel = channel;
      this.callCredentials = callCredentials;
      this.callTimeoutSecs = callTimeoutSecs;
      this.instanceName = instanceName;
      this.chunker = chunker;
      this.listener = listener;
    }

    /** Opens the {@code Write} call and sends chunks whenever the call reports it is ready. */
    void start() {
      CallOptions callOptions =
          CallOptions.DEFAULT
              .withCallCredentials(callCredentials)
              .withDeadlineAfter(callTimeoutSecs, SECONDS);
      call = channel.newCall(ByteStreamGrpc.METHOD_WRITE, callOptions);

      ClientCall.Listener<WriteResponse> callListener =
          new ClientCall.Listener<WriteResponse>() {

            private final WriteRequest.Builder requestBuilder = WriteRequest.newBuilder();
            private boolean callHalfClosed = false;

            @Override
            public void onMessage(WriteResponse response) {
              // TODO(buchgr): The ByteStream API allows to resume the upload at the committedSize.
            }

            @Override
            public void onClose(Status status, Metadata trailers) {
              if (status.isOk()) {
                listener.success();
              } else {
                listener.failure(status);
              }
            }

            @Override
            public void onReady() {
              while (call.isReady()) {
                if (!chunker.hasNext()) {
                  // call.halfClose() may only be called once. Guard against it being called more
                  // often.
                  // See: https://github.com/grpc/grpc-java/issues/3201
                  if (!callHalfClosed) {
                    callHalfClosed = true;
                    // Every chunk has been written. No more work to do.
                    call.halfClose();
                  }
                  return;
                }

                try {
                  requestBuilder.clear();
                  Chunker.Chunk chunk = chunker.next();

                  if (chunk.getOffset() == 0) {
                    // Resource name only needs to be set on the first write for each file.
                    requestBuilder.setResourceName(newResourceName(chunk.getDigest()));
                  }

                  boolean isLastChunk = !chunker.hasNext();
                  WriteRequest request =
                      requestBuilder
                          .setData(chunk.getData())
                          .setWriteOffset(chunk.getOffset())
                          .setFinishWrite(isLastChunk)
                          .build();

                  call.sendMessage(request);
                } catch (IOException e) {
                  try {
                    chunker.reset();
                  } catch (IOException e1) {
                    // This exception indicates that closing the underlying input stream failed.
                    // We don't expect this to ever happen, but don't want to swallow the exception
                    // completely.
                    logger.log(Level.WARNING, "Chunker failed closing data source.", e1);
                  } finally {
                    call.cancel("Failed to read next chunk.", e);
                  }
                  // The call has been cancelled and the chunker rewound; stop feeding the call
                  // rather than looping and sending the first chunk again.
                  return;
                }
              }
            }

            /**
             * Builds {@code [instanceName/]uploads/<random uuid>/blobs/<hash>/<size>} for the
             * given digest.
             */
            private String newResourceName(Digest digest) {
              String resourceName =
                  format(
                      "uploads/%s/blobs/%s/%d",
                      UUID.randomUUID(), digest.getHash(), digest.getSizeBytes());
              if (!Strings.isNullOrEmpty(instanceName)) {
                resourceName = instanceName + "/" + resourceName;
              }
              return resourceName;
            }
          };
      call.start(callListener, TracingMetadataUtils.headersFromCurrentContext());
      call.request(1);
    }

    /** Cancels the underlying call, if {@link #start()} has created one. */
    void cancel() {
      if (call != null) {
        call.cancel("Cancelled by user.", null);
      }
    }
  }
}