aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/server
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-08-16 07:45:39 +0000
committerGravatar Philipp Wollermann <philwo@google.com>2016-08-16 15:21:30 +0000
commit9650f54d1b27618a0191065160ad7c409d4f4b94 (patch)
treeb9f5c20a9b7549ae1888a56f3444eede033c2b5d /src/main/java/com/google/devtools/build/lib/server
parentaaa58714859459d2bac3814579a74f5d56297d1b (diff)
Only call StreamObserver.onNext() from a single thread in order to avoid a memory leak in gRPC (or Netty).
-- MOS_MIGRATED_REVID=130369785
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java87
1 files changed, 73 insertions, 14 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 6d22550e8a..83470c5ac5 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -14,10 +14,12 @@
package com.google.devtools.build.lib.server;
+import com.google.common.base.Optional;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
import com.google.devtools.build.lib.runtime.CommandExecutor;
import com.google.devtools.build.lib.server.CommandProtos.CancelRequest;
@@ -37,6 +39,7 @@ import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
+import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
@@ -47,7 +50,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
@@ -103,21 +113,46 @@ public class GrpcServerImpl extends RPCServer {
STDERR,
}
+ private static Runnable streamRunnable(
+ final LinkedBlockingQueue<Optional<RunResponse>> queue,
+ final CallStreamObserver<RunResponse> observer) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ Optional<RunResponse> item;
+ try {
+ item = queue.take();
+ } catch (InterruptedException e) {
+ // Ignore. This is running on its own thread to which interrupts are never delivered
+ // except by explicit SIGINT to that thread, which is a case we can ignore.
+ continue;
+ }
+ if (!item.isPresent()) {
+ return;
+ }
+
+ observer.onNext(item.get());
+ }
+ }
+ };
+ }
+
// TODO(lberki): Maybe we should implement line buffering?
private class RpcOutputStream extends OutputStream {
- private final StreamObserver<RunResponse> observer;
private final String commandId;
private final StreamType type;
+ private final LinkedBlockingQueue<Optional<RunResponse>> work;
- private RpcOutputStream(
- StreamObserver<RunResponse> observer, String commandId, StreamType type) {
- this.observer = observer;
+ private RpcOutputStream(String commandId, StreamType type,
+ LinkedBlockingQueue<Optional<RunResponse>> work) {
this.commandId = commandId;
this.type = type;
+ this.work = work;
}
@Override
- public synchronized void write(byte[] b, int off, int inlen) {
+ public void write(byte[] b, int off, int inlen) {
ByteString input = ByteString.copyFrom(b, off, inlen);
RunResponse.Builder response = RunResponse
.newBuilder()
@@ -129,8 +164,7 @@ public class GrpcServerImpl extends RPCServer {
case STDERR: response.setStandardError(input); break;
default: throw new IllegalStateException();
}
-
- observer.onNext(response.build());
+ work.offer(Optional.of(response.build()));
}
@Override
@@ -153,6 +187,7 @@ public class GrpcServerImpl extends RPCServer {
private final String requestCookie;
private final String responseCookie;
private final AtomicLong interruptCounter = new AtomicLong(0);
+ private final ExecutorService streamExecutor;
private final int maxIdleSeconds;
private Server server;
@@ -169,6 +204,17 @@ public class GrpcServerImpl extends RPCServer {
this.maxIdleSeconds = maxIdleSeconds;
this.serving = false;
+ final AtomicInteger counter = new AtomicInteger(1);
+ this.streamExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread result = new Thread(r);
+ result.setName("streamer-" + counter.getAndAdd(1));
+ result.setDaemon(true);
+ return result;
+ }
+ });
+
SecureRandom random = new SecureRandom();
requestCookie = generateCookie(random, 16);
responseCookie = generateCookie(random, 16);
@@ -336,12 +382,16 @@ public class GrpcServerImpl extends RPCServer {
String commandId;
int exitCode;
+ LinkedBlockingQueue<Optional<RunResponse>> work = new LinkedBlockingQueue<>();
+ Future<?> streamFuture = streamExecutor.submit(streamRunnable(
+ work, (CallStreamObserver<RunResponse>) observer));
+
try (RunningCommand command = new RunningCommand()) {
commandId = command.id;
OutErr rpcOutErr =
OutErr.create(
- new RpcOutputStream(observer, command.id, StreamType.STDOUT),
- new RpcOutputStream(observer, command.id, StreamType.STDERR));
+ new RpcOutputStream(command.id, StreamType.STDOUT, work),
+ new RpcOutputStream(command.id, StreamType.STDERR, work));
exitCode =
commandExecutor.exec(
@@ -350,16 +400,25 @@ public class GrpcServerImpl extends RPCServer {
request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT,
request.getClientDescription(),
clock.currentTimeMillis());
+
} catch (InterruptedException e) {
exitCode = ExitCode.INTERRUPTED.getNumericExitCode();
commandId = ""; // The default value, the client will ignore it
}
- // There is a chance that a cancel request comes in after commandExecutor#exec() has
- // finished and no one calls Thread.interrupted() to receive the interrupt. So we just
- // reset the interruption state here to make these cancel requests not have any effect
- // outside of command execution (after the try block above, the cancel request won't find
- // the thread to interrupt)
+ // Signal the streamer thread to exit. If we don't do this, streamFuture will never get
+ // computed and we hang.
+ work.offer(Optional.<RunResponse>absent());
+ try {
+ Uninterruptibles.getUninterruptibly(streamFuture);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+
+ // There is a chance that an Uninterruptibles#getUninterruptibly() leaves us with the
+ // interrupt bit set. So we just reset the interruption state here to make these cancel
+ // requests not have any effect outside of command execution (after the try block above,
+ // the cancel request won't find the thread to interrupt)
Thread.interrupted();
RunResponse response =