diff options
author | 2015-02-25 16:45:20 +0100 | |
---|---|---|
committer | 2015-02-25 16:45:20 +0100 | |
commit | d08b27fa9701fecfdb69e1b0d1ac2459efc2129b (patch) | |
tree | 5d50963026239ca5aebfb47ea5b8db7e814e57c8 /src/main/java/com/google/devtools/build/lib/server |
Update from Google.
--
MOE_MIGRATED_REVID=85702957
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server')
6 files changed, 1025 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java new file mode 100644 index 0000000000..ad3e475cb8 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/IdleServerTasks.java @@ -0,0 +1,158 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import com.google.common.base.Preconditions; +import com.google.devtools.build.lib.util.LoggingUtil; +import com.google.devtools.build.lib.util.ProcMeminfoParser; +import com.google.devtools.build.lib.vfs.FileStatus; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.Symlinks; + +import java.io.IOException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +/** + * Run cleanup-related tasks during idle periods in the server. + * idle() and busy() must be called in that order, and only once. + */ +class IdleServerTasks { + + private final Path workspaceDir; + private final ScheduledThreadPoolExecutor executor; + private static final Logger LOG = Logger.getLogger(IdleServerTasks.class.getName()); + + private static final long FIVE_MIN_MILLIS = 1000 * 60 * 5; + + /** + * Must be called from the main thread. + */ + public IdleServerTasks(@Nullable Path workspaceDir) { + this.executor = new ScheduledThreadPoolExecutor(1); + this.workspaceDir = workspaceDir; + } + + /** + * Called when the server becomes idle. Should not block, but may invoke + * new threads. + */ + public void idle() { + Preconditions.checkState(!executor.isShutdown()); + + // Do a GC cycle while the server is idle. + executor.schedule(new Runnable() { + @Override public void run() { + long before = System.currentTimeMillis(); + System.gc(); + LOG.info("Idle GC: " + (System.currentTimeMillis() - before) + "ms"); + } + }, 10, TimeUnit.SECONDS); + } + + /** + * Called by the main thread when the server gets to work. + * Should return quickly. + */ + public void busy() { + Preconditions.checkState(!executor.isShutdown()); + + // Make sure tasks are finished after shutdown(), so they do not intefere + // with subsequent server invocations. + executor.shutdown(); + executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + + boolean interrupted = false; + while (true) { + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS); + break; + } catch (InterruptedException e) { + // It's unsafe to leak threads - just reset the interrupt bit later. + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + /** + * Return true iff the server should continue processing requests. + * Called from the main thread, so it should return quickly. + */ + public boolean continueProcessing(long idleMillis) { + if (!memoryHeuristic(idleMillis)) { + return false; + } + if (workspaceDir == null) { + return false; + } + + FileStatus stat; + try { + stat = workspaceDir.statIfFound(Symlinks.FOLLOW); + } catch (IOException e) { + // Do not terminate the server if the workspace is temporarily inaccessible, for example, + // if it is on a network filesystem and the connection is down. + return true; + } + return stat != null && stat.isDirectory(); + } + + private boolean memoryHeuristic(long idleMillis) { + if (idleMillis < FIVE_MIN_MILLIS) { + // Don't check memory health until after five minutes. + return true; + } + + ProcMeminfoParser memInfo = null; + try { + memInfo = new ProcMeminfoParser(); + } catch (IOException e) { + LOG.info("Could not process /proc/meminfo: " + e); + return true; + } + + long totalPhysical, totalFree; + try { + totalPhysical = memInfo.getTotalKb(); + totalFree = memInfo.getFreeRamKb(); // See method javadoc. + } catch (IllegalArgumentException e) { + // Ugly capture of unchecked exception, similar to that in + // LocalHostCapacity. + LoggingUtil.logToRemote(Level.WARNING, + "Could not read memInfo during idle query", e); + return true; + } + double fractionFree = (double) totalFree / totalPhysical; + + // If the system as a whole is low on memory, let this server die. + if (fractionFree < .1) { + LOG.info("Terminating due to memory constraints"); + LOG.info(String.format("Total physical:%d\nTotal free: %d\n", + totalPhysical, totalFree)); + return false; + } + + return true; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java new file mode 100644 index 0000000000..a1e9982a73 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -0,0 +1,562 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.server.RPCService.UnknownCommandException; +import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; +import com.google.devtools.build.lib.unix.FilesystemUtils; +import com.google.devtools.build.lib.unix.LocalClientSocket; +import com.google.devtools.build.lib.unix.LocalServerSocket; +import com.google.devtools.build.lib.unix.LocalSocketAddress; +import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.ThreadUtils; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.util.io.StreamMultiplexer; +import com.google.devtools.build.lib.vfs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** + * An RPCServer server is a Java object that sits and waits for RPC requests + * (the sit-and-wait is implemented in {@link #serve()}). These requests + * arrive via UNIX file sockets. The RPCServer then calls the application + * (which implements ServerCommand) to handle the request. (Since the Blaze + * server may need to stat hundreds of directories during initialization, this + * is a significant speedup.) The server thread will terminate after idling + * for a user-specified time. + * + * Note: If you are contemplating to call into the RPCServer from + * within Java, consider using the {@link RPCService} class instead. + */ +// TODO(bazel-team): Signal handling. +// TODO(bazel-team): Gives clients status information when the server is busy. One +// way to do this is to put the server status in a file (pid, the current +// target, etc) in the server directory. Alternatively, we can have a separate +// thread taking care of the server socket and put the information into socket +// handshakes. +// TODO(bazel-team): Use Reporter for server-side messages. +public final class RPCServer { + + private final Clock clock; + private final RPCService rpcService; + private final LocalServerSocket serverSocket; + private final long maxIdleMillis; + private final long statusCheckMillis; + private final Path serverDirectory; + private final Path workspaceDir; + private static final Logger LOG = Logger.getLogger(RPCServer.class.getName()); + private volatile boolean lameDuck; + + private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute. + private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0'); + + /** + * Create a new server instance. After creating the server, you can start it + * by calling the {@link #serve()} method. + * + * @param clock The clock to take time measurements + * @param rpcService The underlying service object, which takes + * care of dispatching to the {@link ServerCommand} + * instances, as requests arrive. + * @param maxIdleMillis The maximum time the server will wait idly. + * @param statusCheckPeriodMillis How long to wait between system status checks. + * @param serverDirectory Directory to put file socket and pid files, etc. + * @param workspaceDir The workspace. Used solely to ensure it persists. + * @throws IOException + */ + public RPCServer(Clock clock, RPCService rpcService, + long maxIdleMillis, long statusCheckPeriodMillis, + Path serverDirectory, Path workspaceDir) + throws IOException { + this.clock = clock; + this.rpcService = rpcService; + this.maxIdleMillis = maxIdleMillis; + this.statusCheckMillis = statusCheckPeriodMillis; + this.serverDirectory = serverDirectory; + this.workspaceDir = workspaceDir; + + this.serverSocket = openServerSocket(); + serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis)); + lameDuck = false; + } + + /** + * Create a new server instance. After creating the server, you can start it + * by calling the {@link #serve()} method. + * + * @param clock The clock to take time measurements + * @param rpcService The underlying service object, which takes + * care of dispatching to the {@link ServerCommand} + * instances, as requests arrive. + * @param maxIdleMillis The maximum time the server will wait idly. + * @param serverDirectory Directory to put file socket and pid files, etc. + * @param workspaceDir The workspace. Used solely to ensure it persists. + * @throws IOException + */ + public RPCServer(Clock clock, RPCService rpcService, + long maxIdleMillis, Path serverDirectory, Path workspaceDir) + throws IOException { + this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS, + serverDirectory, workspaceDir); + } + + private static void printStack(IOException e) { + /* + * Hopefully this never happens. It's not very nice to just write this + * to the user's console, but I'm not sure what better choice we have. + */ + StringWriter err = new StringWriter(); + PrintWriter printErr = new PrintWriter(err); + printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]======="); + e.printStackTrace(printErr); + printErr.println("====================================================="); + LOG.severe(err.toString()); + } + + /** + * Wait on a socket for business (answer requests). Note that this + * method won't return until the server shuts down. + */ + public void serve() { + // Register the signal handler. + final AtomicBoolean inAction = new AtomicBoolean(false); + final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); + final AtomicLong cmdNum = new AtomicLong(); + final Thread mainThread = Thread.currentThread(); + final Object interruptLock = new Object(); + + InterruptSignalHandler sigintHandler = new InterruptSignalHandler() { + @Override + public void run() { + LOG.severe("User interrupt"); + + // Only interrupt during actions - otherwise we may end up setting the interrupt bit + // at the end of a build and responding to it at the beginning of the subsequent build. + synchronized (interruptLock) { + if (allowingInterrupt.get()) { + mainThread.interrupt(); + } + } + + Runnable interruptWatcher = new Runnable() { + @Override + public void run() { + try { + long originalCmd = cmdNum.get(); + Thread.sleep(10 * 1000); + if (inAction.get() && cmdNum.get() == originalCmd) { + // We're still operating on the same command. + // Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + + if (inAction.get()) { + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + } + }; + + try { + while (!lameDuck) { + try { + IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir); + idleChecker.idle(); + RequestIo requestIo; + + long startTime = clock.currentTimeMillis(); + while (true) { + try { + allowingInterrupt.set(true); + Socket socket = serverSocket.accept(); + long firstContactTime = clock.currentTimeMillis(); + requestIo = new RequestIo(socket, firstContactTime); + break; + } catch (SocketTimeoutException e) { + long idleTime = clock.currentTimeMillis() - startTime; + if (lameDuck) { + closeServerSocket(); + return; + } else if (idleTime > maxIdleMillis || + (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) { + enterLameDuck(); + } + } + } + idleChecker.busy(); + + try { + cmdNum.incrementAndGet(); + inAction.set(true); + executeRequest(requestIo); + } finally { + inAction.set(false); + synchronized (interruptLock) { + allowingInterrupt.set(false); + Thread.interrupted(); // clears thread interrupted status + } + requestIo.shutdown(); + if (rpcService.isShutdown()) { + return; + } + } + } catch (IOException e) { + if (e.getMessage().equals("Broken pipe")) { + LOG.info("Connection to the client lost: " + + e.getMessage()); + } else { + // Other cases: print the stack for debugging. + printStack(e); + } + } + } + } finally { + rpcService.shutdown(); + LOG.info("Logging finished"); + sigintHandler.uninstall(); + } + } + + private void closeServerSocket() { + LOG.info("Closing serverSocket."); + try { + serverSocket.close(); + } catch (IOException e) { + printStack(e); + } + + if (!lameDuck) { + try { + getSocketPath().delete(); + } catch (IOException e) { + printStack(e); + } + } + } + + /** + * Allow one last request to be serviced. + */ + private void enterLameDuck() { + lameDuck = true; + try { + getSocketPath().delete(); + } catch (IOException e) { + e.printStackTrace(); + } + serverSocket.setSoTimeout(1); + } + + /** + * Returns the path of the socket file to be used. + */ + public Path getSocketPath() { + return serverDirectory.getRelative("server.socket"); + } + + /** + * Ensures no other server is running for the current socket file. This + * guarantees that no two servers are running against the same output + * directory. + * + * @throws IOException if another server holds the lock for the socket file. + */ + public static void ensureExclusiveAccess(Path socketFile) throws IOException { + LocalSocketAddress address = + new LocalSocketAddress(socketFile.getPathFile()); + if (socketFile.exists()) { + try { + new LocalClientSocket(address).close(); + } catch (IOException e) { + // The previous server process is dead--unlink the file: + socketFile.delete(); + return; + } + // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message + // and add it to the message. + throw new IOException("Socket file " + socketFile.getPathString() + + " is locked by another server"); + } + } + + /** + * Schedule the specified file for (attempted) deletion at JVM exit. + */ + private static void deleteAtExit(final Path socketFile, final boolean deleteParent) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + socketFile.delete(); + if (deleteParent) { + socketFile.getParentDirectory().delete(); + } + } catch (IOException e) { + printStack(e); + } + } + }); + } + + /** + * Opens a UNIX local server socket. + * @throws IOException if the socket file is used by another server or can + * not be made exclusive. + */ + private LocalServerSocket openServerSocket() throws IOException { + // This is the "well known" socket path via which the server is found... + Path socketFile = getSocketPath(); + + // ...but it may have a name that's too long for AF_UNIX, in which case we + // make it a symlink to /tmp/something. This typically only happens in + // tests where the --output_base is beneath a very deep temp dir. + // (All this extra complexity is just used in tests... *sigh*). + if (socketFile.toString().length() >= 108) { // = UNIX_PATH_MAX + Path socketLink = socketFile; + String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", "/tmp"); + socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)). + getRelative("server.socket"); + LOG.info("Using symlinked socket at " + socketFile); + + socketLink.delete(); // Remove stale symlink, if any. + socketLink.createSymbolicLink(socketFile); + + deleteAtExit(socketLink, /*deleteParent=*/false); + deleteAtExit(socketFile, /*deleteParent=*/true); + } else { + deleteAtExit(socketFile, /*deleteParent=*/false); + } + + ensureExclusiveAccess(socketFile); + + LocalServerSocket serverSocket = new LocalServerSocket(); + serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile())); + FilesystemUtils.chmod(socketFile.getPathFile(), 0600); // Lock it down. + serverSocket.listen(/*backlog=*/50); + return serverSocket; + } + + // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a + // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it + // succeeds. + private static Path createTempSocketDirectory(Path tempDir) { + Random random = new Random(); + while (true) { + Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt())); + try { + if (socketDir.createDirectory()) { + // Make sure it's private; unfortunately, createDirectory() doesn't take a mode + // argument. + socketDir.chmod(0700); + return socketDir; // Created. + } + // Already existed; try again. + } catch (IOException e) { + // Failed; try again. + } + } + } + + /** + * Read a string in platform default encoding and split it into a list of + * NUL-separated words. + * + * <p>Blaze consistently uses the platform default encoding (defined in + * blaze.cc) to interface with Unix APIs. + */ + private static List<String> readRequest(InputStream input) throws IOException { + byte[] inputBytes = ByteStreams.toByteArray(input); + if (inputBytes.length == 0) { + return null; + } + String s = new String(inputBytes, Charset.defaultCharset()); + return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s)); + } + + private void executeRequest(RequestIo requestIo) { + int exitStatus = 2; + try { + List<String> request = readRequest(requestIo.in); + if (request == null) { + LOG.info("Short-circuiting empty request"); + return; + } + exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr, + requestIo.firstContactTime); + LOG.info("Finished executing request"); + } catch (UnknownCommandException e) { + requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage()); + LOG.severe("SERVER ERROR: " + e.getMessage()); + } catch (Exception e) { + // Stacktrace for unknown exception. + StringWriter trace = new StringWriter(); + e.printStackTrace(new PrintWriter(trace, true)); + requestIo.requestOutErr.printErr("SERVER ERROR: " + trace); + LOG.severe("SERVER ERROR: " + trace); + } + + if (rpcService.isShutdown()) { + // In case of shutdown, disable the listening socket *before* we write + // the last part of the response. Otherwise, a sufficiently fast client + // could read the response and exit, and a new client could make a + // connection to this server, which is still in the listening state, even + // though it is about to shut down imminently. + closeServerSocket(); + } + + requestIo.writeExitStatus(exitStatus); + } + + /** + * Because it's a little complicated, this class factors out all the IO Hook + * up we need per request, that is, in + * {@link RPCServer#executeRequest(RequestIo)}. + * It's unfortunately complicated, so it's explained here. + */ + private static class RequestIo { + + // Used by the client code + private final InputStream in; + private final OutErr requestOutErr; + private final OutputStream controlChannel; + + // just used by this class to keep the state around + private final Socket requestSocket; + private final OutputStream requestOut; + private final long firstContactTime; + + RequestIo(Socket requestSocket, long firstContactTime) throws IOException { + this.requestSocket = requestSocket; + this.firstContactTime = firstContactTime; + this.in = requestSocket.getInputStream(); + this.requestOut = requestSocket.getOutputStream(); + + // We encode the response sent to the client with a multiplexer so + // we can send three streams (out / err / control) over one wire stream + // (requestOut). + StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut); + + // We'll be writing control messages (exit code + out of date message) + // to this control channel. + controlChannel = multiplexer.createControl(); + + // This is the outErr part of the multiplexed output. + requestOutErr = OutErr.create(multiplexer.createStdout(), + multiplexer.createStderr()); + // We hook up System.out / System.err to our IO object. Stuff written to + // System.out / System.err will show up on the user's screen, prefixed + // with "System.out "/"System.err ". + requestOutErr.addSystemOutErrAsSource(); + } + + public void writeExitStatus(int exitStatus) { + // Make sure to flush the output / error streams prior to writing the exit status. + // The client may stop reading that direction of the socket immediately upon reading the + // exit code. + flushOutErr(); + try { + controlChannel.write(("" + exitStatus + "\n").getBytes(UTF_8)); + controlChannel.flush(); + LOG.info("" + exitStatus); + } catch (IOException ignored) { + // This exception is historically ignored. + } + } + + private void flushOutErr() { + try { + requestOutErr.getOutputStream().flush(); + } catch (IOException e) { + printStack(e); + } + try { + requestOutErr.getErrorStream().flush(); + } catch (IOException e) { + printStack(e); + } + } + + public void shutdown() { + try { + requestOut.close(); + } catch (IOException e) { + printStack(e); + } + try { + in.close(); + } catch (IOException e) { + printStack(e); + } + try { + requestSocket.close(); + } catch (IOException e) { + printStack(e); + } + } + } + + /** + * Creates and returns a new RPC server. + * Use {@link RPCServer#serve()} to start the server. + * + * @param appCommand The application's ServerCommand implementation. + * @param serverDirectory The directory for server-related files. The caller + * must ensure the directory has been created. + * @param workspaceDir The workspace, used solely to ensure it persists. + * @param maxIdleSeconds The idle time in seconds after which the rpc + * server will die unless it receives a request. + */ + public static RPCServer newServerWith(Clock clock, + ServerCommand appCommand, + Path serverDirectory, + Path workspaceDir, + int maxIdleSeconds) + throws IOException { + if (!serverDirectory.exists()) { + serverDirectory.createDirectory(); + } + + // Creates and starts the RPC server. + RPCService service = new RPCService(appCommand); + + return new RPCServer(clock, service, maxIdleSeconds * 1000L, + serverDirectory, workspaceDir); + } + +} diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCService.java b/src/main/java/com/google/devtools/build/lib/server/RPCService.java new file mode 100644 index 0000000000..379e83ce2f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/RPCService.java @@ -0,0 +1,95 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import com.google.devtools.build.lib.util.io.OutErr; + +import java.util.List; +import java.util.logging.Logger; + +/** + * An RPCService is a Java object that can process RPC requests. Requests may + * be of the form: + * <pre> + * blaze <blaze-arguments> + * </pre> + * Requests are delegated to the ServerCommand instance provided + * to the constructor. + */ +public final class RPCService { + + private boolean isShutdown; + private static final Logger LOG = Logger.getLogger(RPCService.class.getName()); + private final ServerCommand appCommand; + + public RPCService(ServerCommand appCommand) { + this.appCommand = appCommand; + } + + /** + * The {@link #executeRequest(List, OutErr, long)} method may + * throw this exception if a command is unknown to the RPC service. + */ + public static class UnknownCommandException extends Exception { + private static final long serialVersionUID = 1L; + UnknownCommandException(String command) { + super("Unknown command: " + command); + } + } + + /** + * Executes the request; returns Unix like return codes (0 means success). May + * also throw arbitrary exceptions. + */ + public int executeRequest(List<String> request, + OutErr outErr, + long firstContactTime) throws Exception { + if (isShutdown) { + throw new IllegalStateException("Received request after shutdown."); + } + String command = request.isEmpty() ? "" : request.get(0); + if (appCommand != null && command.equals("blaze")) { // an application request + int result = appCommand.exec(request.subList(1, request.size()), outErr, firstContactTime); + if (appCommand.shutdown()) { // an application shutdown request + shutdown(); + } + return result; + } else { + throw new UnknownCommandException(command); + } + } + + /** + * After executing this function, further requests will fail, and + * {@link #isShutdown()} will return true. + */ + public void shutdown() { + if (isShutdown) { + return; + } + LOG.info("RPC Service: shutting down ..."); + isShutdown = true; + } + + /** + * Has this service been shutdown. If so, any call to + * {@link #executeRequest(List, OutErr, long)} will result in an + * {@link IllegalStateException} + */ + public boolean isShutdown() { + return isShutdown; + } + +} diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerCommand.java b/src/main/java/com/google/devtools/build/lib/server/ServerCommand.java new file mode 100644 index 0000000000..972753c924 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/ServerCommand.java @@ -0,0 +1,38 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.server; + +import com.google.devtools.build.lib.util.io.OutErr; + +import java.util.List; + +/** + * The {@link RPCServer} calls an arbitrary command implementing this + * interface. + */ +public interface ServerCommand { + + /** + * Executes the request, writing any output or error messages into err. + * Returns 0 on success; any other value or exception indicates an error. + */ + int exec(List<String> args, OutErr outErr, long firstContactTime) throws Exception; + + /** + * The implementation returns true from this method to initiate a shutdown. + * No further requests will be handled. + */ + boolean shutdown(); + +} diff --git a/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java new file mode 100644 index 0000000000..e5ab9300f7 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/ServerResponse.java @@ -0,0 +1,114 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import com.google.common.base.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.io.UnsupportedEncodingException; + +/** + * This class models a response from the {@link RPCServer}. This is a + * tuple of an error message and the exit status. The encoding of the response + * is extremely simple {@link #toString()}: + * + * <ul><li>Iff a message is present, the wire format is + * <pre>message + '\n' + exit code as string + '\n'</pre> + * </li> + * <li>Otherwise it's just the exit code as string + '\n'</li> + * </ul> + */ +final class ServerResponse { + + /** + * Parses an input string into a {@link ServerResponse} object. + */ + public static ServerResponse parseFrom(String input) { + if (input.charAt(input.length() - 1) != '\n') { + String msg = "Response must end with newline (" + input + ")"; + throw new IllegalArgumentException(msg); + } + int newlineAt = input.lastIndexOf('\n', input.length() - 2); + + final String exitStatusString; + final String errorMessage; + if (newlineAt == -1) { + errorMessage = ""; + exitStatusString = input.substring(0, input.length() - 1); + } else { + errorMessage = input.substring(0, newlineAt); + exitStatusString = input.substring(newlineAt + 1, input.length() - 1); + } + + return new ServerResponse(errorMessage, Integer.parseInt(exitStatusString)); + } + + /** + * Parses {@code bytes} into a {@link ServerResponse} instance, assuming + * Latin 1 encoding. + */ + public static ServerResponse parseFrom(byte[] bytes) { + try { + return parseFrom(new String(bytes, "ISO-8859-1")); + } catch (UnsupportedEncodingException e) { + throw new AssertionError(e); // Latin 1 is everywhere. + } + } + + /** + * Parses {@code bytes} into a {@link ServerResponse} instance, assuming + * Latin 1 encoding. + */ + public static ServerResponse parseFrom(ByteArrayOutputStream bytes) { + return parseFrom(bytes.toByteArray()); + } + + private final String errorMessage; + private final int exitStatus; + + /** + * Construct a new instance given an error message and an exit status. + */ + public ServerResponse(String errorMessage, int exitStatus) { + Preconditions.checkNotNull(errorMessage); + this.errorMessage = errorMessage; + this.exitStatus = exitStatus; + } + + /** + * The wire representation of this response object. + */ + @Override + public String toString() { + if (errorMessage.length() == 0) { + return Integer.toString(exitStatus) + '\n'; + } + return errorMessage + '\n' + Integer.toString(exitStatus) + '\n'; + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof ServerResponse)) return false; + ServerResponse otherResponse = (ServerResponse) other; + return exitStatus == otherResponse.exitStatus + && errorMessage.equals(otherResponse.errorMessage); + } + + @Override + public int hashCode() { + return exitStatus * 31 ^ errorMessage.hashCode(); + } + +} diff --git a/src/main/java/com/google/devtools/build/lib/server/signal/InterruptSignalHandler.java b/src/main/java/com/google/devtools/build/lib/server/signal/InterruptSignalHandler.java new file mode 100644 index 0000000000..521dcef3b7 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/signal/InterruptSignalHandler.java @@ -0,0 +1,58 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.server.signal; + + +import com.google.common.base.Preconditions; + +import sun.misc.Signal; +import sun.misc.SignalHandler; + +/** + * A facade around sun.misc.Signal providing special-purpose SIGINT handling. + * + * We use this code in preference to using sun.misc directly since the latter + * is deprecated, and depending on it causes the jdk1.6 javac to emit an + * unsuppressable warning that sun.misc is "Sun proprietary API and may be + * removed in a future release". + */ +public abstract class InterruptSignalHandler implements Runnable { + + private static final Signal SIGINT = new Signal("INT"); + + private SignalHandler oldHandler; + + /** + * Constructs an InterruptSignalHandler instance. Until the uninstall() + * method is invoked, the delivery of a SIGINT signal to this process will + * cause the run() method to be invoked in another thread. + */ + protected InterruptSignalHandler() { + this.oldHandler = Signal.handle(SIGINT, new SignalHandler() { + @Override + public void handle(Signal signal) { + run(); + } + }); + } + + /** + * Disables SIGINT handling. + */ + public synchronized final void uninstall() { + Preconditions.checkNotNull(oldHandler, "uninstall() already called"); + Signal.handle(SIGINT, oldHandler); + oldHandler = null; + } +} |