about summary refs log tree commit diff homepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java 106
-rw-r--r-- src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java 28
-rw-r--r-- src/test/java/com/google/devtools/build/lib/BUILD 1
-rw-r--r-- src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java 111
4 files changed, 214 insertions, 32 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index e1f888d521..d6bcbb253c 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -108,17 +108,26 @@ public class GrpcServerImpl implements RPCServer {
private static final long NANOSECONDS_IN_MS = TimeUnit.MILLISECONDS.toNanos(1);
+ private static final long NANOS_PER_IDLE_CHECK =
+ TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS);
+
+
private class RunningCommand implements AutoCloseable {
private final Thread thread;
private final String id;
- private RunningCommand() {
+ private RunningCommand() throws InterruptedException {
thread = Thread.currentThread();
id = UUID.randomUUID().toString();
synchronized (runningCommands) {
if (runningCommands.isEmpty()) {
busy();
}
+
+ if (shuttingDown) {
+ throw new InterruptedException();
+ }
+
runningCommands.put(id, this);
runningCommands.notify();
}
@@ -444,21 +453,56 @@ public class GrpcServerImpl implements RPCServer {
}
}
+ // Sets the shuttingDown flag and calls server.shutdown(), both while holding the
+ // runningCommands monitor.
+ //
+ // The synchronized block is here so that if the "PID file deleted" timer or the idle shutdown
+ // mechanism kicks in during a regular shutdown, they don't race.
+ @VisibleForTesting // productionVisibility = Visibility.PRIVATE
+ void signalShutdown() {
+ synchronized (runningCommands) {
+ // RunningCommand's constructor reads this flag under the same monitor and refuses to
+ // register a new command once it is set.
+ shuttingDown = true;
+ server.shutdown();
+ }
+ }
+
/**
- * A thread that watches if the PID file changes and shuts down the server immediately if so.
+ * A thread that shuts the server down under the following conditions:
+ * <ul>
+ * <li>The PID file changes (in this case, *very* quickly)</li>
+ * <li>The workspace directory is deleted</li>
+ * <li>There is too much memory pressure on the host</li>
+ * </ul>
*/
- private class PidFileWatcherThread extends Thread {
- private boolean shuttingDown = false;
+ private class ShutdownWatcherThread extends Thread {
+ private long lastIdleCheckNanos;
- private PidFileWatcherThread() {
- super("pid-file-watcher");
+ private ShutdownWatcherThread() {
+ super("grpc-server-shutdown-watcher");
setDaemon(true);
}
- // The synchronized block is here so that if the "PID file deleted" timer kicks in during a
- // regular shutdown, they don't race.
- private synchronized void signalShutdown() {
- shuttingDown = true;
+ /**
+  * Runs the idle-server checks when no command is running and at least
+  * {@code NANOS_PER_IDLE_CHECK} nanoseconds have elapsed since the previous check, and starts
+  * a server shutdown when {@code idleServerTasks.continueProcessing()} returns false.
+  *
+  * <p>The whole method runs under the {@code runningCommands} monitor so that the emptiness
+  * check and the shutdown signal cannot interleave with a command being registered.
+  *
+  * <p>NOTE(review): {@code lastIdleCheckNanos} is declared without an initializer, so it starts
+  * at 0 rather than at the -1 "no measurement yet" sentinel used below - confirm that the first
+  * interval behaves as intended for the clock in use.
+  */
+ private void doIdleChecksMaybe() {
+   synchronized (runningCommands) {
+     if (!runningCommands.isEmpty()) {
+       // A command is running, so the server is not idle: forget the previous measurement.
+       lastIdleCheckNanos = -1;
+       return;
+     }
+
+     long currentNanos = BlazeClock.nanoTime();
+     if (lastIdleCheckNanos == -1) {
+       // First observation of the idle state: start measuring from now.
+       lastIdleCheckNanos = currentNanos;
+       return;
+     }
+
+     if (currentNanos - lastIdleCheckNanos < NANOS_PER_IDLE_CHECK) {
+       return;
+     }
+
+     if (!idleServerTasks.continueProcessing()) {
+       // signalShutdown() sets the shuttingDown flag and calls server.shutdown() itself, so no
+       // additional server.shutdown() call is needed here.
+       signalShutdown();
+     }
+
+     lastIdleCheckNanos = currentNanos;
+   }
+ }
@Override
@@ -474,8 +518,12 @@ public class GrpcServerImpl implements RPCServer {
// Handled by virtue of ok not being set to true
}
+ if (ok) {
+ doIdleChecksMaybe();
+ }
+
if (!ok) {
- synchronized (PidFileWatcherThread.this) {
+ synchronized (ShutdownWatcherThread.this) {
if (shuttingDown) {
log.warning("PID file deleted or overwritten but shutdown is already in progress");
break;
@@ -511,14 +559,16 @@ public class GrpcServerImpl implements RPCServer {
private final String responseCookie;
private final AtomicLong interruptCounter = new AtomicLong(0);
private final int maxIdleSeconds;
- private final PidFileWatcherThread pidFileWatcherThread;
+ private final ShutdownWatcherThread shutdownWatcherThread;
private final Path pidFile;
private final String pidInFile;
private Server server;
private IdleServerTasks idleServerTasks;
private final int port;
- boolean serving;
+ private InetSocketAddress address;
+ private boolean serving;
+ private boolean shuttingDown = false;
public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
Path workspace, Path serverDirectory, int maxIdleSeconds) throws IOException {
@@ -549,12 +599,22 @@ public class GrpcServerImpl implements RPCServer {
requestCookie = generateCookie(random, 16);
responseCookie = generateCookie(random, 16);
- pidFileWatcherThread = new PidFileWatcherThread();
- pidFileWatcherThread.start();
+ shutdownWatcherThread = new ShutdownWatcherThread();
+ shutdownWatcherThread.start();
idleServerTasks = new IdleServerTasks(workspace);
idleServerTasks.idle();
}
+ /**
+  * Returns the {@code requestCookie} field so that tests can build requests carrying it.
+  */
+ @VisibleForTesting // productionVisibility = Visibility.PRIVATE
+ String getRequestCookie() {
+ return requestCookie;
+ }
+
+ /**
+  * Returns the {@code address} field, exposed for tests.
+  *
+  * <p>NOTE(review): the field has no initializer and is not volatile, so a caller on another
+  * thread may observe {@code null} until it has been assigned - confirm that this visibility is
+  * sufficient for the tests that poll it.
+  */
+ @VisibleForTesting // productionVisibility = Visibility.PRIVATE
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
private void idle() {
Preconditions.checkState(idleServerTasks == null);
idleServerTasks = new IdleServerTasks(workspace);
@@ -642,7 +702,7 @@ public class GrpcServerImpl implements RPCServer {
}
}
- server.shutdown();
+ signalShutdown();
}
/**
@@ -668,7 +728,7 @@ public class GrpcServerImpl implements RPCServer {
@Override
public void prepareForAbruptShutdown() {
disableShutdownHooks();
- pidFileWatcherThread.signalShutdown();
+ signalShutdown();
}
@Override
@@ -723,7 +783,7 @@ public class GrpcServerImpl implements RPCServer {
timeoutThread.start();
}
serving = true;
-
+ this.address = new InetSocketAddress(address.getAddress(), server.getPort());
writeServerFile(
PORT_FILE, InetAddresses.toUriString(address.getAddress()) + ":" + server.getPort());
writeServerFile(REQUEST_COOKIE_FILE, requestCookie);
@@ -783,7 +843,8 @@ public class GrpcServerImpl implements RPCServer {
log.severe(err.toString());
}
- private void executeCommand(
+ @VisibleForTesting // productionVisibility = Visibility.PRIVATE
+ void executeCommand(
RunRequest request, StreamObserver<RunResponse> observer, GrpcSink sink) {
sink.setCommandThread(Thread.currentThread());
@@ -878,8 +939,7 @@ public class GrpcServerImpl implements RPCServer {
}
if (commandExecutor.shutdown()) {
- pidFileWatcherThread.signalShutdown();
- server.shutdown();
+ signalShutdown();
}
}
@@ -912,6 +972,8 @@ public class GrpcServerImpl implements RPCServer {
streamObserver.onNext(response.build());
streamObserver.onCompleted();
+ } catch (InterruptedException e) {
+ // Ignore, we are shutting down anyway
}
}
@@ -959,6 +1021,8 @@ public class GrpcServerImpl implements RPCServer {
log.info("Client cancelled RPC of cancellation request for "
+ request.getCommandId());
}
+ } catch (InterruptedException e) {
+ // Ignore, we are shutting down anyway
}
}
};
diff --git a/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java
index f37e97ae9f..efcbc3f6eb 100644
--- a/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java
+++ b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java
@@ -16,6 +16,7 @@ package com.google.devtools.build.lib.server;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.unix.ProcMeminfoParser;
+import com.google.devtools.build.lib.util.BlazeClock;
import com.google.devtools.build.lib.util.LoggingUtil;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileStatus;
@@ -35,11 +36,12 @@ import javax.annotation.Nullable;
*/
class IdleServerTasks {
+ private long idleStart;
private final Path workspaceDir;
private final ScheduledThreadPoolExecutor executor;
- private static final Logger LOG = Logger.getLogger(IdleServerTasks.class.getName());
+ private static final Logger log = Logger.getLogger(IdleServerTasks.class.getName());
- private static final long FIVE_MIN_MILLIS = 1000 * 60 * 5;
+ private static final long FIVE_MIN_NANOS = 1000L * 1000 * 1000 * 60 * 5;
/**
* Must be called from the main thread.
@@ -56,6 +58,7 @@ class IdleServerTasks {
public void idle() {
Preconditions.checkState(!executor.isShutdown());
+ idleStart = BlazeClock.nanoTime();
// Do a GC cycle while the server is idle.
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
@@ -63,7 +66,7 @@ class IdleServerTasks {
new Runnable() {
@Override
public void run() {
- try (AutoProfiler p = AutoProfiler.logged("Idle GC", LOG)) {
+ try (AutoProfiler p = AutoProfiler.logged("Idle GC", log)) {
System.gc();
}
}
@@ -105,8 +108,8 @@ class IdleServerTasks {
* Return true iff the server should continue processing requests.
* Called from the main thread, so it should return quickly.
*/
- public boolean continueProcessing(long idleMillis) {
- if (!memoryHeuristic(idleMillis)) {
+ public boolean continueProcessing() {
+ if (!memoryHeuristic()) {
return false;
}
if (workspaceDir == null) {
@@ -124,8 +127,10 @@ class IdleServerTasks {
return stat != null && stat.isDirectory();
}
- private boolean memoryHeuristic(long idleMillis) {
- if (idleMillis < FIVE_MIN_MILLIS) {
+ private boolean memoryHeuristic() {
+ Preconditions.checkState(!executor.isShutdown());
+ long idleNanos = BlazeClock.nanoTime() - idleStart;
+ if (idleNanos < FIVE_MIN_NANOS) {
// Don't check memory health until after five minutes.
return true;
}
@@ -134,11 +139,12 @@ class IdleServerTasks {
try {
memInfo = new ProcMeminfoParser();
} catch (IOException e) {
- LOG.info("Could not process /proc/meminfo: " + e);
+ log.info("Could not process /proc/meminfo: " + e);
return true;
}
- long totalPhysical, totalFree;
+ long totalPhysical;
+ long totalFree;
try {
totalPhysical = memInfo.getTotalKb();
totalFree = memInfo.getFreeRamKb(); // See method javadoc.
@@ -153,8 +159,8 @@ class IdleServerTasks {
// If the system as a whole is low on memory, let this server die.
if (fractionFree < .1) {
- LOG.info("Terminating due to memory constraints");
- LOG.info(String.format("Total physical:%d\nTotal free: %d\n",
+ log.info("Terminating due to memory constraints");
+ log.info(String.format("Total physical:%d\nTotal free: %d\n",
totalPhysical, totalFree));
return false;
}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 7f26b3e773..78f77e2d96 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -737,6 +737,7 @@ java_test(
":testutil",
"//src/main/java/com/google/devtools/build/lib:bazel-main",
"//src/main/java/com/google/devtools/build/lib:collect",
+ "//src/main/java/com/google/devtools/build/lib:inmemoryfs",
"//src/main/java/com/google/devtools/build/lib:io",
"//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib:unix",
diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
index 7d504d347e..cfb6b5c02c 100644
--- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
@@ -23,18 +23,32 @@ import static org.mockito.Mockito.when;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
+import com.google.devtools.build.lib.runtime.CommandExecutor;
+import com.google.devtools.build.lib.server.CommandProtos.PingRequest;
+import com.google.devtools.build.lib.server.CommandProtos.RunRequest;
import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
+import com.google.devtools.build.lib.server.CommandServerGrpc.CommandServerBlockingStub;
import com.google.devtools.build.lib.server.GrpcServerImpl.StreamType;
import com.google.devtools.build.lib.testutil.Suite;
import com.google.devtools.build.lib.testutil.TestSpec;
import com.google.devtools.build.lib.testutil.TestThread;
import com.google.devtools.build.lib.testutil.TestUtils;
+import com.google.devtools.build.lib.util.BlazeClock;
import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -334,4 +348,101 @@ public class GrpcServerTest {
.setStandardOutput(ByteString.copyFrom(chunk3.getBytes(StandardCharsets.ISO_8859_1)))
.build());
}
+
+ /**
+  * Test fixture: a {@link GrpcServerImpl} backed by an in-memory file system and a mocked
+  * {@link CommandExecutor}, served on a background {@link TestThread}, together with a blocking
+  * client stub that has already pinged the server successfully.
+  */
+ private static class Server {
+   private final Path workspaceDirectory;
+   private final Path serverDirectory;
+   private final CommandExecutor commandExecutor;
+   private final GrpcServerImpl impl;
+   private final TestThread serverThread;
+   private final CommandServerBlockingStub clientStub;
+
+   /** Creates the directories and PID file, starts serving and waits for the first ping. */
+   private Server() throws Exception {
+     FileSystem fs = new InMemoryFileSystem(BlazeClock.instance());
+     workspaceDirectory = fs.getPath("/workspace");
+     workspaceDirectory.createDirectory();
+     serverDirectory = fs.getPath("/server");
+     serverDirectory.createDirectory();
+     FileSystemUtils.writeContentAsLatin1(fs.getPath("/server/server.pid.txt"), "1234");
+
+     commandExecutor = mock(CommandExecutor.class);
+     impl = new GrpcServerImpl(
+         commandExecutor, BlazeClock.instance(), 0, workspaceDirectory, serverDirectory, 1000);
+     serverThread = new TestThread() {
+       @Override
+       public void runTest() throws Exception {
+         impl.serve();
+       }
+     };
+     serverThread.start();
+
+     clientStub = awaitStartup(impl);
+   }
+
+   /**
+    * Polls the server up to 20 times, sleeping 100 ms before each attempt, until a ping
+    * succeeds, and returns the stub that was used for the successful ping.
+    *
+    * @throws IllegalStateException if no ping succeeded within the allotted attempts
+    */
+   private static CommandServerBlockingStub awaitStartup(GrpcServerImpl impl)
+       throws InterruptedException {
+     PingRequest ping = PingRequest.newBuilder()
+         .setCookie(impl.getRequestCookie())
+         .build();
+     CommandServerBlockingStub stub = null;
+
+     // Wait until the server starts up. Should be reasonably quick.
+     for (int attempt = 0; attempt < 20; attempt++) {
+       Thread.sleep(100);
+       if (impl.getAddress() == null) {
+         // The server has not published its address yet.
+         continue;
+       }
+
+       if (stub == null) {
+         stub = CommandServerGrpc.newBlockingStub(
+             NettyChannelBuilder.forAddress(impl.getAddress())
+                 .usePlaintext(true)
+                 .build());
+       }
+
+       try {
+         stub.ping(ping);
+         return stub;
+       } catch (StatusRuntimeException e) {
+         // The server is not answering yet; try again on the next iteration.
+       }
+     }
+
+     throw new IllegalStateException("Server did not start up in time");
+   }
+
+   /** Signals shutdown and waits for the server thread to finish. */
+   private void finish() throws Exception {
+     impl.signalShutdown();
+     serverThread.joinAndAssertState(1000);
+   }
+ }
+
+ // Runs a single command through the real gRPC client/server path and checks that the exit
+ // code returned by the (mocked) command executor reaches the client.
+ @Test
+ public void testRunCommand() throws Exception {
+   Server server = new Server();
+   int exitCode = -1;
+   try {
+     when(server.commandExecutor.exec(any(List.class), any(OutErr.class), any(LockingMode.class),
+         any(String.class), any(Long.class)))
+         .thenReturn(42);
+
+     RunRequest request = RunRequest.newBuilder()
+         .setClientDescription("client")
+         .setCookie(server.impl.getRequestCookie())
+         .build();
+
+     Iterator<RunResponse> result = server.clientStub.run(request);
+     // The value kept is the exit code of the last response in the stream.
+     while (result.hasNext()) {
+       exitCode = result.next().getExitCode();
+     }
+   } finally {
+     // Always stop the server, even if the RPC fails, so the server thread is not leaked.
+     server.finish();
+   }
+   assertThat(exitCode).isEqualTo(42);
+ }
+
+ // Deletes the workspace directory of a running server and expects the server thread to
+ // terminate on its own, without an explicit shutdown request from the test.
+ @Test
+ public void testIdleShutdownWhenWorkspaceDeleted() throws Exception {
+ Server server = new Server();
+ server.workspaceDirectory.delete();
+ // NOTE(review): the timeout is presumably in milliseconds - confirm against TestThread.
+ server.serverThread.joinAndAssertState(10000); // We check the workspace dir every five seconds
+ }
}