aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/server
diff options
context:
space:
mode:
authorGravatar Han-Wen Nienhuys <hanwen@google.com>2015-02-25 16:45:20 +0100
committerGravatar Han-Wen Nienhuys <hanwen@google.com>2015-02-25 16:45:20 +0100
commitd08b27fa9701fecfdb69e1b0d1ac2459efc2129b (patch)
tree5d50963026239ca5aebfb47ea5b8db7e814e57c8 /src/main/java/com/google/devtools/build/lib/server
Update from Google.
-- MOE_MIGRATED_REVID=85702957
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java158
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCServer.java562
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCService.java95
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/ServerCommand.java38
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/ServerResponse.java114
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/signal/InterruptSignalHandler.java58
6 files changed, 1025 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java
new file mode 100644
index 0000000000..ad3e475cb8
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java
@@ -0,0 +1,158 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import com.google.common.base.Preconditions;
+import com.google.devtools.build.lib.util.LoggingUtil;
+import com.google.devtools.build.lib.util.ProcMeminfoParser;
+import com.google.devtools.build.lib.vfs.FileStatus;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.Symlinks;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+/**
+ * Run cleanup-related tasks during idle periods in the server.
+ * idle() and busy() must be called in that order, and only once.
+ */
+class IdleServerTasks {
+
+ private final Path workspaceDir;
+ private final ScheduledThreadPoolExecutor executor;
+ private static final Logger LOG = Logger.getLogger(IdleServerTasks.class.getName());
+
+ private static final long FIVE_MIN_MILLIS = 1000 * 60 * 5;
+
+ /**
+ * Must be called from the main thread.
+ */
+ public IdleServerTasks(@Nullable Path workspaceDir) {
+ this.executor = new ScheduledThreadPoolExecutor(1);
+ this.workspaceDir = workspaceDir;
+ }
+
+ /**
+ * Called when the server becomes idle. Should not block, but may invoke
+ * new threads.
+ */
+ public void idle() {
+ Preconditions.checkState(!executor.isShutdown());
+
+ // Do a GC cycle while the server is idle.
+ executor.schedule(new Runnable() {
+ @Override public void run() {
+ long before = System.currentTimeMillis();
+ System.gc();
+ LOG.info("Idle GC: " + (System.currentTimeMillis() - before) + "ms");
+ }
+ }, 10, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Called by the main thread when the server gets to work.
+ * Should return quickly.
+ */
+ public void busy() {
+ Preconditions.checkState(!executor.isShutdown());
+
+ // Make sure tasks are finished after shutdown(), so they do not intefere
+ // with subsequent server invocations.
+ executor.shutdown();
+ executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+
+ boolean interrupted = false;
+ while (true) {
+ try {
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
+ break;
+ } catch (InterruptedException e) {
+ // It's unsafe to leak threads - just reset the interrupt bit later.
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Return true iff the server should continue processing requests.
+ * Called from the main thread, so it should return quickly.
+ */
+ public boolean continueProcessing(long idleMillis) {
+ if (!memoryHeuristic(idleMillis)) {
+ return false;
+ }
+ if (workspaceDir == null) {
+ return false;
+ }
+
+ FileStatus stat;
+ try {
+ stat = workspaceDir.statIfFound(Symlinks.FOLLOW);
+ } catch (IOException e) {
+ // Do not terminate the server if the workspace is temporarily inaccessible, for example,
+ // if it is on a network filesystem and the connection is down.
+ return true;
+ }
+ return stat != null && stat.isDirectory();
+ }
+
+ private boolean memoryHeuristic(long idleMillis) {
+ if (idleMillis < FIVE_MIN_MILLIS) {
+ // Don't check memory health until after five minutes.
+ return true;
+ }
+
+ ProcMeminfoParser memInfo = null;
+ try {
+ memInfo = new ProcMeminfoParser();
+ } catch (IOException e) {
+ LOG.info("Could not process /proc/meminfo: " + e);
+ return true;
+ }
+
+ long totalPhysical, totalFree;
+ try {
+ totalPhysical = memInfo.getTotalKb();
+ totalFree = memInfo.getFreeRamKb(); // See method javadoc.
+ } catch (IllegalArgumentException e) {
+ // Ugly capture of unchecked exception, similar to that in
+ // LocalHostCapacity.
+ LoggingUtil.logToRemote(Level.WARNING,
+ "Could not read memInfo during idle query", e);
+ return true;
+ }
+ double fractionFree = (double) totalFree / totalPhysical;
+
+ // If the system as a whole is low on memory, let this server die.
+ if (fractionFree < .1) {
+ LOG.info("Terminating due to memory constraints");
+ LOG.info(String.format("Total physical:%d\nTotal free: %d\n",
+ totalPhysical, totalFree));
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
new file mode 100644
index 0000000000..a1e9982a73
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -0,0 +1,562 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
+import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
+import com.google.devtools.build.lib.unix.FilesystemUtils;
+import com.google.devtools.build.lib.unix.LocalClientSocket;
+import com.google.devtools.build.lib.unix.LocalServerSocket;
+import com.google.devtools.build.lib.unix.LocalSocketAddress;
+import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.ThreadUtils;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.util.io.StreamMultiplexer;
+import com.google.devtools.build.lib.vfs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/**
+ * An RPCServer server is a Java object that sits and waits for RPC requests
+ * (the sit-and-wait is implemented in {@link #serve()}). These requests
+ * arrive via UNIX file sockets. The RPCServer then calls the application
+ * (which implements ServerCommand) to handle the request. (Since the Blaze
+ * server may need to stat hundreds of directories during initialization, this
+ * is a significant speedup.) The server thread will terminate after idling
+ * for a user-specified time.
+ *
+ * Note: If you are contemplating to call into the RPCServer from
+ * within Java, consider using the {@link RPCService} class instead.
+ */
+// TODO(bazel-team): Signal handling.
+// TODO(bazel-team): Gives clients status information when the server is busy. One
+// way to do this is to put the server status in a file (pid, the current
+// target, etc) in the server directory. Alternatively, we can have a separate
+// thread taking care of the server socket and put the information into socket
+// handshakes.
+// TODO(bazel-team): Use Reporter for server-side messages.
+public final class RPCServer {
+
+ private final Clock clock;
+ private final RPCService rpcService;
+ private final LocalServerSocket serverSocket;
+ private final long maxIdleMillis;
+ private final long statusCheckMillis;
+ private final Path serverDirectory;
+ private final Path workspaceDir;
+ private static final Logger LOG = Logger.getLogger(RPCServer.class.getName());
+ private volatile boolean lameDuck;
+
+ private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute.
+ private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0');
+
+ /**
+ * Create a new server instance. After creating the server, you can start it
+ * by calling the {@link #serve()} method.
+ *
+ * @param clock The clock to take time measurements
+ * @param rpcService The underlying service object, which takes
+ * care of dispatching to the {@link ServerCommand}
+ * instances, as requests arrive.
+ * @param maxIdleMillis The maximum time the server will wait idly.
+ * @param statusCheckPeriodMillis How long to wait between system status checks.
+ * @param serverDirectory Directory to put file socket and pid files, etc.
+ * @param workspaceDir The workspace. Used solely to ensure it persists.
+ * @throws IOException
+ */
+ public RPCServer(Clock clock, RPCService rpcService,
+ long maxIdleMillis, long statusCheckPeriodMillis,
+ Path serverDirectory, Path workspaceDir)
+ throws IOException {
+ this.clock = clock;
+ this.rpcService = rpcService;
+ this.maxIdleMillis = maxIdleMillis;
+ this.statusCheckMillis = statusCheckPeriodMillis;
+ this.serverDirectory = serverDirectory;
+ this.workspaceDir = workspaceDir;
+
+ this.serverSocket = openServerSocket();
+ serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis));
+ lameDuck = false;
+ }
+
+ /**
+ * Create a new server instance. After creating the server, you can start it
+ * by calling the {@link #serve()} method.
+ *
+ * @param clock The clock to take time measurements
+ * @param rpcService The underlying service object, which takes
+ * care of dispatching to the {@link ServerCommand}
+ * instances, as requests arrive.
+ * @param maxIdleMillis The maximum time the server will wait idly.
+ * @param serverDirectory Directory to put file socket and pid files, etc.
+ * @param workspaceDir The workspace. Used solely to ensure it persists.
+ * @throws IOException
+ */
+ public RPCServer(Clock clock, RPCService rpcService,
+ long maxIdleMillis, Path serverDirectory, Path workspaceDir)
+ throws IOException {
+ this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS,
+ serverDirectory, workspaceDir);
+ }
+
+ private static void printStack(IOException e) {
+ /*
+ * Hopefully this never happens. It's not very nice to just write this
+ * to the user's console, but I'm not sure what better choice we have.
+ */
+ StringWriter err = new StringWriter();
+ PrintWriter printErr = new PrintWriter(err);
+ printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]=======");
+ e.printStackTrace(printErr);
+ printErr.println("=====================================================");
+ LOG.severe(err.toString());
+ }
+
+ /**
+ * Wait on a socket for business (answer requests). Note that this
+ * method won't return until the server shuts down.
+ */
+ public void serve() {
+ // Register the signal handler.
+ final AtomicBoolean inAction = new AtomicBoolean(false);
+ final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
+ final AtomicLong cmdNum = new AtomicLong();
+ final Thread mainThread = Thread.currentThread();
+ final Object interruptLock = new Object();
+
+ InterruptSignalHandler sigintHandler = new InterruptSignalHandler() {
+ @Override
+ public void run() {
+ LOG.severe("User interrupt");
+
+ // Only interrupt during actions - otherwise we may end up setting the interrupt bit
+ // at the end of a build and responding to it at the beginning of the subsequent build.
+ synchronized (interruptLock) {
+ if (allowingInterrupt.get()) {
+ mainThread.interrupt();
+ }
+ }
+
+ Runnable interruptWatcher = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long originalCmd = cmdNum.get();
+ Thread.sleep(10 * 1000);
+ if (inAction.get() && cmdNum.get() == originalCmd) {
+ // We're still operating on the same command.
+ // Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+
+ if (inAction.get()) {
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+ }
+ };
+
+ try {
+ while (!lameDuck) {
+ try {
+ IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir);
+ idleChecker.idle();
+ RequestIo requestIo;
+
+ long startTime = clock.currentTimeMillis();
+ while (true) {
+ try {
+ allowingInterrupt.set(true);
+ Socket socket = serverSocket.accept();
+ long firstContactTime = clock.currentTimeMillis();
+ requestIo = new RequestIo(socket, firstContactTime);
+ break;
+ } catch (SocketTimeoutException e) {
+ long idleTime = clock.currentTimeMillis() - startTime;
+ if (lameDuck) {
+ closeServerSocket();
+ return;
+ } else if (idleTime > maxIdleMillis ||
+ (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) {
+ enterLameDuck();
+ }
+ }
+ }
+ idleChecker.busy();
+
+ try {
+ cmdNum.incrementAndGet();
+ inAction.set(true);
+ executeRequest(requestIo);
+ } finally {
+ inAction.set(false);
+ synchronized (interruptLock) {
+ allowingInterrupt.set(false);
+ Thread.interrupted(); // clears thread interrupted status
+ }
+ requestIo.shutdown();
+ if (rpcService.isShutdown()) {
+ return;
+ }
+ }
+ } catch (IOException e) {
+ if (e.getMessage().equals("Broken pipe")) {
+ LOG.info("Connection to the client lost: "
+ + e.getMessage());
+ } else {
+ // Other cases: print the stack for debugging.
+ printStack(e);
+ }
+ }
+ }
+ } finally {
+ rpcService.shutdown();
+ LOG.info("Logging finished");
+ sigintHandler.uninstall();
+ }
+ }
+
+ private void closeServerSocket() {
+ LOG.info("Closing serverSocket.");
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+
+ if (!lameDuck) {
+ try {
+ getSocketPath().delete();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ }
+
+ /**
+ * Allow one last request to be serviced.
+ */
+ private void enterLameDuck() {
+ lameDuck = true;
+ try {
+ getSocketPath().delete();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ serverSocket.setSoTimeout(1);
+ }
+
+ /**
+ * Returns the path of the socket file to be used.
+ */
+ public Path getSocketPath() {
+ return serverDirectory.getRelative("server.socket");
+ }
+
+ /**
+ * Ensures no other server is running for the current socket file. This
+ * guarantees that no two servers are running against the same output
+ * directory.
+ *
+ * @throws IOException if another server holds the lock for the socket file.
+ */
+ public static void ensureExclusiveAccess(Path socketFile) throws IOException {
+ LocalSocketAddress address =
+ new LocalSocketAddress(socketFile.getPathFile());
+ if (socketFile.exists()) {
+ try {
+ new LocalClientSocket(address).close();
+ } catch (IOException e) {
+ // The previous server process is dead--unlink the file:
+ socketFile.delete();
+ return;
+ }
+ // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message
+ // and add it to the message.
+ throw new IOException("Socket file " + socketFile.getPathString()
+ + " is locked by another server");
+ }
+ }
+
+ /**
+ * Schedule the specified file for (attempted) deletion at JVM exit.
+ */
+ private static void deleteAtExit(final Path socketFile, final boolean deleteParent) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ socketFile.delete();
+ if (deleteParent) {
+ socketFile.getParentDirectory().delete();
+ }
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Opens a UNIX local server socket.
+ * @throws IOException if the socket file is used by another server or can
+ * not be made exclusive.
+ */
+ private LocalServerSocket openServerSocket() throws IOException {
+ // This is the "well known" socket path via which the server is found...
+ Path socketFile = getSocketPath();
+
+ // ...but it may have a name that's too long for AF_UNIX, in which case we
+ // make it a symlink to /tmp/something. This typically only happens in
+ // tests where the --output_base is beneath a very deep temp dir.
+ // (All this extra complexity is just used in tests... *sigh*).
+ if (socketFile.toString().length() >= 108) { // = UNIX_PATH_MAX
+ Path socketLink = socketFile;
+ String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", "/tmp");
+ socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)).
+ getRelative("server.socket");
+ LOG.info("Using symlinked socket at " + socketFile);
+
+ socketLink.delete(); // Remove stale symlink, if any.
+ socketLink.createSymbolicLink(socketFile);
+
+ deleteAtExit(socketLink, /*deleteParent=*/false);
+ deleteAtExit(socketFile, /*deleteParent=*/true);
+ } else {
+ deleteAtExit(socketFile, /*deleteParent=*/false);
+ }
+
+ ensureExclusiveAccess(socketFile);
+
+ LocalServerSocket serverSocket = new LocalServerSocket();
+ serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile()));
+ FilesystemUtils.chmod(socketFile.getPathFile(), 0600); // Lock it down.
+ serverSocket.listen(/*backlog=*/50);
+ return serverSocket;
+ }
+
+ // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a
+ // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it
+ // succeeds.
+ private static Path createTempSocketDirectory(Path tempDir) {
+ Random random = new Random();
+ while (true) {
+ Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt()));
+ try {
+ if (socketDir.createDirectory()) {
+ // Make sure it's private; unfortunately, createDirectory() doesn't take a mode
+ // argument.
+ socketDir.chmod(0700);
+ return socketDir; // Created.
+ }
+ // Already existed; try again.
+ } catch (IOException e) {
+ // Failed; try again.
+ }
+ }
+ }
+
+ /**
+ * Read a string in platform default encoding and split it into a list of
+ * NUL-separated words.
+ *
+ * <p>Blaze consistently uses the platform default encoding (defined in
+ * blaze.cc) to interface with Unix APIs.
+ */
+ private static List<String> readRequest(InputStream input) throws IOException {
+ byte[] inputBytes = ByteStreams.toByteArray(input);
+ if (inputBytes.length == 0) {
+ return null;
+ }
+ String s = new String(inputBytes, Charset.defaultCharset());
+ return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s));
+ }
+
+ private void executeRequest(RequestIo requestIo) {
+ int exitStatus = 2;
+ try {
+ List<String> request = readRequest(requestIo.in);
+ if (request == null) {
+ LOG.info("Short-circuiting empty request");
+ return;
+ }
+ exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr,
+ requestIo.firstContactTime);
+ LOG.info("Finished executing request");
+ } catch (UnknownCommandException e) {
+ requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage());
+ LOG.severe("SERVER ERROR: " + e.getMessage());
+ } catch (Exception e) {
+ // Stacktrace for unknown exception.
+ StringWriter trace = new StringWriter();
+ e.printStackTrace(new PrintWriter(trace, true));
+ requestIo.requestOutErr.printErr("SERVER ERROR: " + trace);
+ LOG.severe("SERVER ERROR: " + trace);
+ }
+
+ if (rpcService.isShutdown()) {
+ // In case of shutdown, disable the listening socket *before* we write
+ // the last part of the response. Otherwise, a sufficiently fast client
+ // could read the response and exit, and a new client could make a
+ // connection to this server, which is still in the listening state, even
+ // though it is about to shut down imminently.
+ closeServerSocket();
+ }
+
+ requestIo.writeExitStatus(exitStatus);
+ }
+
+ /**
+ * Because it's a little complicated, this class factors out all the IO Hook
+ * up we need per request, that is, in
+ * {@link RPCServer#executeRequest(RequestIo)}.
+ * It's unfortunately complicated, so it's explained here.
+ */
+ private static class RequestIo {
+
+ // Used by the client code
+ private final InputStream in;
+ private final OutErr requestOutErr;
+ private final OutputStream controlChannel;
+
+ // just used by this class to keep the state around
+ private final Socket requestSocket;
+ private final OutputStream requestOut;
+ private final long firstContactTime;
+
+ RequestIo(Socket requestSocket, long firstContactTime) throws IOException {
+ this.requestSocket = requestSocket;
+ this.firstContactTime = firstContactTime;
+ this.in = requestSocket.getInputStream();
+ this.requestOut = requestSocket.getOutputStream();
+
+ // We encode the response sent to the client with a multiplexer so
+ // we can send three streams (out / err / control) over one wire stream
+ // (requestOut).
+ StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut);
+
+ // We'll be writing control messages (exit code + out of date message)
+ // to this control channel.
+ controlChannel = multiplexer.createControl();
+
+ // This is the outErr part of the multiplexed output.
+ requestOutErr = OutErr.create(multiplexer.createStdout(),
+ multiplexer.createStderr());
+ // We hook up System.out / System.err to our IO object. Stuff written to
+ // System.out / System.err will show up on the user's screen, prefixed
+ // with "System.out "/"System.err ".
+ requestOutErr.addSystemOutErrAsSource();
+ }
+
+ public void writeExitStatus(int exitStatus) {
+ // Make sure to flush the output / error streams prior to writing the exit status.
+ // The client may stop reading that direction of the socket immediately upon reading the
+ // exit code.
+ flushOutErr();
+ try {
+ controlChannel.write(("" + exitStatus + "\n").getBytes(UTF_8));
+ controlChannel.flush();
+ LOG.info("" + exitStatus);
+ } catch (IOException ignored) {
+ // This exception is historically ignored.
+ }
+ }
+
+ private void flushOutErr() {
+ try {
+ requestOutErr.getOutputStream().flush();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ requestOutErr.getErrorStream().flush();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+
+ public void shutdown() {
+ try {
+ requestOut.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ in.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ requestSocket.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ }
+
+ /**
+ * Creates and returns a new RPC server.
+ * Use {@link RPCServer#serve()} to start the server.
+ *
+ * @param appCommand The application's ServerCommand implementation.
+ * @param serverDirectory The directory for server-related files. The caller
+ * must ensure the directory has been created.
+ * @param workspaceDir The workspace, used solely to ensure it persists.
+ * @param maxIdleSeconds The idle time in seconds after which the rpc
+ * server will die unless it receives a request.
+ */
+ public static RPCServer newServerWith(Clock clock,
+ ServerCommand appCommand,
+ Path serverDirectory,
+ Path workspaceDir,
+ int maxIdleSeconds)
+ throws IOException {
+ if (!serverDirectory.exists()) {
+ serverDirectory.createDirectory();
+ }
+
+ // Creates and starts the RPC server.
+ RPCService service = new RPCService(appCommand);
+
+ return new RPCServer(clock, service, maxIdleSeconds * 1000L,
+ serverDirectory, workspaceDir);
+ }
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCService.java b/src/main/java/com/google/devtools/build/lib/server/RPCService.java
new file mode 100644
index 0000000000..379e83ce2f
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCService.java
@@ -0,0 +1,95 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import com.google.devtools.build.lib.util.io.OutErr;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * An RPCService is a Java object that can process RPC requests. Requests may
+ * be of the form:
+ * <pre>
+ * blaze <blaze-arguments>
+ * </pre>
+ * Requests are delegated to the ServerCommand instance provided
+ * to the constructor.
+ */
+public final class RPCService {
+
+ private boolean isShutdown;
+ private static final Logger LOG = Logger.getLogger(RPCService.class.getName());
+ private final ServerCommand appCommand;
+
+ public RPCService(ServerCommand appCommand) {
+ this.appCommand = appCommand;
+ }
+
+ /**
+ * The {@link #executeRequest(List, OutErr, long)} method may
+ * throw this exception if a command is unknown to the RPC service.
+ */
+ public static class UnknownCommandException extends Exception {
+ private static final long serialVersionUID = 1L;
+ UnknownCommandException(String command) {
+ super("Unknown command: " + command);
+ }
+ }
+
+ /**
+ * Executes the request; returns Unix like return codes (0 means success). May
+ * also throw arbitrary exceptions.
+ */
+ public int executeRequest(List<String> request,
+ OutErr outErr,
+ long firstContactTime) throws Exception {
+ if (isShutdown) {
+ throw new IllegalStateException("Received request after shutdown.");
+ }
+ String command = request.isEmpty() ? "" : request.get(0);
+ if (appCommand != null && command.equals("blaze")) { // an application request
+ int result = appCommand.exec(request.subList(1, request.size()), outErr, firstContactTime);
+ if (appCommand.shutdown()) { // an application shutdown request
+ shutdown();
+ }
+ return result;
+ } else {
+ throw new UnknownCommandException(command);
+ }
+ }
+
+ /**
+ * After executing this function, further requests will fail, and
+ * {@link #isShutdown()} will return true.
+ */
+ public void shutdown() {
+ if (isShutdown) {
+ return;
+ }
+ LOG.info("RPC Service: shutting down ...");
+ isShutdown = true;
+ }
+
+ /**
+ * Has this service been shutdown. If so, any call to
+ * {@link #executeRequest(List, OutErr, long)} will result in an
+ * {@link IllegalStateException}
+ */
+ public boolean isShutdown() {
+ return isShutdown;
+ }
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerCommand.java b/src/main/java/com/google/devtools/build/lib/server/ServerCommand.java
new file mode 100644
index 0000000000..972753c924
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/ServerCommand.java
@@ -0,0 +1,38 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.server;
+
+import com.google.devtools.build.lib.util.io.OutErr;
+
+import java.util.List;
+
+/**
+ * The {@link RPCServer} calls an arbitrary command implementing this
+ * interface.
+ */
+public interface ServerCommand {
+
+ /**
+ * Executes the request, writing any output or error messages into err.
+ * Returns 0 on success; any other value or exception indicates an error.
+ */
+ int exec(List<String> args, OutErr outErr, long firstContactTime) throws Exception;
+
+ /**
+ * The implementation returns true from this method to initiate a shutdown.
+ * No further requests will be handled.
+ */
+ boolean shutdown();
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java
new file mode 100644
index 0000000000..e5ab9300f7
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java
@@ -0,0 +1,114 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import com.google.common.base.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * This class models a response from the {@link RPCServer}. This is a
+ * tuple of an error message and the exit status. The encoding of the response
+ * is extremely simple {@link #toString()}:
+ *
+ * <ul><li>Iff a message is present, the wire format is
+ * <pre>message + '\n' + exit code as string + '\n'</pre>
+ * </li>
+ * <li>Otherwise it's just the exit code as string + '\n'</li>
+ * </ul>
+ */
+final class ServerResponse {
+
+ /**
+ * Parses an input string into a {@link ServerResponse} object.
+ */
+ public static ServerResponse parseFrom(String input) {
+ if (input.charAt(input.length() - 1) != '\n') {
+ String msg = "Response must end with newline (" + input + ")";
+ throw new IllegalArgumentException(msg);
+ }
+ int newlineAt = input.lastIndexOf('\n', input.length() - 2);
+
+ final String exitStatusString;
+ final String errorMessage;
+ if (newlineAt == -1) {
+ errorMessage = "";
+ exitStatusString = input.substring(0, input.length() - 1);
+ } else {
+ errorMessage = input.substring(0, newlineAt);
+ exitStatusString = input.substring(newlineAt + 1, input.length() - 1);
+ }
+
+ return new ServerResponse(errorMessage, Integer.parseInt(exitStatusString));
+ }
+
+ /**
+ * Parses {@code bytes} into a {@link ServerResponse} instance, assuming
+ * Latin 1 encoding.
+ */
+ public static ServerResponse parseFrom(byte[] bytes) {
+ try {
+ return parseFrom(new String(bytes, "ISO-8859-1"));
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError(e); // Latin 1 is everywhere.
+ }
+ }
+
+ /**
+ * Parses {@code bytes} into a {@link ServerResponse} instance, assuming
+ * Latin 1 encoding.
+ */
+ public static ServerResponse parseFrom(ByteArrayOutputStream bytes) {
+ return parseFrom(bytes.toByteArray());
+ }
+
+ private final String errorMessage;
+ private final int exitStatus;
+
+ /**
+ * Construct a new instance given an error message and an exit status.
+ */
+ public ServerResponse(String errorMessage, int exitStatus) {
+ Preconditions.checkNotNull(errorMessage);
+ this.errorMessage = errorMessage;
+ this.exitStatus = exitStatus;
+ }
+
+ /**
+ * The wire representation of this response object.
+ */
+ @Override
+ public String toString() {
+ if (errorMessage.length() == 0) {
+ return Integer.toString(exitStatus) + '\n';
+ }
+ return errorMessage + '\n' + Integer.toString(exitStatus) + '\n';
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof ServerResponse)) return false;
+ ServerResponse otherResponse = (ServerResponse) other;
+ return exitStatus == otherResponse.exitStatus
+ && errorMessage.equals(otherResponse.errorMessage);
+ }
+
+ @Override
+ public int hashCode() {
+ return exitStatus * 31 ^ errorMessage.hashCode();
+ }
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/signal/InterruptSignalHandler.java b/src/main/java/com/google/devtools/build/lib/server/signal/InterruptSignalHandler.java
new file mode 100644
index 0000000000..521dcef3b7
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/signal/InterruptSignalHandler.java
@@ -0,0 +1,58 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.server.signal;
+
+
+import com.google.common.base.Preconditions;
+
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
+/**
+ * A facade around sun.misc.Signal providing special-purpose SIGINT handling.
+ *
+ * We use this code in preference to using sun.misc directly since the latter
+ * is deprecated, and depending on it causes the jdk1.6 javac to emit an
+ * unsuppressable warning that sun.misc is "Sun proprietary API and may be
+ * removed in a future release".
+ */
+public abstract class InterruptSignalHandler implements Runnable {
+
+ private static final Signal SIGINT = new Signal("INT");
+
+ private SignalHandler oldHandler;
+
+ /**
+ * Constructs an InterruptSignalHandler instance. Until the uninstall()
+ * method is invoked, the delivery of a SIGINT signal to this process will
+ * cause the run() method to be invoked in another thread.
+ */
+ protected InterruptSignalHandler() {
+ this.oldHandler = Signal.handle(SIGINT, new SignalHandler() {
+ @Override
+ public void handle(Signal signal) {
+ run();
+ }
+ });
+ }
+
+ /**
+ * Disables SIGINT handling.
+ */
+ public synchronized final void uninstall() {
+ Preconditions.checkNotNull(oldHandler, "uninstall() already called");
+ Signal.handle(SIGINT, oldHandler);
+ oldHandler = null;
+ }
+}