aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Ola Rozenfeld <olaola@google.com>2016-12-16 16:37:22 +0000
committerGravatar John Cater <jcater@google.com>2016-12-16 17:07:44 +0000
commitd2997290078e9e5114d604a6cedefda0f0c6dec4 (patch)
tree6c346cd11662f8614059d36de6ede3373f561ec6 /src
parent77e01d8aeadb5d987c2314ce3f6c48a3778f2247 (diff)
Creating separate instances of CAS and execution handlers for every action. This allows Bazel to talk to multiple instances of the server, if these exist, enabling server-side parallelism (due to using separate gRPC channels).
TESTED: internally and local server -- PiperOrigin-RevId: 142262973 MOS_MIGRATED_REVID=142262973
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java34
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java6
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java41
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java102
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java6
5 files changed, 85 insertions, 104 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index 0563c6848b..08eef889a0 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -73,11 +73,7 @@ public final class GrpcActionCache implements RemoteActionCache {
/** Channel over which to send gRPC CAS queries. */
private final ManagedChannel channel;
- // TODO(olaola): proper profiling to determine the best values for these.
- private final int grpcTimeoutSeconds;
- private final int maxBatchInputs;
- private final int maxChunkSizeBytes;
- private final int maxBatchSizeBytes;
+ private final RemoteOptions options;
private static final int MAX_MEMORY_KBYTES = 512 * 1024;
@@ -138,7 +134,7 @@ public final class GrpcActionCache implements RemoteActionCache {
} else {
chunk.setOffset(offset);
}
- int size = Math.min(currentBlob.length - offset, maxChunkSizeBytes);
+ int size = Math.min(currentBlob.length - offset, options.grpcMaxChunkSizeBytes);
if (size > 0) {
chunk.setData(ByteString.copyFrom(currentBlob, offset, size));
offset += size;
@@ -203,7 +199,7 @@ public final class GrpcActionCache implements RemoteActionCache {
chunk.setOffset(offset);
}
if (bytesLeft > 0) {
- byte[] blob = new byte[(int) Math.min(bytesLeft, (long) maxChunkSizeBytes)];
+ byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)];
currentStream.read(blob);
chunk.setData(ByteString.copyFrom(blob));
bytesLeft -= blob.length;
@@ -218,11 +214,8 @@ public final class GrpcActionCache implements RemoteActionCache {
@VisibleForTesting
public GrpcActionCache(ManagedChannel channel, RemoteOptions options) {
+ this.options = options;
this.channel = channel;
- maxBatchInputs = options.grpcMaxBatchInputs;
- maxChunkSizeBytes = options.grpcMaxChunkSizeBytes;
- maxBatchSizeBytes = options.grpcMaxBatchSizeBytes;
- grpcTimeoutSeconds = options.grpcTimeoutSeconds;
}
public GrpcActionCache(RemoteOptions options) throws InvalidConfigurationException {
@@ -235,11 +228,12 @@ public final class GrpcActionCache implements RemoteActionCache {
private CasServiceBlockingStub getBlockingStub() {
return CasServiceGrpc.newBlockingStub(channel)
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
}
private CasServiceStub getStub() {
- return CasServiceGrpc.newStub(channel).withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ return CasServiceGrpc.newStub(channel).withDeadlineAfter(
+ options.grpcTimeoutSeconds, TimeUnit.SECONDS);
}
private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) {
@@ -508,13 +502,15 @@ public final class GrpcActionCache implements RemoteActionCache {
int currentBatchBytes = 0;
int batchedInputs = 0;
int batches = 0;
+ CasServiceStub stub = getStub();
try {
while (blobs.hasNext()) {
BlobChunk chunk = blobs.next();
if (chunk.hasDigest()) {
// Determine whether to start next batch.
- if (batchedInputs % maxBatchInputs == 0
- || chunk.getDigest().getSizeBytes() + currentBatchBytes > maxBatchSizeBytes) {
+ final long batchSize = chunk.getDigest().getSizeBytes() + currentBatchBytes;
+ if (batchedInputs % options.grpcMaxBatchInputs == 0
+ || batchSize > options.grpcMaxBatchSizeBytes) {
// The batches execute simultaneously.
if (requestObserver != null) {
batchedInputs = 0;
@@ -523,7 +519,7 @@ public final class GrpcActionCache implements RemoteActionCache {
}
batches++;
responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
- requestObserver = getStub().uploadBlob(responseObserver);
+ requestObserver = stub.uploadBlob(responseObserver);
}
batchedInputs++;
}
@@ -549,7 +545,7 @@ public final class GrpcActionCache implements RemoteActionCache {
while (batches++ < numItems) {
finishLatch.countDown(); // Non-sent batches.
}
- finishLatch.await(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ finishLatch.await(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
if (exception.get() != null) {
throw exception.get(); // Re-throw the first encountered exception.
}
@@ -623,7 +619,7 @@ public final class GrpcActionCache implements RemoteActionCache {
public ActionResult getCachedActionResult(ActionKey actionKey) {
ExecutionCacheServiceBlockingStub stub =
ExecutionCacheServiceGrpc.newBlockingStub(channel)
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
ExecutionCacheRequest request =
ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
ExecutionCacheReply reply = stub.getCachedResult(request);
@@ -641,7 +637,7 @@ public final class GrpcActionCache implements RemoteActionCache {
throws InterruptedException {
ExecutionCacheServiceBlockingStub stub =
ExecutionCacheServiceGrpc.newBlockingStub(channel)
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+ .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
ExecutionCacheSetRequest request =
ExecutionCacheSetRequest.newBuilder()
.setActionDigest(actionKey.getDigest())
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
index 578a763831..d4c160d685 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -30,9 +30,7 @@ final class RemoteActionContextProvider extends ActionContextProvider {
RemoteActionContextProvider(
CommandEnvironment env,
- BuildRequest buildRequest,
- RemoteActionCache actionCache,
- RemoteWorkExecutor workExecutor) {
+ BuildRequest buildRequest) {
boolean verboseFailures = buildRequest.getOptions(ExecutionOptions.class).verboseFailures;
Builder<ActionContext> strategiesBuilder = ImmutableList.builder();
strategiesBuilder.add(
@@ -41,8 +39,6 @@ final class RemoteActionContextProvider extends ActionContextProvider {
env.getExecRoot(),
buildRequest.getOptions(RemoteOptions.class),
verboseFailures,
- actionCache,
- workExecutor,
env.getRuntime().getProductName()));
this.strategies = strategiesBuilder.build();
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 7d2845c6ff..dc9e351a5c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -17,10 +17,8 @@ package com.google.devtools.build.lib.remote;
import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.actions.ExecutorBuilder;
-import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.buildtool.BuildRequest;
import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent;
-import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.runtime.BlazeModule;
import com.google.devtools.build.lib.runtime.Command;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
@@ -33,8 +31,6 @@ import com.google.devtools.common.options.OptionsBase;
/** RemoteModule provides distributed cache and remote execution for Bazel. */
public final class RemoteModule extends BlazeModule {
private CommandEnvironment env;
- private RemoteActionCache actionCache;
- private RemoteWorkExecutor workExecutor;
@Override
public void beforeCommand(Command command, CommandEnvironment env) {
@@ -49,41 +45,22 @@ public final class RemoteModule extends BlazeModule {
@Override
public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder) {
- builder.addActionContextProvider(
- new RemoteActionContextProvider(env, request, actionCache, workExecutor));
+ builder.addActionContextProvider(new RemoteActionContextProvider(env, request));
}
@Subscribe
public void buildStarting(BuildStartingEvent event) {
RemoteOptions options = event.getRequest().getOptions(RemoteOptions.class);
- try {
- // Reinitialize the remote cache and worker from options every time, because the options
- // may change from build to build.
-
- // Don't provide the remote spawn unless at least action cache is initialized.
- if (ConcurrentMapFactory.isRemoteCacheOptions(options)) {
- actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options));
- }
- if (GrpcActionCache.isRemoteCacheOptions(options)) {
- actionCache = new GrpcActionCache(options);
- }
- // Otherwise actionCache remains null and remote caching/execution are disabled.
-
- if (actionCache != null) {
- HashFunction hf = FileSystem.getDigestFunction();
- if (hf != HashFunction.SHA1) {
- env.getBlazeModuleEnvironment().exit(new AbruptExitException(
- "Remote cache/execution requires SHA1 digests, got " + hf
- + ", run with --host_jvm_args=-Dbazel.DigestFunction=SHA1",
- ExitCode.COMMAND_LINE_ERROR));
- }
- if (RemoteWorkExecutor.isRemoteExecutionOptions(options)) {
- workExecutor = new RemoteWorkExecutor(options);
- }
+ if (ConcurrentMapFactory.isRemoteCacheOptions(options)
+ || GrpcActionCache.isRemoteCacheOptions(options)) {
+ HashFunction hf = FileSystem.getDigestFunction();
+ if (hf != HashFunction.SHA1) {
+ env.getBlazeModuleEnvironment().exit(new AbruptExitException(
+ "Remote cache/execution requires SHA1 digests, got " + hf
+ + ", run with --host_jvm_args=-Dbazel.DigestFunction=SHA1",
+ ExitCode.COMMAND_LINE_ERROR));
}
- } catch (InvalidConfigurationException e) {
- env.getReporter().handle(Event.warn(e.toString()));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index 14a275e819..c7bbdda939 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -30,6 +30,7 @@ import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnActionContext;
import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.actions.UserExecException;
+import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
@@ -63,27 +64,19 @@ import java.util.TreeSet;
final class RemoteSpawnStrategy implements SpawnActionContext {
private final Path execRoot;
private final StandaloneSpawnStrategy standaloneStrategy;
- private final RemoteActionCache remoteActionCache;
- private final RemoteWorkExecutor remoteWorkExecutor;
private final boolean verboseFailures;
- private final boolean remoteAcceptCached;
- private final boolean remoteAllowLocalFallback;
+ private final RemoteOptions options;
RemoteSpawnStrategy(
Map<String, String> clientEnv,
Path execRoot,
RemoteOptions options,
boolean verboseFailures,
- RemoteActionCache actionCache,
- RemoteWorkExecutor workExecutor,
String productName) {
this.execRoot = execRoot;
this.standaloneStrategy = new StandaloneSpawnStrategy(execRoot, verboseFailures, productName);
this.verboseFailures = verboseFailures;
- this.remoteActionCache = actionCache;
- this.remoteWorkExecutor = workExecutor;
- this.remoteAcceptCached = options.remoteAcceptCached;
- this.remoteAllowLocalFallback = options.remoteAllowLocalFallback;
+ this.options = options;
}
private Action buildAction(
@@ -115,18 +108,18 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
* remote action cache.
*/
private void execLocally(
- Spawn spawn, ActionExecutionContext actionExecutionContext, ActionKey actionKey)
- throws ExecException, InterruptedException {
+ Spawn spawn, ActionExecutionContext actionExecutionContext, RemoteActionCache actionCache,
+ ActionKey actionKey) throws ExecException, InterruptedException {
standaloneStrategy.exec(spawn, actionExecutionContext);
- if (remoteActionCache != null && actionKey != null) {
+ if (actionCache != null && actionKey != null) {
ArrayList<Path> outputFiles = new ArrayList<>();
for (ActionInput output : spawn.getOutputFiles()) {
outputFiles.add(execRoot.getRelative(output.getExecPathString()));
}
try {
ActionResult.Builder result = ActionResult.newBuilder();
- remoteActionCache.uploadAllResults(execRoot, outputFiles, result);
- remoteActionCache.setCachedActionResult(actionKey, result.build());
+ actionCache.uploadAllResults(execRoot, outputFiles, result);
+ actionCache.setCachedActionResult(actionKey, result.build());
// Handle all cache errors here.
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
@@ -146,14 +139,11 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
}
- private void passRemoteOutErr(ActionResult result, FileOutErr outErr) {
- if (remoteActionCache == null) {
- return;
- }
+ private static void passRemoteOutErr(
+ RemoteActionCache cache, ActionResult result, FileOutErr outErr) {
try {
- ImmutableList<byte[]> streams =
- remoteActionCache.downloadBlobs(
- ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
+ ImmutableList<byte[]> streams = cache.downloadBlobs(
+ ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
outErr.printOut(new String(streams.get(0), UTF_8));
outErr.printErr(new String(streams.get(1), UTF_8));
} catch (CacheNotFoundException e) {
@@ -170,11 +160,6 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
@Override
public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
throws ExecException, InterruptedException {
- if (!spawn.isRemotable() || remoteActionCache == null) {
- standaloneStrategy.exec(spawn, actionExecutionContext);
- return;
- }
-
ActionKey actionKey = null;
String mnemonic = spawn.getMnemonic();
Executor executor = actionExecutionContext.getExecutor();
@@ -182,6 +167,32 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
executor.getEventBus().post(
ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
+ RemoteActionCache actionCache = null;
+ RemoteWorkExecutor workExecutor = null;
+ if (spawn.isRemotable()) {
+ // Initialize remote cache and execution handlers. We use separate handlers for every
+ // action to enable server-side parallelism (need a different gRPC channel per action).
+ try {
+ if (ConcurrentMapFactory.isRemoteCacheOptions(options)) {
+ actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options));
+ }
+ if (GrpcActionCache.isRemoteCacheOptions(options)) {
+ actionCache = new GrpcActionCache(options);
+ }
+ // Otherwise actionCache remains null and remote caching/execution are disabled.
+
+ if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) {
+ workExecutor = new RemoteWorkExecutor(options);
+ }
+ } catch (InvalidConfigurationException e) {
+ eventHandler.handle(Event.warn(e.toString()));
+ }
+ }
+ if (!spawn.isRemotable() || actionCache == null) {
+ standaloneStrategy.exec(spawn, actionExecutionContext);
+ return;
+ }
+
try {
// Temporary hack: the TreeNodeRepository should be created and maintained upstream!
TreeNodeRepository repository = new TreeNodeRepository(execRoot);
@@ -199,30 +210,30 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
// Look up action cache, and reuse the action output if it is found.
actionKey = ContentDigests.computeActionKey(action);
- ActionResult result = this.remoteAcceptCached
- ? remoteActionCache.getCachedActionResult(actionKey) : null;
- boolean acceptCachedResult = this.remoteAcceptCached;
+ ActionResult result = this.options.remoteAcceptCached
+ ? actionCache.getCachedActionResult(actionKey) : null;
+ boolean acceptCachedResult = this.options.remoteAcceptCached;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
- remoteActionCache.downloadAllResults(result, execRoot);
+ actionCache.downloadAllResults(result, execRoot);
return;
} catch (CacheNotFoundException e) {
acceptCachedResult = false; // Retry the action remotely and invalidate the results.
}
}
- if (remoteWorkExecutor == null) {
- execLocally(spawn, actionExecutionContext, actionKey);
+ if (workExecutor == null) {
+ execLocally(spawn, actionExecutionContext, actionCache, actionKey);
return;
}
// Upload the command and all the inputs into the remote cache.
- remoteActionCache.uploadBlob(command.toByteArray());
+ actionCache.uploadBlob(command.toByteArray());
// TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
- remoteActionCache.uploadTree(repository, execRoot, inputRoot);
+ actionCache.uploadTree(repository, execRoot, inputRoot);
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
@@ -231,23 +242,24 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
.setTotalInputFileCount(inputs.size())
.setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120));
// TODO(olaola): set sensible local and remote timouts.
- ExecuteReply reply = remoteWorkExecutor.executeRemotely(request.build());
+ ExecuteReply reply = workExecutor.executeRemotely(request.build());
ExecutionStatus status = reply.getStatus();
result = reply.getResult();
// We do not want to pass on the remote stdout and strerr if we are going to retry the
// action.
if (status.getSucceeded()) {
- passRemoteOutErr(result, actionExecutionContext.getFileOutErr());
- remoteActionCache.downloadAllResults(result, execRoot);
+ passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
+ actionCache.downloadAllResults(result, execRoot);
return;
}
- if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED || !remoteAllowLocalFallback) {
- passRemoteOutErr(result, actionExecutionContext.getFileOutErr());
+ if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED
+ || !options.remoteAllowLocalFallback) {
+ passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
throw new UserExecException(status.getErrorDetail());
}
// For now, we retry locally on all other remote errors.
// TODO(olaola): add remote retries on cache miss errors.
- execLocally(spawn, actionExecutionContext, actionKey);
+ execLocally(spawn, actionExecutionContext, actionCache, actionKey);
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
} catch (InterruptedException e) {
@@ -260,15 +272,15 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
stackTrace = "\n" + Throwables.getStackTraceAsString(e);
}
eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")" + stackTrace));
- if (remoteAllowLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionKey);
+ if (options.remoteAllowLocalFallback) {
+ execLocally(spawn, actionExecutionContext, actionCache, actionKey);
} else {
throw new UserExecException(e);
}
} catch (CacheNotFoundException e) {
eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")"));
- if (remoteAllowLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionKey);
+ if (options.remoteAllowLocalFallback) {
+ execLocally(spawn, actionExecutionContext, actionCache, actionKey);
} else {
throw new UserExecException(e);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
index 9cb080b402..3635ab7def 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
@@ -29,11 +29,11 @@ import java.util.concurrent.TimeUnit;
public class RemoteWorkExecutor {
/** Channel over which to send work to run remotely. */
private final ManagedChannel channel;
- private final int grpcTimeoutSeconds;
+ private final RemoteOptions options;
public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException {
+ this.options = options;
channel = RemoteUtils.createChannel(options.remoteWorker);
- grpcTimeoutSeconds = options.grpcTimeoutSeconds;
}
public static boolean isRemoteExecutionOptions(RemoteOptions options) {
@@ -44,7 +44,7 @@ public class RemoteWorkExecutor {
ExecuteServiceBlockingStub stub =
ExecuteServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(
- grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
+ options.grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
Iterator<ExecuteReply> replies = stub.execute(request);
ExecuteReply reply = null;
while (replies.hasNext()) {