aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
blob: 858f5748f5f0953e368138546e5817d4a98fedcb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote;

import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub;
import com.google.longrunning.Operation;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Status;
import com.google.watcher.v1.Change;
import com.google.watcher.v1.ChangeBatch;
import com.google.watcher.v1.Request;
import com.google.watcher.v1.WatcherGrpc;
import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
 * Runs actions on a remote worker over gRPC. An action is submitted through the Execution service;
 * when the returned Operation carries no inlined result, the Operation is followed through the
 * Watcher service until a result or an error shows up.
 */
@ThreadSafe
class GrpcRemoteExecutor {

  private final ManagedChannel channel;
  // May be null: the constructor accepts a @Nullable value and stores it unchanged.
  private final CallCredentials callCredentials;
  private final int callTimeoutSecs;
  private final RemoteRetrier retrier;

  // Flipped by the first close() call so that the channel is shut down only once.
  private final AtomicBoolean closed = new AtomicBoolean();

  /**
   * @param channel the channel used for both the Execution and the Watcher stubs
   * @param callCredentials credentials attached to every stub, or null
   * @param callTimeoutSecs deadline, in seconds, applied to each Execute call; must be positive
   * @param retrier retry policy wrapped around the execute and watch calls
   * @throws IllegalArgumentException if {@code callTimeoutSecs} is not greater than zero
   */
  public GrpcRemoteExecutor(
      ManagedChannel channel,
      @Nullable CallCredentials callCredentials,
      int callTimeoutSecs,
      RemoteRetrier retrier) {
    Preconditions.checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
    this.retrier = retrier;
    this.callTimeoutSecs = callTimeoutSecs;
    this.callCredentials = callCredentials;
    this.channel = channel;
  }

  /** Builds a fresh Execution stub with tracing metadata, credentials and the call deadline. */
  private ExecutionBlockingStub execBlockingStub() {
    ExecutionBlockingStub traced =
        ExecutionGrpc.newBlockingStub(channel)
            .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor());
    ExecutionBlockingStub authenticated = traced.withCallCredentials(callCredentials);
    return authenticated.withDeadlineAfter(callTimeoutSecs, TimeUnit.SECONDS);
  }

  /** Builds a fresh Watcher stub with tracing metadata and credentials; no deadline is set. */
  private WatcherBlockingStub watcherBlockingStub() {
    WatcherBlockingStub traced =
        WatcherGrpc.newBlockingStub(channel)
            .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor());
    return traced.withCallCredentials(callCredentials);
  }

  /**
   * Throws an {@link ExecutionStatusException} built from {@code statusProto} and {@code resp}
   * unless the status code is OK.
   */
  private void handleStatus(Status statusProto, @Nullable ExecuteResponse resp) {
    if (statusProto.getCode() != Code.OK.value()) {
      throw new ExecutionStatusException(statusProto, resp);
    }
  }

  /**
   * Extracts the {@link ExecuteResponse} from an Operation.
   *
   * @return the unpacked response if the operation is done, or null if it is still in progress
   * @throws IOException if the response payload cannot be unpacked as an ExecuteResponse
   * @throws IllegalStateException if a done operation has no result set, the response has no
   *     result, or a zero exit code comes with neither output files nor output directories
   */
  private @Nullable ExecuteResponse getOperationResponse(Operation op) throws IOException {
    // An operation-level error with a non-OK code is raised right away.
    if (op.getResultCase() == Operation.ResultCase.ERROR) {
      handleStatus(op.getError(), null);
    }
    if (!op.getDone()) {
      return null;
    }
    Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET);

    ExecuteResponse response;
    try {
      response = op.getResponse().unpack(ExecuteResponse.class);
    } catch (InvalidProtocolBufferException e) {
      throw new IOException(e);
    }

    // A non-OK status embedded in the response is raised together with the response itself.
    if (response.hasStatus()) {
      handleStatus(response.getStatus(), response);
    }
    Preconditions.checkState(
        response.hasResult(), "Unexpected result of remote execution: no result");
    if (response.getResult().getExitCode() == 0) {
      int producedOutputs =
          response.getResult().getOutputFilesCount()
              + response.getResult().getOutputDirectoriesCount();
      Preconditions.checkState(
          producedOutputs > 0, "Unexpected result of remote execution: no output files.");
    }
    return response;
  }

  /**
   * Executes {@code request} remotely and returns its response.
   *
   * <p>Execution consists of an execute call followed, if needed, by a watch call. Without errors
   * the flow is:
   *
   * <ul>
   *   <li>The execute call returns an Operation.
   *   <li>If that Operation already carries an inlined result, the result is returned.
   *   <li>Otherwise watch is called on the Operation, yielding a stream of Changes to it; the
   *       first Change holding an Operation with a result ends the wait and that result is
   *       returned.
   * </ul>
   *
   * <p>Error handling:
   *
   * <ul>
   *   <li>Any Operation may hold an error instead of a result. Such an Operation is completed and
   *       failed, but the error may be retriable; in that case the whole execute+watch sequence
   *       is retried, producing a new Operation.
   *   <li>The execute call itself may fail with a retriable StatusRuntimeException; only that
   *       call is then retried.
   *   <li>The watch call may fail with a retriable error, either a StatusRuntimeException or an
   *       ERROR Change in the batch; only the watch call is then retried, on the same Operation.
   * </ul>
   */
  public ExecuteResponse executeRemotely(ExecuteRequest request)
      throws IOException, InterruptedException {
    // This outermost retry covers transient failures of the Action itself on the server, not
    // gRPC errors raised during the individual calls.
    return retrier.execute(
        () -> {
          // Transient gRPC errors of the execute call are retried here.
          Operation operation = retrier.execute(() -> execBlockingStub().execute(request));
          ExecuteResponse inlined = getOperationResponse(operation);
          if (inlined != null) {
            return inlined;
          }
          Request watchRequest = Request.newBuilder().setTarget(operation.getName()).build();
          // Transient gRPC errors of the watch call are retried here; transient failures of the
          // Action itself are propagated to the outer retry.
          return retrier.execute(
              () -> {
                Iterator<ChangeBatch> batches = watcherBlockingStub().watch(watchRequest);
                try {
                  while (batches.hasNext()) {
                    ChangeBatch batch = batches.next();
                    for (Change change : batch.getChangesList()) {
                      switch (change.getState()) {
                        case INITIAL_STATE_SKIPPED:
                          // Nothing to inspect; move on to the next change.
                          break;
                        case ERROR: {
                          Status watchError;
                          try {
                            watchError = change.getData().unpack(Status.class);
                          } catch (InvalidProtocolBufferException e) {
                            throw new IOException(e);
                          }
                          throw StatusProto.toStatusRuntimeException(watchError);
                        }
                        case DOES_NOT_EXIST:
                          // TODO(olaola): either make this retriable, or use a different exception.
                          throw new IOException(
                              String.format(
                                  "Operation %s lost on the remote server.",
                                  operation.getName()));
                        case EXISTS: {
                          Operation updated;
                          try {
                            updated = change.getData().unpack(Operation.class);
                          } catch (InvalidProtocolBufferException e) {
                            throw new IOException(e);
                          }
                          ExecuteResponse result;
                          try {
                            result = getOperationResponse(updated);
                          } catch (StatusRuntimeException e) {
                            // Skip the watch-level retry so that the whole execute+watch call is
                            // retried instead.
                            throw new RemoteRetrier.PassThroughException(e);
                          }
                          if (result != null) {
                            return result;
                          }
                          // Operation still running; move on to the next change.
                          break;
                        }
                        default:
                          // Reachable only if the enum is unexpectedly extended.
                          throw new IOException(
                              String.format("Illegal change state: %s", change.getState()));
                      }
                    }
                  }
                } finally {
                  // A blocking streaming call is closed properly only once trailers and a Status
                  // arrive from the server, which triggers onClose() on the call's CallListener.
                  // Barring cancellation or errors, the server is guaranteed to send those only
                  // when hasNext() is invoked after every reply has been consumed, so the rest of
                  // the stream is drained here.
                  try {
                    while (batches.hasNext()) {
                      batches.next();
                    }
                  } catch (StatusRuntimeException e) {
                    // Cleanup only: the meaningful errors have already been propagated, so this
                    // one is deliberately dropped.
                  }
                }
                throw new IOException(
                    String.format(
                        "Watch request for %s terminated with no result.", operation.getName()));
              });
        });
  }

  /** Shuts the channel down on the first invocation; later invocations do nothing. */
  public void close() {
    if (closed.compareAndSet(false, true)) {
      channel.shutdown();
    }
  }
}