From 3abcec17d16167ab86c0e953486ef452372d31af Mon Sep 17 00:00:00 2001 From: dapengzhang0 Date: Fri, 22 Jul 2016 20:54:44 +0000 Subject: Bump grpc-java lib to version 0.15.0 non-binaries -- Change-Id: I2da9049019b3965975fab9b7f606d099d6eab2ff Reviewed-on: https://bazel-review.googlesource.com/#/c/4040/ MOS_MIGRATED_REVID=128208129 --- .../devtools/build/lib/server/GrpcServerImpl.java | 200 +++++++++++---------- 1 file changed, 105 insertions(+), 95 deletions(-) (limited to 'src/main') diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index f6a37fa334..7385bdaaaa 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -61,7 +61,7 @@ import io.grpc.stub.StreamObserver; *

Only this class should depend on gRPC so that we only need to exclude this during * bootstrapping. */ -public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer { +public class GrpcServerImpl extends RPCServer { // UTF-8 won't do because we want to be able to pass arbitrary binary strings. // Not that the internals of Bazel handle that correctly, but why not make at least this little // part correct? @@ -271,9 +271,10 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma @Override public void serve() throws IOException { Preconditions.checkState(!serving); - server = NettyServerBuilder.forAddress(new InetSocketAddress("localhost", port)) - .addService(CommandServerGrpc.bindService(this)) - .build(); + server = + NettyServerBuilder.forAddress(new InetSocketAddress("localhost", port)) + .addService(commandServer) + .build(); server.start(); if (maxIdleSeconds > 0) { @@ -347,105 +348,114 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma return instance; } - @Override - public void run( - RunRequest request, StreamObserver observer) { - if (!request.getCookie().equals(requestCookie) - || request.getClientDescription().isEmpty()) { - observer.onNext(RunResponse.newBuilder() - .setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode()) - .build()); - observer.onCompleted(); - return; - } + private final CommandServerGrpc.CommandServerImplBase commandServer = + new CommandServerGrpc.CommandServerImplBase() { + @Override + public void run(RunRequest request, StreamObserver observer) { + if (!request.getCookie().equals(requestCookie) + || request.getClientDescription().isEmpty()) { + observer.onNext( + RunResponse.newBuilder() + .setExitCode(ExitCode.LOCAL_ENVIRONMENTAL_ERROR.getNumericExitCode()) + .build()); + observer.onCompleted(); + return; + } - ImmutableList.Builder args = ImmutableList.builder(); - for (ByteString requestArg : request.getArgList()) { - args.add(requestArg.toString(CHARSET)); - } + ImmutableList.Builder args = ImmutableList.builder(); + for (ByteString requestArg : request.getArgList()) { + args.add(requestArg.toString(CHARSET)); + } - String commandId; - int exitCode; - try (RunningCommand command = new RunningCommand()) { - commandId = command.id; - OutErr rpcOutErr = OutErr.create( - new RpcOutputStream(observer, command.id, StreamType.STDOUT), - new RpcOutputStream(observer, command.id, StreamType.STDERR)); - - exitCode = commandExecutor.exec( - args.build(), rpcOutErr, - request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT, - request.getClientDescription(), clock.currentTimeMillis()); - } catch (InterruptedException e) { - exitCode = ExitCode.INTERRUPTED.getNumericExitCode(); - commandId = ""; // The default value, the client will ignore it - } + String commandId; + int exitCode; + try (RunningCommand command = new RunningCommand()) { + commandId = command.id; + OutErr rpcOutErr = + OutErr.create( + new RpcOutputStream(observer, command.id, StreamType.STDOUT), + new RpcOutputStream(observer, command.id, StreamType.STDERR)); + + exitCode = + commandExecutor.exec( + args.build(), + rpcOutErr, + request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT, + request.getClientDescription(), + clock.currentTimeMillis()); + } catch (InterruptedException e) { + exitCode = ExitCode.INTERRUPTED.getNumericExitCode(); + commandId = ""; // The default value, the client will ignore it + } - // There is a chance that a cancel request comes in after commandExecutor#exec() has finished - // and no one calls Thread.interrupted() to receive the interrupt. So we just reset the - // interruption state here to make these cancel requests not have any effect outside of command - // execution (after the try block above, the cancel request won't find the thread to interrupt) - Thread.interrupted(); - - RunResponse response = RunResponse.newBuilder() - .setCookie(responseCookie) - .setCommandId(commandId) - .setFinished(true) - .setExitCode(exitCode) - .build(); - - observer.onNext(response); - observer.onCompleted(); - - switch (commandExecutor.shutdown()) { - case NONE: - break; - - case CLEAN: - server.shutdownNow(); - break; - - case EXPUNGE: - disableShutdownHooks(); - server.shutdownNow(); - break; - } - } + // There is a chance that a cancel request comes in after commandExecutor#exec() has + // finished and no one calls Thread.interrupted() to receive the interrupt. So we just + // reset the interruption state here to make these cancel requests not have any effect + // outside of command execution (after the try block above, the cancel request won't find + // the thread to interrupt) + Thread.interrupted(); + + RunResponse response = + RunResponse.newBuilder() + .setCookie(responseCookie) + .setCommandId(commandId) + .setFinished(true) + .setExitCode(exitCode) + .build(); + + observer.onNext(response); + observer.onCompleted(); + + switch (commandExecutor.shutdown()) { + case NONE: + break; + + case CLEAN: + server.shutdownNow(); + break; + + case EXPUNGE: + disableShutdownHooks(); + server.shutdownNow(); + break; + } + } - @Override - public void ping(PingRequest pingRequest, StreamObserver streamObserver) { - Preconditions.checkState(serving); + @Override + public void ping(PingRequest pingRequest, StreamObserver streamObserver) { + Preconditions.checkState(serving); - try (RunningCommand command = new RunningCommand()) { - PingResponse.Builder response = PingResponse.newBuilder(); - if (pingRequest.getCookie().equals(requestCookie)) { - response.setCookie(responseCookie); - } + try (RunningCommand command = new RunningCommand()) { + PingResponse.Builder response = PingResponse.newBuilder(); + if (pingRequest.getCookie().equals(requestCookie)) { + response.setCookie(responseCookie); + } - streamObserver.onNext(response.build()); - streamObserver.onCompleted(); - } - } + streamObserver.onNext(response.build()); + streamObserver.onCompleted(); + } + } - @Override - public void cancel(CancelRequest request, StreamObserver streamObserver) { - if (!request.getCookie().equals(requestCookie)) { - streamObserver.onCompleted(); - return; - } + @Override + public void cancel(CancelRequest request, StreamObserver streamObserver) { + if (!request.getCookie().equals(requestCookie)) { + streamObserver.onCompleted(); + return; + } - try (RunningCommand cancelCommand = new RunningCommand()) { - synchronized (runningCommands) { - RunningCommand pendingCommand = runningCommands.get(request.getCommandId()); - if (pendingCommand != null) { - pendingCommand.thread.interrupt(); - } + try (RunningCommand cancelCommand = new RunningCommand()) { + synchronized (runningCommands) { + RunningCommand pendingCommand = runningCommands.get(request.getCommandId()); + if (pendingCommand != null) { + pendingCommand.thread.interrupt(); + } - startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId())); - } + startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId())); + } - streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build()); - streamObserver.onCompleted(); - } - } + streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build()); + streamObserver.onCompleted(); + } + } + }; } -- cgit v1.2.3