// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.remote; import com.google.common.base.Preconditions; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.remoteexecution.v1test.ExecuteRequest; import com.google.devtools.remoteexecution.v1test.ExecuteResponse; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub; import com.google.longrunning.Operation; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Status; import com.google.watcher.v1.Change; import com.google.watcher.v1.ChangeBatch; import com.google.watcher.v1.Request; import com.google.watcher.v1.WatcherGrpc; import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub; import io.grpc.CallCredentials; import io.grpc.ManagedChannel; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; /** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ @ThreadSafe class GrpcRemoteExecutor { private final ManagedChannel channel; private final CallCredentials callCredentials; private final int callTimeoutSecs; private final RemoteRetrier retrier; private final AtomicBoolean closed = new AtomicBoolean(); public GrpcRemoteExecutor( ManagedChannel channel, @Nullable CallCredentials callCredentials, int callTimeoutSecs, RemoteRetrier retrier) { Preconditions.checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0."); this.channel = channel; this.callCredentials = callCredentials; this.callTimeoutSecs = callTimeoutSecs; this.retrier = retrier; } private ExecutionBlockingStub execBlockingStub() { return ExecutionGrpc.newBlockingStub(channel) .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) .withCallCredentials(callCredentials) .withDeadlineAfter(callTimeoutSecs, TimeUnit.SECONDS); } private WatcherBlockingStub watcherBlockingStub() { return WatcherGrpc.newBlockingStub(channel) .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) .withCallCredentials(callCredentials); } private void handleStatus(Status statusProto, @Nullable ExecuteResponse resp) { if (statusProto.getCode() == Code.OK.value()) { return; } throw new ExecutionStatusException(statusProto, resp); } private @Nullable ExecuteResponse getOperationResponse(Operation op) throws IOException { if (op.getResultCase() == Operation.ResultCase.ERROR) { handleStatus(op.getError(), null); } if (op.getDone()) { Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET); try { ExecuteResponse resp = op.getResponse().unpack(ExecuteResponse.class); if (resp.hasStatus()) { handleStatus(resp.getStatus(), resp); } Preconditions.checkState( resp.hasResult(), "Unexpected result of remote execution: no result"); if (resp.getResult().getExitCode() == 0) { Preconditions.checkState( resp.getResult().getOutputFilesCount() + resp.getResult().getOutputDirectoriesCount() > 0, "Unexpected result of remote execution: no output files."); } return resp; } catch (InvalidProtocolBufferException e) { throw new IOException(e); } } return null; } /* Execute has two components: the execute call and the watch call. * This is the simple flow without any errors: * * - A call to execute returns an Operation object. * - That Operation may already have an inlined result; if so, we return that result. * - Otherwise, we call watch on that operation to receive a stream of Changes to the Operation * object, until the first such change is an Operation with a result, which we return. * * Error possibilities: * - Any Operation object can have an error field instead of a result. Such Operations are * completed and failed; however, some of these errors may be retriable. These errors should * trigger a retry of the full execute+watch call, resulting in a new Operation. * - An execute call may fail with a retriable error (raise a StatusRuntimeException). We then * retry that call. * - A watch call may fail with a retriable error (either raise a StatusRuntimeException, or * return an ERROR in the ChangeBatch field). In that case, we retry the watch call only on the * same operation object. * */ public ExecuteResponse executeRemotely(ExecuteRequest request) throws IOException, InterruptedException { // The only errors retried here are transient failures of the Action itself on the server, not // any gRPC errors that occurred during the call. return retrier.execute( () -> { // Here all transient gRPC errors will be retried. Operation op = retrier.execute(() -> execBlockingStub().execute(request)); ExecuteResponse resp = getOperationResponse(op); if (resp != null) { return resp; } Request wr = Request.newBuilder().setTarget(op.getName()).build(); // Here all transient gRPC errors will be retried, while transient failures of the Action // itself will be propagated. return retrier.execute( () -> { Iterator replies = watcherBlockingStub().watch(wr); try { while (replies.hasNext()) { ChangeBatch cb = replies.next(); for (Change ch : cb.getChangesList()) { switch (ch.getState()) { case INITIAL_STATE_SKIPPED: continue; case ERROR: try { throw StatusProto.toStatusRuntimeException( ch.getData().unpack(Status.class)); } catch (InvalidProtocolBufferException e) { throw new IOException(e); } case DOES_NOT_EXIST: // TODO(olaola): either make this retriable, or use a different exception. throw new IOException( String.format( "Operation %s lost on the remote server.", op.getName())); case EXISTS: Operation o; try { o = ch.getData().unpack(Operation.class); } catch (InvalidProtocolBufferException e) { throw new IOException(e); } try { ExecuteResponse r = getOperationResponse(o); if (r != null) { return r; } } catch (StatusRuntimeException e) { // Pass through the Watch retry and retry the whole execute+watch call. throw new RemoteRetrier.PassThroughException(e); } continue; default: // This can only happen if the enum gets unexpectedly extended. throw new IOException( String.format("Illegal change state: %s", ch.getState())); } } } } finally { // The blocking streaming call closes correctly only when trailers and a Status // are received from the server so that onClose() is called on this call's // CallListener. Under normal circumstances (no cancel/errors), these are // guaranteed to be sent by the server only if replies.hasNext() has been called // after all replies from the stream have been consumed. try { while (replies.hasNext()) { replies.next(); } } catch (StatusRuntimeException e) { // Cleanup: ignore exceptions, because the meaningful errors have already been // propagated. } } throw new IOException( String.format("Watch request for %s terminated with no result.", op.getName())); }); }); } public void close() { if (closed.getAndSet(true)) { return; } channel.shutdown(); } }