diff options
Diffstat (limited to 'src')
4 files changed, 214 insertions, 32 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index e1f888d521..d6bcbb253c 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -108,17 +108,26 @@ public class GrpcServerImpl implements RPCServer { private static final long NANOSECONDS_IN_MS = TimeUnit.MILLISECONDS.toNanos(1); + private static final long NANOS_PER_IDLE_CHECK = + TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS); + + private class RunningCommand implements AutoCloseable { private final Thread thread; private final String id; - private RunningCommand() { + private RunningCommand() throws InterruptedException { thread = Thread.currentThread(); id = UUID.randomUUID().toString(); synchronized (runningCommands) { if (runningCommands.isEmpty()) { busy(); } + + if (shuttingDown) { + throw new InterruptedException(); + } + runningCommands.put(id, this); runningCommands.notify(); } @@ -444,21 +453,56 @@ public class GrpcServerImpl implements RPCServer { } } + // The synchronized block is here so that if the "PID file deleted" timer or the idle shutdown + // mechanism kicks in during a regular shutdown, they don't race. + @VisibleForTesting // productionVisibility = Visibility.PRIVATE + void signalShutdown() { + synchronized (runningCommands) { + shuttingDown = true; + server.shutdown(); + } + } + /** - * A thread that watches if the PID file changes and shuts down the server immediately if so. + * A thread that shuts the server down under the following conditions: + * <ul> + * <li>The PID file changes (in this case, *very* quickly)</li> + * <li>The workspace directory is deleted</li> + * <li>There is too much memory pressure on the host</li> + * </ul> */ - private class PidFileWatcherThread extends Thread { - private boolean shuttingDown = false; + private class ShutdownWatcherThread extends Thread { + private long lastIdleCheckNanos; - private PidFileWatcherThread() { - super("pid-file-watcher"); + private ShutdownWatcherThread() { + super("grpc-server-shutdown-watcher"); setDaemon(true); } - // The synchronized block is here so that if the "PID file deleted" timer kicks in during a - // regular shutdown, they don't race. - private synchronized void signalShutdown() { - shuttingDown = true; + private void doIdleChecksMaybe() { + synchronized (runningCommands) { + if (!runningCommands.isEmpty()) { + lastIdleCheckNanos = -1; + return; + } + + long currentNanos = BlazeClock.nanoTime(); + if (lastIdleCheckNanos == -1) { + lastIdleCheckNanos = currentNanos; + return; + } + + if (currentNanos - lastIdleCheckNanos < NANOS_PER_IDLE_CHECK) { + return; + } + + if (!idleServerTasks.continueProcessing()) { + signalShutdown(); + server.shutdown(); + } + + lastIdleCheckNanos = currentNanos; + } } @Override @@ -474,8 +518,12 @@ public class GrpcServerImpl implements RPCServer { // Handled by virtue of ok not being set to true } + if (ok) { + doIdleChecksMaybe(); + } + if (!ok) { - synchronized (PidFileWatcherThread.this) { + synchronized (ShutdownWatcherThread.this) { if (shuttingDown) { log.warning("PID file deleted or overwritten but shutdown is already in progress"); break; @@ -511,14 +559,16 @@ public class GrpcServerImpl implements RPCServer { private final String responseCookie; private final AtomicLong interruptCounter = new AtomicLong(0); private final int maxIdleSeconds; - private final PidFileWatcherThread pidFileWatcherThread; + private final ShutdownWatcherThread shutdownWatcherThread; private final Path pidFile; private final String pidInFile; private Server server; private IdleServerTasks idleServerTasks; private final int port; - boolean serving; + private InetSocketAddress address; + private boolean serving; + private boolean shuttingDown = false; public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, Path workspace, Path serverDirectory, int maxIdleSeconds) throws IOException { @@ -549,12 +599,22 @@ public class GrpcServerImpl implements RPCServer { requestCookie = generateCookie(random, 16); responseCookie = generateCookie(random, 16); - pidFileWatcherThread = new PidFileWatcherThread(); - pidFileWatcherThread.start(); + shutdownWatcherThread = new ShutdownWatcherThread(); + shutdownWatcherThread.start(); idleServerTasks = new IdleServerTasks(workspace); idleServerTasks.idle(); } + @VisibleForTesting // productionVisibility = Visibility.PRIVATE + String getRequestCookie() { + return requestCookie; + } + + @VisibleForTesting // productionVisibility = Visibility.PRIVATE + InetSocketAddress getAddress() { + return address; + } + private void idle() { Preconditions.checkState(idleServerTasks == null); idleServerTasks = new IdleServerTasks(workspace); @@ -642,7 +702,7 @@ public class GrpcServerImpl implements RPCServer { } } - server.shutdown(); + signalShutdown(); } /** @@ -668,7 +728,7 @@ public class GrpcServerImpl implements RPCServer { @Override public void prepareForAbruptShutdown() { disableShutdownHooks(); - pidFileWatcherThread.signalShutdown(); + signalShutdown(); } @Override @@ -723,7 +783,7 @@ public class GrpcServerImpl implements RPCServer { timeoutThread.start(); } serving = true; - + this.address = new InetSocketAddress(address.getAddress(), server.getPort()); writeServerFile( PORT_FILE, InetAddresses.toUriString(address.getAddress()) + ":" + server.getPort()); writeServerFile(REQUEST_COOKIE_FILE, requestCookie); @@ -783,7 +843,8 @@ public class GrpcServerImpl implements RPCServer { log.severe(err.toString()); } - private void executeCommand( + @VisibleForTesting // productionVisibility = Visibility.PRIVATE + void executeCommand( RunRequest request, StreamObserver<RunResponse> observer, GrpcSink sink) { sink.setCommandThread(Thread.currentThread()); @@ -878,8 +939,7 @@ public class GrpcServerImpl implements RPCServer { } if (commandExecutor.shutdown()) { - pidFileWatcherThread.signalShutdown(); - server.shutdown(); + signalShutdown(); } } @@ -912,6 +972,8 @@ public class GrpcServerImpl implements RPCServer { streamObserver.onNext(response.build()); streamObserver.onCompleted(); + } catch (InterruptedException e) { + // Ignore, we are shutting down anyway } } @@ -959,6 +1021,8 @@ public class GrpcServerImpl implements RPCServer { log.info("Client cancelled RPC of cancellation request for " + request.getCommandId()); } + } catch (InterruptedException e) { + // Ignore, we are shutting down anyway } } }; diff --git a/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java index f37e97ae9f..efcbc3f6eb 100644 --- a/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java +++ b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java @@ -16,6 +16,7 @@ package com.google.devtools.build.lib.server; import com.google.devtools.build.lib.profiler.AutoProfiler; import com.google.devtools.build.lib.unix.ProcMeminfoParser; +import com.google.devtools.build.lib.util.BlazeClock; import com.google.devtools.build.lib.util.LoggingUtil; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.FileStatus; @@ -35,11 +36,12 @@ import javax.annotation.Nullable; */ class IdleServerTasks { + private long idleStart; private final Path workspaceDir; private final ScheduledThreadPoolExecutor executor; - private static final Logger LOG = Logger.getLogger(IdleServerTasks.class.getName()); + private static final Logger log = Logger.getLogger(IdleServerTasks.class.getName()); - private static final long FIVE_MIN_MILLIS = 1000 * 60 * 5; + private static final long FIVE_MIN_NANOS = 1000L * 1000 * 1000 * 60 * 5; /** * Must be called from the main thread. @@ -56,6 +58,7 @@ class IdleServerTasks { public void idle() { Preconditions.checkState(!executor.isShutdown()); + idleStart = BlazeClock.nanoTime(); // Do a GC cycle while the server is idle. @SuppressWarnings("unused") Future<?> possiblyIgnoredError = @@ -63,7 +66,7 @@ class IdleServerTasks { new Runnable() { @Override public void run() { - try (AutoProfiler p = AutoProfiler.logged("Idle GC", LOG)) { + try (AutoProfiler p = AutoProfiler.logged("Idle GC", log)) { System.gc(); } } @@ -105,8 +108,8 @@ class IdleServerTasks { * Return true iff the server should continue processing requests. * Called from the main thread, so it should return quickly. */ - public boolean continueProcessing(long idleMillis) { - if (!memoryHeuristic(idleMillis)) { + public boolean continueProcessing() { + if (!memoryHeuristic()) { return false; } if (workspaceDir == null) { @@ -124,8 +127,10 @@ class IdleServerTasks { return stat != null && stat.isDirectory(); } - private boolean memoryHeuristic(long idleMillis) { - if (idleMillis < FIVE_MIN_MILLIS) { + private boolean memoryHeuristic() { + Preconditions.checkState(!executor.isShutdown()); + long idleNanos = BlazeClock.nanoTime() - idleStart; + if (idleNanos < FIVE_MIN_NANOS) { // Don't check memory health until after five minutes. return true; } @@ -134,11 +139,12 @@ class IdleServerTasks { try { memInfo = new ProcMeminfoParser(); } catch (IOException e) { - LOG.info("Could not process /proc/meminfo: " + e); + log.info("Could not process /proc/meminfo: " + e); return true; } - long totalPhysical, totalFree; + long totalPhysical; + long totalFree; try { totalPhysical = memInfo.getTotalKb(); totalFree = memInfo.getFreeRamKb(); // See method javadoc. @@ -153,8 +159,8 @@ class IdleServerTasks { // If the system as a whole is low on memory, let this server die. if (fractionFree < .1) { - LOG.info("Terminating due to memory constraints"); - LOG.info(String.format("Total physical:%d\nTotal free: %d\n", + log.info("Terminating due to memory constraints"); + log.info(String.format("Total physical:%d\nTotal free: %d\n", totalPhysical, totalFree)); return false; } diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD index 7f26b3e773..78f77e2d96 100644 --- a/src/test/java/com/google/devtools/build/lib/BUILD +++ b/src/test/java/com/google/devtools/build/lib/BUILD @@ -737,6 +737,7 @@ java_test( ":testutil", "//src/main/java/com/google/devtools/build/lib:bazel-main", "//src/main/java/com/google/devtools/build/lib:collect", + "//src/main/java/com/google/devtools/build/lib:inmemoryfs", "//src/main/java/com/google/devtools/build/lib:io", "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib:unix", diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java index 7d504d347e..cfb6b5c02c 100644 --- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java +++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java @@ -23,18 +23,32 @@ import static org.mockito.Mockito.when; import com.google.common.base.Strings; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Uninterruptibles; +import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; +import com.google.devtools.build.lib.runtime.CommandExecutor; +import com.google.devtools.build.lib.server.CommandProtos.PingRequest; +import com.google.devtools.build.lib.server.CommandProtos.RunRequest; import com.google.devtools.build.lib.server.CommandProtos.RunResponse; +import com.google.devtools.build.lib.server.CommandServerGrpc.CommandServerBlockingStub; import com.google.devtools.build.lib.server.GrpcServerImpl.StreamType; import com.google.devtools.build.lib.testutil.Suite; import com.google.devtools.build.lib.testutil.TestSpec; import com.google.devtools.build.lib.testutil.TestThread; import com.google.devtools.build.lib.testutil.TestUtils; +import com.google.devtools.build.lib.util.BlazeClock; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.vfs.FileSystem; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.ServerCallStreamObserver; import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -334,4 +348,101 @@ public class GrpcServerTest { .setStandardOutput(ByteString.copyFrom(chunk3.getBytes(StandardCharsets.ISO_8859_1))) .build()); } + + private static class Server { + private final Path workspaceDirectory; + private final Path serverDirectory; + private final CommandExecutor commandExecutor; + private final GrpcServerImpl impl; + private final TestThread serverThread; + private final CommandServerBlockingStub clientStub; + + private Server() throws Exception { + FileSystem fs = new InMemoryFileSystem(BlazeClock.instance()); + workspaceDirectory = fs.getPath("/workspace"); + workspaceDirectory.createDirectory(); + serverDirectory = fs.getPath("/server"); + serverDirectory.createDirectory(); + Path pidfile = fs.getPath("/server/server.pid.txt"); + FileSystemUtils.writeContentAsLatin1(pidfile, "1234"); + commandExecutor = mock(CommandExecutor.class); + impl = new GrpcServerImpl( + commandExecutor, BlazeClock.instance(), 0, workspaceDirectory, serverDirectory, 1000); + serverThread = new TestThread() { + @Override + public void runTest() throws Exception { + impl.serve(); + } + }; + + serverThread.start(); + CommandServerBlockingStub stubCandidate = null; + boolean ok = false; + + // Wait until the server starts up. Should be reasonably quick. + for (int i = 0; i < 20; i++) { + Thread.sleep(100); + PingRequest request = PingRequest.newBuilder() + .setCookie(impl.getRequestCookie()) + .build(); + if (impl.getAddress() == null) { + continue; + } + + if (stubCandidate == null) { + stubCandidate = CommandServerGrpc.newBlockingStub( + NettyChannelBuilder.forAddress(impl.getAddress()) + .usePlaintext(true) + .build()); + } + + try { + stubCandidate.ping(request); + ok = true; + break; + } catch (StatusRuntimeException e) { + continue; + } + } + + if (!ok) { + throw new IllegalStateException("Server did not start up in time"); + } + + clientStub = stubCandidate; + } + + private void finish() throws Exception { + impl.signalShutdown(); + serverThread.joinAndAssertState(1000); + } + } + + @Test + public void testRunCommand() throws Exception { + Server server = new Server(); + when(server.commandExecutor.exec(any(List.class), any(OutErr.class), any(LockingMode.class), + any(String.class), any(Long.class))) + .thenReturn(42); + + RunRequest request = RunRequest.newBuilder() + .setClientDescription("client") + .setCookie(server.impl.getRequestCookie()) + .build(); + + Iterator<RunResponse> result = server.clientStub.run(request); + int exitCode = -1; + while (result.hasNext()) { + exitCode = result.next().getExitCode(); + } + server.finish(); + assertThat(exitCode).isEqualTo(42); + } + + @Test + public void testIdleShutdownWhenWorkspaceDeleted() throws Exception { + Server server = new Server(); + server.workspaceDirectory.delete(); + server.serverThread.joinAndAssertState(10000); // We check the workspace dir every five seconds + } } |