diff options
author | Lukacs Berki <lberki@google.com> | 2016-08-16 07:45:39 +0000 |
---|---|---|
committer | Philipp Wollermann <philwo@google.com> | 2016-08-16 15:21:30 +0000 |
commit | 9650f54d1b27618a0191065160ad7c409d4f4b94 (patch) | |
tree | b9f5c20a9b7549ae1888a56f3444eede033c2b5d | |
parent | aaa58714859459d2bac3814579a74f5d56297d1b (diff) |
Only call StreamObserver.onNext() from a single thread in order to avoid a memory leak in gRPC (or Netty).
--
MOS_MIGRATED_REVID=130369785
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 87 |
1 files changed, 73 insertions, 14 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 6d22550e8a..83470c5ac5 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -14,10 +14,12 @@ package com.google.devtools.build.lib.server; +import com.google.common.base.Optional; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.net.InetAddresses; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.server.CommandProtos.CancelRequest; @@ -37,6 +39,7 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.io.OutputStream; @@ -47,7 +50,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.GuardedBy; @@ -103,21 +113,46 @@ public class GrpcServerImpl extends RPCServer { STDERR, } + private static Runnable streamRunnable( + final LinkedBlockingQueue<Optional<RunResponse>> queue, + final CallStreamObserver<RunResponse> observer) { + return new Runnable() { + @Override + public void run() { + while (true) { + Optional<RunResponse> item; + try { + item = queue.take(); + } catch (InterruptedException e) { + // Ignore. This is running on its own thread to which interrupts are never delivered + // except by explicit SIGINT to that thread, which is a case we can ignore. + continue; + } + if (!item.isPresent()) { + return; + } + + observer.onNext(item.get()); + } + } + }; + } + // TODO(lberki): Maybe we should implement line buffering? private class RpcOutputStream extends OutputStream { - private final StreamObserver<RunResponse> observer; private final String commandId; private final StreamType type; + private final LinkedBlockingQueue<Optional<RunResponse>> work; - private RpcOutputStream( - StreamObserver<RunResponse> observer, String commandId, StreamType type) { - this.observer = observer; + private RpcOutputStream(String commandId, StreamType type, + LinkedBlockingQueue<Optional<RunResponse>> work) { this.commandId = commandId; this.type = type; + this.work = work; } @Override - public synchronized void write(byte[] b, int off, int inlen) { + public void write(byte[] b, int off, int inlen) { ByteString input = ByteString.copyFrom(b, off, inlen); RunResponse.Builder response = RunResponse .newBuilder() @@ -129,8 +164,7 @@ public class GrpcServerImpl extends RPCServer { case STDERR: response.setStandardError(input); break; default: throw new IllegalStateException(); } - - observer.onNext(response.build()); + work.offer(Optional.of(response.build())); } @Override @@ -153,6 +187,7 @@ public class GrpcServerImpl extends RPCServer { private final String requestCookie; private final String responseCookie; private final AtomicLong interruptCounter = new AtomicLong(0); + private final ExecutorService streamExecutor; private final int maxIdleSeconds; private Server server; @@ -169,6 +204,17 @@ public class GrpcServerImpl extends RPCServer { this.maxIdleSeconds = maxIdleSeconds; this.serving = false; + final AtomicInteger counter = new AtomicInteger(1); + this.streamExecutor = Executors.newCachedThreadPool(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread result = new Thread(r); + result.setName("streamer-" + counter.getAndAdd(1)); + result.setDaemon(true); + return result; + } + }); + SecureRandom random = new SecureRandom(); requestCookie = generateCookie(random, 16); responseCookie = generateCookie(random, 16); @@ -336,12 +382,16 @@ public class GrpcServerImpl extends RPCServer { String commandId; int exitCode; + LinkedBlockingQueue<Optional<RunResponse>> work = new LinkedBlockingQueue<>(); + Future<?> streamFuture = streamExecutor.submit(streamRunnable( + work, (CallStreamObserver<RunResponse>) observer)); + try (RunningCommand command = new RunningCommand()) { commandId = command.id; OutErr rpcOutErr = OutErr.create( - new RpcOutputStream(observer, command.id, StreamType.STDOUT), - new RpcOutputStream(observer, command.id, StreamType.STDERR)); + new RpcOutputStream(command.id, StreamType.STDOUT, work), + new RpcOutputStream(command.id, StreamType.STDERR, work)); exitCode = commandExecutor.exec( @@ -350,16 +400,25 @@ public class GrpcServerImpl extends RPCServer { request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT, request.getClientDescription(), clock.currentTimeMillis()); + } catch (InterruptedException e) { exitCode = ExitCode.INTERRUPTED.getNumericExitCode(); commandId = ""; // The default value, the client will ignore it } - // There is a chance that a cancel request comes in after commandExecutor#exec() has - // finished and no one calls Thread.interrupted() to receive the interrupt. So we just - // reset the interruption state here to make these cancel requests not have any effect - // outside of command execution (after the try block above, the cancel request won't find - // the thread to interrupt) + // Signal the streamer thread to exit. If we don't do this, streamFuture will never get + // computed and we hang. + work.offer(Optional.<RunResponse>absent()); + try { + Uninterruptibles.getUninterruptibly(streamFuture); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } + + // There is a chance that an Uninterruptibles#getUninterruptibly() leaves us with the + // interrupt bit set. So we just reset the interruption state here to make these cancel + // requests not have any effect outside of command execution (after the try block above, + // the cancel request won't find the thread to interrupt) Thread.interrupted(); RunResponse response = |