aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-04-19 07:21:19 +0000
committerGravatar Damien Martin-Guillerez <dmarting@google.com>2016-04-19 09:48:11 +0000
commitbb0dac7ea6447f6676ac14f2e5b0833fb6958655 (patch)
tree6828d24ca2d6a25262bdcff049800f3e105953f9 /src
parente5f125bf1d799e4553b973e005d65d91bcb9567f (diff)
Do not start the AF_UNIX server when in gRPC mode.
Work towards #930. With this, it's conceivable that server mode works on Windows to some degree (I haven't tried, though, because there are many issues that need to be fixed) -- MOS_MIGRATED_REVID=120202037
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java54
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java554
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServer.java41
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java79
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCServer.java567
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java (renamed from src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java)6
6 files changed, 639 insertions, 662 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
index 2fe7c6acac..9d612693d4 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
@@ -63,7 +63,7 @@ import com.google.devtools.build.lib.runtime.commands.ShutdownCommand;
import com.google.devtools.build.lib.runtime.commands.TestCommand;
import com.google.devtools.build.lib.runtime.commands.VersionCommand;
import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy;
-import com.google.devtools.build.lib.server.GrpcServer;
+import com.google.devtools.build.lib.server.AfUnixServer;
import com.google.devtools.build.lib.server.RPCServer;
import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
import com.google.devtools.build.lib.skyframe.DiffAwareness;
@@ -131,16 +131,6 @@ import javax.annotation.Nullable;
* <p>The parts specific to the current command are stored in {@link CommandEnvironment}.
*/
public final class BlazeRuntime {
- private static class BlazeServer {
- private final RPCServer afUnixServer;
- private final GrpcServer grpcServer;
-
- private BlazeServer(RPCServer afUnixServer, GrpcServer grpcServer) {
- this.afUnixServer = afUnixServer;
- this.grpcServer = grpcServer;
- }
- }
-
private static final Pattern suppressFromLog = Pattern.compile(".*(auth|pass|cookie).*",
Pattern.CASE_INSENSITIVE);
@@ -845,16 +835,8 @@ public final class BlazeRuntime {
*/
private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) {
try {
- BlazeServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
- if (blazeServer.grpcServer != null) {
- blazeServer.grpcServer.serve();
- }
-
- // TODO(lberki): Make this call non-blocking and terminate the two servers at the same time
- blazeServer.afUnixServer.serve();
- if (blazeServer.grpcServer != null) {
- blazeServer.grpcServer.serve();
- }
+ RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+ blazeServer.serve();
return ExitCode.SUCCESS.getNumericExitCode();
} catch (OptionsParsingException e) {
outErr.printErr(e.getMessage());
@@ -880,34 +862,34 @@ public final class BlazeRuntime {
/**
* Creates and returns a new Blaze RPCServer. Call {@link RPCServer#serve()} to start the server.
*/
- private static BlazeServer createBlazeRPCServer(Iterable<BlazeModule> modules, List<String> args)
+ private static RPCServer createBlazeRPCServer(
+ Iterable<BlazeModule> modules, List<String> args)
throws IOException, OptionsParsingException, AbruptExitException {
OptionsProvider options = parseOptions(modules, args);
BlazeServerStartupOptions startupOptions = options.getOptions(BlazeServerStartupOptions.class);
- final BlazeRuntime runtime = newRuntime(modules, options);
- final BlazeCommandDispatcher dispatcher = new BlazeCommandDispatcher(runtime);
-
+ BlazeRuntime runtime = newRuntime(modules, options);
+ BlazeCommandDispatcher dispatcher = new BlazeCommandDispatcher(runtime);
CommandExecutor commandExecutor = new CommandExecutor(runtime, dispatcher);
- RPCServer afUnixServer = RPCServer.newServerWith(runtime.getClock(), commandExecutor,
- runtime.getServerDirectory(), runtime.workspace.getWorkspace(),
- startupOptions.maxIdleSeconds);
- GrpcServer grpcServer = null;
+
+
if (startupOptions.grpcPort != -1) {
try {
- // We don't want to directly depend on this class so that we don't need gRPC for
- // bootstrapping, so we instantiate it using a factory class and reflection
+ // This is necessary so that Bazel kind of works during bootstrapping, at which time the
+ // gRPC server is not compiled in so that we don't need gRPC for bootstrapping.
Class<?> factoryClass = Class.forName(
"com.google.devtools.build.lib.server.GrpcServerImpl$Factory");
- GrpcServer.Factory factory = (GrpcServer.Factory) factoryClass.newInstance();
- grpcServer = factory.create(commandExecutor, runtime.getClock(),
- startupOptions.grpcPort, startupOptions.outputBase.getPathString());
+ RPCServer.Factory factory = (RPCServer.Factory) factoryClass.newInstance();
+ return factory.create(commandExecutor, runtime.getClock(),
+ startupOptions.grpcPort, runtime.getServerDirectory());
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR);
}
+ } else {
+ return AfUnixServer.newServerWith(runtime.getClock(), commandExecutor,
+ runtime.getServerDirectory(), runtime.workspace.getWorkspace(),
+ startupOptions.maxIdleSeconds);
}
-
- return new BlazeServer(afUnixServer, grpcServer);
}
private static Function<String, String> sourceFunctionForMap(final Map<String, String> map) {
diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
new file mode 100644
index 0000000000..26473e2323
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
@@ -0,0 +1,554 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
+import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
+import com.google.devtools.build.lib.unix.LocalClientSocket;
+import com.google.devtools.build.lib.unix.LocalServerSocket;
+import com.google.devtools.build.lib.unix.LocalSocketAddress;
+import com.google.devtools.build.lib.unix.NativePosixFiles;
+import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.ThreadUtils;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.util.io.StreamMultiplexer;
+import com.google.devtools.build.lib.vfs.Path;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/**
+ * An RPCServer server is a Java object that sits and waits for RPC requests
+ * (the sit-and-wait is implemented in {@link #serve()}). These requests
+ * arrive via UNIX file sockets. The RPCServer then calls the application
+ * (which implements ServerCommand) to handle the request. (Since the Blaze
+ * server may need to stat hundreds of directories during initialization, this
+ * is a significant speedup.) The server thread will terminate after idling
+ * for a user-specified time.
+ *
+ * Note: If you are contemplating to call into the RPCServer from
+ * within Java, consider using the {@link RPCService} class instead.
+ */
+// TODO(bazel-team): Signal handling.
+// TODO(bazel-team): Gives clients status information when the server is busy. One
+// way to do this is to put the server status in a file (pid, the current
+// target, etc) in the server directory. Alternatively, we can have a separate
+// thread taking care of the server socket and put the information into socket
+// handshakes.
+// TODO(bazel-team): Use Reporter for server-side messages.
+public final class AfUnixServer extends RPCServer {
+ private final Clock clock;
+ private final RPCService rpcService;
+ private final LocalServerSocket serverSocket;
+ private final long maxIdleMillis;
+ private final long statusCheckMillis;
+ private final Path serverDirectory;
+ private final Path workspaceDir;
+ private static final Logger LOG = Logger.getLogger(AfUnixServer.class.getName());
+ private volatile boolean lameDuck;
+
+ private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute.
+ private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0');
+
+ /**
+ * Create a new server instance. After creating the server, you can start it
+ * by calling the {@link #serve()} method.
+ *
+ * @param clock The clock to take time measurements
+ * @param rpcService The underlying service object, which takes
+ * care of dispatching to the {@link ServerCommand}
+ * instances, as requests arrive.
+ * @param maxIdleMillis The maximum time the server will wait idly.
+ * @param statusCheckPeriodMillis How long to wait between system status checks.
+ * @param serverDirectory Directory to put file socket and pid files, etc.
+ * @param workspaceDir The workspace. Used solely to ensure it persists.
+ * @throws IOException
+ */
+ public AfUnixServer(Clock clock, RPCService rpcService,
+ long maxIdleMillis, long statusCheckPeriodMillis,
+ Path serverDirectory, Path workspaceDir)
+ throws IOException {
+ super(serverDirectory);
+ this.clock = clock;
+ this.rpcService = rpcService;
+ this.maxIdleMillis = maxIdleMillis;
+ this.statusCheckMillis = statusCheckPeriodMillis;
+ this.serverDirectory = serverDirectory;
+ this.workspaceDir = workspaceDir;
+
+ this.serverSocket = openServerSocket();
+ serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis));
+ lameDuck = false;
+ }
+
+ /**
+ * Create a new server instance. After creating the server, you can start it
+ * by calling the {@link #serve()} method.
+ *
+ * @param clock The clock to take time measurements
+ * @param rpcService The underlying service object, which takes
+ * care of dispatching to the {@link ServerCommand}
+ * instances, as requests arrive.
+ * @param maxIdleMillis The maximum time the server will wait idly.
+ * @param serverDirectory Directory to put file socket and pid files, etc.
+ * @param workspaceDir The workspace. Used solely to ensure it persists.
+ * @throws IOException
+ */
+ public AfUnixServer(Clock clock, RPCService rpcService,
+ long maxIdleMillis, Path serverDirectory, Path workspaceDir)
+ throws IOException {
+ this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS,
+ serverDirectory, workspaceDir);
+ }
+
+ /**
+ * Wait on a socket for business (answer requests). Note that this
+ * method won't return until the server shuts down.
+ */
+ @Override
+ public void serve() {
+ // Register the signal handler.
+ final AtomicBoolean inAction = new AtomicBoolean(false);
+ final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
+ final AtomicLong cmdNum = new AtomicLong();
+ final Thread mainThread = Thread.currentThread();
+ final Object interruptLock = new Object();
+
+ InterruptSignalHandler sigintHandler =
+ new InterruptSignalHandler() {
+ @Override
+ protected void onSignal() {
+ LOG.severe("User interrupt");
+
+ // Only interrupt during actions - otherwise we may end up setting the interrupt bit
+ // at the end of a build and responding to it at the beginning of the subsequent build.
+ synchronized (interruptLock) {
+ if (allowingInterrupt.get()) {
+ mainThread.interrupt();
+ }
+ }
+
+ if (inAction.get()) {
+ Runnable interruptWatcher =
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long originalCmd = cmdNum.get();
+ Thread.sleep(10 * 1000);
+ if (inAction.get() && cmdNum.get() == originalCmd) {
+ // We're still operating on the same command.
+ // Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+ }
+ };
+
+ try {
+ while (!lameDuck) {
+ try {
+ IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir);
+ idleChecker.idle();
+ RequestIo requestIo;
+
+ long startTime = clock.currentTimeMillis();
+ while (true) {
+ try {
+ allowingInterrupt.set(true);
+ Socket socket = serverSocket.accept();
+ long firstContactTime = clock.currentTimeMillis();
+ requestIo = new RequestIo(socket, firstContactTime);
+ break;
+ } catch (SocketTimeoutException e) {
+ long idleTime = clock.currentTimeMillis() - startTime;
+ if (lameDuck) {
+ closeServerSocket();
+ return;
+ } else if (idleTime > maxIdleMillis
+ || (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) {
+ enterLameDuck();
+ }
+ }
+ }
+ idleChecker.busy();
+
+
+ List<String> request = null;
+ try {
+ request = extractRequest(requestIo);
+ cmdNum.incrementAndGet();
+ inAction.set(true);
+ if (request != null) {
+ executeRequest(request, requestIo);
+ }
+ } finally {
+ inAction.set(false);
+ // Don't reset interruption unless we executed a request. Otherwise this is just a
+ // ping from the client verifying our existence, in which case we should retain the
+ // interrupt status for the subsequent request.
+ if (request != null) {
+ synchronized (interruptLock) {
+ allowingInterrupt.set(false);
+ Thread.interrupted(); // clears thread interrupted status
+ }
+ }
+ requestIo.shutdown();
+ if (rpcService.isShutdown()) {
+ return;
+ }
+ }
+ } catch (EOFException e) {
+ LOG.info("Connection to the client lost: "
+ + e.getMessage());
+ } catch (IOException e) {
+ // Something else happened. Print a stack trace for debugging.
+ printStack(e);
+ }
+ }
+ } finally {
+ rpcService.shutdown();
+ LOG.info("Logging finished");
+ sigintHandler.uninstall();
+ }
+ }
+
+ private void closeServerSocket() {
+ LOG.info("Closing serverSocket.");
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+
+ if (!lameDuck) {
+ try {
+ getSocketPath().delete();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ }
+
+ /**
+ * Allow one last request to be serviced.
+ */
+ private void enterLameDuck() {
+ lameDuck = true;
+ try {
+ getSocketPath().delete();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ serverSocket.setSoTimeout(1);
+ }
+
+ /**
+ * Returns the path of the socket file to be used.
+ */
+ public Path getSocketPath() {
+ return serverDirectory.getRelative("server.socket");
+ }
+
+ /**
+ * Ensures no other server is running for the current socket file. This
+ * guarantees that no two servers are running against the same output
+ * directory.
+ *
+ * @throws IOException if another server holds the lock for the socket file.
+ */
+ public static void ensureExclusiveAccess(Path socketFile) throws IOException {
+ LocalSocketAddress address =
+ new LocalSocketAddress(socketFile.getPathFile());
+ if (socketFile.exists()) {
+ try {
+ new LocalClientSocket(address).close();
+ } catch (IOException e) {
+ // The previous server process is dead--unlink the file:
+ socketFile.delete();
+ return;
+ }
+ // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message
+ // and add it to the message.
+ throw new IOException("Socket file " + socketFile.getPathString()
+ + " is locked by another server");
+ }
+ }
+
+ /**
+ * Opens a UNIX local server socket.
+ * @throws IOException if the socket file is used by another server or can
+ * not be made exclusive.
+ */
+ private LocalServerSocket openServerSocket() throws IOException {
+ // This is the "well known" socket path via which the server is found...
+ Path socketFile = getSocketPath();
+
+ // ...but it may have a name that's too long for AF_UNIX, in which case we
+ // make it a symlink to /tmp/something. This typically only happens in
+ // tests where the --output_base is beneath a very deep temp dir.
+ // (All this extra complexity is just used in tests... *sigh*).
+ if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX
+ Path socketLink = socketFile;
+ String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", "/tmp");
+ socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)).
+ getRelative("server.socket");
+ LOG.info("Using symlinked socket at " + socketFile);
+
+ socketLink.delete(); // Remove stale symlink, if any.
+ socketLink.createSymbolicLink(socketFile);
+
+ deleteAtExit(socketLink, /*deleteParent=*/false);
+ deleteAtExit(socketFile, /*deleteParent=*/true);
+ } else {
+ deleteAtExit(socketFile, /*deleteParent=*/false);
+ }
+
+ ensureExclusiveAccess(socketFile);
+
+
+ LocalServerSocket serverSocket = new LocalServerSocket();
+ serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile()));
+ NativePosixFiles.chmod(socketFile.getPathFile(), 0600); // Lock it down.
+ serverSocket.listen(/*backlog=*/50);
+ return serverSocket;
+ }
+
+ // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a
+ // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it
+ // succeeds.
+ private static Path createTempSocketDirectory(Path tempDir) {
+ Random random = new Random();
+ while (true) {
+ Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt()));
+ try {
+ if (socketDir.createDirectory()) {
+ // Make sure it's private; unfortunately, createDirectory() doesn't take a mode
+ // argument.
+ socketDir.chmod(0700);
+ return socketDir; // Created.
+ }
+ // Already existed; try again.
+ } catch (IOException e) {
+ // Failed; try again.
+ }
+ }
+ }
+
+ /**
+ * Read a string in platform default encoding and split it into a list of
+ * NUL-separated words.
+ *
+ * <p>Blaze consistently uses the platform default encoding (defined in
+ * blaze.cc) to interface with Unix APIs.
+ */
+ private static List<String> readRequest(InputStream input) throws IOException {
+ byte[] sizeBuffer = new byte[4];
+ ByteStreams.readFully(input, sizeBuffer);
+ int size = ((sizeBuffer[0] & 0xff) << 24)
+ + ((sizeBuffer[1] & 0xff) << 16)
+ + ((sizeBuffer[2] & 0xff) << 8)
+ + (sizeBuffer[3] & 0xff);
+ byte[] inputBytes = new byte[size];
+ ByteStreams.readFully(input, inputBytes);
+
+ String s = new String(inputBytes, Charset.defaultCharset());
+ return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s));
+ }
+
+ private static List<String> extractRequest(RequestIo requestIo) throws IOException {
+ List<String> request = readRequest(requestIo.in);
+ if (request == null) {
+ LOG.info("Short-circuiting empty request");
+ return null;
+ }
+ return request;
+ }
+
+ private void executeRequest(List<String> request, RequestIo requestIo) {
+ Preconditions.checkNotNull(request);
+ int exitStatus = 2;
+ try {
+ exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr,
+ requestIo.firstContactTime);
+ LOG.info("Finished executing request");
+ } catch (UnknownCommandException e) {
+ requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage());
+ LOG.severe("SERVER ERROR: " + e.getMessage());
+ } catch (Exception e) {
+ // Stacktrace for unknown exception.
+ StringWriter trace = new StringWriter();
+ e.printStackTrace(new PrintWriter(trace, true));
+ requestIo.requestOutErr.printErr("SERVER ERROR: " + trace);
+ LOG.severe("SERVER ERROR: " + trace);
+ }
+
+ if (rpcService.isShutdown()) {
+ // In case of shutdown, disable the listening socket *before* we write
+ // the last part of the response. Otherwise, a sufficiently fast client
+ // could read the response and exit, and a new client could make a
+ // connection to this server, which is still in the listening state, even
+ // though it is about to shut down imminently.
+ closeServerSocket();
+ }
+
+ requestIo.writeExitStatus(exitStatus);
+ }
+
+ /**
+ * Because it's a little complicated, this class factors out all the IO Hook
+ * up we need per request, that is, in
+ * {@link AfUnixServer#executeRequest(List, RequestIo)}.
+ * It's unfortunately complicated, so it's explained here.
+ */
+ private static class RequestIo {
+
+ // Used by the client code
+ private final InputStream in;
+ private final OutErr requestOutErr;
+ private final OutputStream controlChannel;
+
+ // just used by this class to keep the state around
+ private final Socket requestSocket;
+ private final OutputStream requestOut;
+ private final long firstContactTime;
+
+ RequestIo(Socket requestSocket, long firstContactTime) throws IOException {
+ this.requestSocket = requestSocket;
+ this.firstContactTime = firstContactTime;
+ this.in = requestSocket.getInputStream();
+ this.requestOut = requestSocket.getOutputStream();
+
+ // We encode the response sent to the client with a multiplexer so
+ // we can send three streams (out / err / control) over one wire stream
+ // (requestOut).
+ StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut);
+
+ // We'll be writing control messages (exit code + out of date message)
+ // to this control channel.
+ controlChannel = multiplexer.createControl();
+
+ // This is the outErr part of the multiplexed output.
+ requestOutErr = OutErr.create(multiplexer.createStdout(),
+ multiplexer.createStderr());
+ // We hook up System.out / System.err to our IO object. Stuff written to
+ // System.out / System.err will show up on the user's screen, prefixed
+ // with "System.out "/"System.err ".
+ requestOutErr.addSystemOutErrAsSource();
+ }
+
+ public void writeExitStatus(int exitStatus) {
+ // Make sure to flush the output / error streams prior to writing the exit status.
+ // The client may stop reading that direction of the socket immediately upon reading the
+ // exit code.
+ flushOutErr();
+ try {
+ controlChannel.write((exitStatus >> 24) & 0xff);
+ controlChannel.write((exitStatus >> 16) & 0xff);
+ controlChannel.write((exitStatus >> 8) & 0xff);
+ controlChannel.write(exitStatus & 0xff);
+ controlChannel.flush();
+ LOG.info("" + exitStatus);
+ } catch (IOException ignored) {
+ // This exception is historically ignored.
+ }
+ }
+
+ private void flushOutErr() {
+ try {
+ requestOutErr.getOutputStream().flush();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ requestOutErr.getErrorStream().flush();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+
+ public void shutdown() {
+ try {
+ requestOut.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ in.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ requestSocket.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ }
+
+ /**
+ * Creates and returns a new RPC server.
+ * Use {@link AfUnixServer#serve()} to start the server.
+ *
+ * @param appCommand The application's ServerCommand implementation.
+ * @param serverDirectory The directory for server-related files. The caller
+ * must ensure the directory has been created.
+ * @param workspaceDir The workspace, used solely to ensure it persists.
+ * @param maxIdleSeconds The idle time in seconds after which the rpc
+ * server will die unless it receives a request.
+ */
+ public static AfUnixServer newServerWith(Clock clock,
+ ServerCommand appCommand,
+ Path serverDirectory,
+ Path workspaceDir,
+ int maxIdleSeconds)
+ throws IOException {
+ if (!serverDirectory.exists()) {
+ serverDirectory.createDirectory();
+ }
+
+ // Creates and starts the RPC server.
+ RPCService service = new RPCService(appCommand);
+
+ return new AfUnixServer(clock, service, maxIdleSeconds * 1000L,
+ serverDirectory, workspaceDir);
+ }
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java
deleted file mode 100644
index d93d044b14..0000000000
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.server;
-
-import com.google.devtools.build.lib.runtime.CommandExecutor;
-import com.google.devtools.build.lib.util.Clock;
-
-import java.io.IOException;
-
-/**
- * Interface for the gRPC server.
- *
- * <p>This is necessary so that Bazel kind of works during bootstrapping, at which time the
- * gRPC server is not compiled on so that we don't need gRPC for bootstrapping.
- */
-public interface GrpcServer {
-
- /**
- * Factory class.
- *
- * Present so that we don't need to invoke a constructor with multiple arguments by reflection.
- */
- interface Factory {
- GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port,
- String outputBase);
- }
-
- void serve() throws IOException;
- void terminate();
-}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 4349d83062..143589ce2c 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -23,21 +23,19 @@ import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
-import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
/**
@@ -46,15 +44,15 @@ import java.security.SecureRandom;
* <p>Only this class should depend on gRPC so that we only need to exclude this during
* bootstrapping.
*/
-public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer {
+public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer {
/**
* Factory class. Instantiated by reflection.
*/
- public static class Factory implements GrpcServer.Factory {
+ public static class Factory implements RPCServer.Factory {
@Override
- public GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port,
- String outputBase) {
- return new GrpcServerImpl(commandExecutor, clock, port, outputBase);
+ public RPCServer create(CommandExecutor commandExecutor, Clock clock, int port,
+ Path serverDirectory) throws IOException {
+ return new GrpcServerImpl(commandExecutor, clock, port, serverDirectory);
}
}
@@ -96,14 +94,14 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
}
}
- // These paths are all relative to the output base
- private static final String PORT_FILE = "server/grpc_port";
- private static final String REQUEST_COOKIE_FILE = "server/request_cookie";
- private static final String RESPONSE_COOKIE_FILE = "server/response_cookie";
+ // These paths are all relative to the server directory
+ private static final String PORT_FILE = "grpc_port";
+ private static final String REQUEST_COOKIE_FILE = "request_cookie";
+ private static final String RESPONSE_COOKIE_FILE = "response_cookie";
private final CommandExecutor commandExecutor;
private final Clock clock;
- private final String outputBase;
+ private final Path serverDirectory;
private final String requestCookie;
private final String responseCookie;
@@ -112,10 +110,11 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
boolean serving;
public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
- String outputBase) {
+ Path serverDirectory) throws IOException {
+ super(serverDirectory);
this.commandExecutor = commandExecutor;
this.clock = clock;
- this.outputBase = outputBase;
+ this.serverDirectory = serverDirectory;
this.port = port;
this.serving = false;
@@ -135,6 +134,7 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
return result.toString();
}
+ @Override
public void serve() throws IOException {
Preconditions.checkState(!serving);
server = ServerBuilder.forPort(port)
@@ -148,17 +148,22 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
port = getActualServerPort();
}
- writeFile(PORT_FILE, Integer.toString(port));
- writeFile(REQUEST_COOKIE_FILE, requestCookie);
- writeFile(RESPONSE_COOKIE_FILE, responseCookie);
+ writeServerFile(PORT_FILE, Integer.toString(port));
+ writeServerFile(REQUEST_COOKIE_FILE, requestCookie);
+ writeServerFile(RESPONSE_COOKIE_FILE, responseCookie);
+ try {
+ server.awaitTermination();
+ } catch (InterruptedException e) {
+ // TODO(lberki): Handle SIGINT in a reasonable way
+ throw new IllegalStateException(e);
+ }
}
- private void writeFile(String path, String contents) throws IOException {
- OutputStreamWriter writer = new OutputStreamWriter(
- new FileOutputStream(new File(outputBase + "/" + path)), StandardCharsets.UTF_8);
- writer.write(contents);
- writer.close();
+ private void writeServerFile(String name, String contents) throws IOException {
+ Path file = serverDirectory.getChild(name);
+ FileSystemUtils.writeContentAsLatin1(file, contents);
+ deleteAtExit(file, false);
}
/**
@@ -197,28 +202,6 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
return instance;
}
- public void terminate() {
- server.shutdownNow();
- // This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same
- // amount of code as implementing it ourselves.
- boolean interrupted = false;
- try {
- while (true) {
- try {
- server.awaitTermination();
- serving = false;
- return;
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
@Override
public void run(
RunRequest request, StreamObserver<RunResponse> observer) {
@@ -245,6 +228,10 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ
observer.onNext(response);
observer.onCompleted();
+
+ if (commandExecutor.shutdown()) {
+ server.shutdownNow();
+ }
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
index ab07009d62..fc6b83ffc2 100644
--- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -11,325 +11,53 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-
package com.google.devtools.build.lib.server;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.ByteStreams;
-import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
-import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
-import com.google.devtools.build.lib.unix.LocalClientSocket;
-import com.google.devtools.build.lib.unix.LocalServerSocket;
-import com.google.devtools.build.lib.unix.LocalSocketAddress;
-import com.google.devtools.build.lib.unix.NativePosixFiles;
+import com.google.devtools.build.lib.runtime.CommandExecutor;
import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.util.OsUtils;
-import com.google.devtools.build.lib.util.ThreadUtils;
-import com.google.devtools.build.lib.util.io.OutErr;
-import com.google.devtools.build.lib.util.io.StreamMultiplexer;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
-import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
/**
- * An RPCServer server is a Java object that sits and waits for RPC requests
- * (the sit-and-wait is implemented in {@link #serve()}). These requests
- * arrive via UNIX file sockets. The RPCServer then calls the application
- * (which implements ServerCommand) to handle the request. (Since the Blaze
- * server may need to stat hundreds of directories during initialization, this
- * is a significant speedup.) The server thread will terminate after idling
- * for a user-specified time.
- *
- * Note: If you are contemplating to call into the RPCServer from
- * within Java, consider using the {@link RPCService} class instead.
+ * A server instance. Can either an AF_UNIX or a gRPC one.
*/
-// TODO(bazel-team): Signal handling.
-// TODO(bazel-team): Gives clients status information when the server is busy. One
-// way to do this is to put the server status in a file (pid, the current
-// target, etc) in the server directory. Alternatively, we can have a separate
-// thread taking care of the server socket and put the information into socket
-// handshakes.
-// TODO(bazel-team): Use Reporter for server-side messages.
-public final class RPCServer {
-
- private final Clock clock;
- private final RPCService rpcService;
- private final LocalServerSocket serverSocket;
- private final long maxIdleMillis;
- private final long statusCheckMillis;
- private final Path serverDirectory;
- private final Path workspaceDir;
+public abstract class RPCServer {
private static final Logger LOG = Logger.getLogger(RPCServer.class.getName());
- private volatile boolean lameDuck;
-
- private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute.
- private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0');
-
- /**
- * Create a new server instance. After creating the server, you can start it
- * by calling the {@link #serve()} method.
- *
- * @param clock The clock to take time measurements
- * @param rpcService The underlying service object, which takes
- * care of dispatching to the {@link ServerCommand}
- * instances, as requests arrive.
- * @param maxIdleMillis The maximum time the server will wait idly.
- * @param statusCheckPeriodMillis How long to wait between system status checks.
- * @param serverDirectory Directory to put file socket and pid files, etc.
- * @param workspaceDir The workspace. Used solely to ensure it persists.
- * @throws IOException
- */
- public RPCServer(Clock clock, RPCService rpcService,
- long maxIdleMillis, long statusCheckPeriodMillis,
- Path serverDirectory, Path workspaceDir)
- throws IOException {
- this.clock = clock;
- this.rpcService = rpcService;
- this.maxIdleMillis = maxIdleMillis;
- this.statusCheckMillis = statusCheckPeriodMillis;
- this.serverDirectory = serverDirectory;
- this.workspaceDir = workspaceDir;
-
- this.serverSocket = openServerSocket();
- serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis));
- lameDuck = false;
- }
/**
- * Create a new server instance. After creating the server, you can start it
- * by calling the {@link #serve()} method.
+ * Factory class for the gRPC server.
*
- * @param clock The clock to take time measurements
- * @param rpcService The underlying service object, which takes
- * care of dispatching to the {@link ServerCommand}
- * instances, as requests arrive.
- * @param maxIdleMillis The maximum time the server will wait idly.
- * @param serverDirectory Directory to put file socket and pid files, etc.
- * @param workspaceDir The workspace. Used solely to ensure it persists.
- * @throws IOException
- */
- public RPCServer(Clock clock, RPCService rpcService,
- long maxIdleMillis, Path serverDirectory, Path workspaceDir)
- throws IOException {
- this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS,
- serverDirectory, workspaceDir);
- }
-
- private static void printStack(IOException e) {
- /*
- * Hopefully this never happens. It's not very nice to just write this
- * to the user's console, but I'm not sure what better choice we have.
- */
- StringWriter err = new StringWriter();
- PrintWriter printErr = new PrintWriter(err);
- printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]=======");
- e.printStackTrace(printErr);
- printErr.println("=====================================================");
- LOG.severe(err.toString());
- }
-
- /**
- * Wait on a socket for business (answer requests). Note that this
- * method won't return until the server shuts down.
+ * Present so that we don't need to invoke a constructor with multiple arguments by reflection.
*/
- public void serve() {
- // Register the signal handler.
- final AtomicBoolean inAction = new AtomicBoolean(false);
- final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
- final AtomicLong cmdNum = new AtomicLong();
- final Thread mainThread = Thread.currentThread();
- final Object interruptLock = new Object();
-
- InterruptSignalHandler sigintHandler =
- new InterruptSignalHandler() {
- @Override
- protected void onSignal() {
- LOG.severe("User interrupt");
-
- // Only interrupt during actions - otherwise we may end up setting the interrupt bit
- // at the end of a build and responding to it at the beginning of the subsequent build.
- synchronized (interruptLock) {
- if (allowingInterrupt.get()) {
- mainThread.interrupt();
- }
- }
-
- if (inAction.get()) {
- Runnable interruptWatcher =
- new Runnable() {
- @Override
- public void run() {
- try {
- long originalCmd = cmdNum.get();
- Thread.sleep(10 * 1000);
- if (inAction.get() && cmdNum.get() == originalCmd) {
- // We're still operating on the same command.
- // Interrupt took too long.
- ThreadUtils.warnAboutSlowInterrupt();
- }
- } catch (InterruptedException e) {
- // Ignore.
- }
- }
- };
- Thread interruptWatcherThread =
- new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
- interruptWatcherThread.setDaemon(true);
- interruptWatcherThread.start();
- }
- }
- };
-
- try {
- while (!lameDuck) {
- try {
- IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir);
- idleChecker.idle();
- RequestIo requestIo;
-
- long startTime = clock.currentTimeMillis();
- while (true) {
- try {
- allowingInterrupt.set(true);
- Socket socket = serverSocket.accept();
- long firstContactTime = clock.currentTimeMillis();
- requestIo = new RequestIo(socket, firstContactTime);
- break;
- } catch (SocketTimeoutException e) {
- long idleTime = clock.currentTimeMillis() - startTime;
- if (lameDuck) {
- closeServerSocket();
- return;
- } else if (idleTime > maxIdleMillis ||
- (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) {
- enterLameDuck();
- }
- }
- }
- idleChecker.busy();
-
-
- List<String> request = null;
- try {
- request = extractRequest(requestIo);
- cmdNum.incrementAndGet();
- inAction.set(true);
- if (request != null) {
- executeRequest(request, requestIo);
- }
- } finally {
- inAction.set(false);
- // Don't reset interruption unless we executed a request. Otherwise this is just a
- // ping from the client verifying our existence, in which case we should retain the
- // interrupt status for the subsequent request.
- if (request != null) {
- synchronized (interruptLock) {
- allowingInterrupt.set(false);
- Thread.interrupted(); // clears thread interrupted status
- }
- }
- requestIo.shutdown();
- if (rpcService.isShutdown()) {
- return;
- }
- }
- } catch (EOFException e) {
- LOG.info("Connection to the client lost: "
- + e.getMessage());
- } catch (IOException e) {
- // Something else happened. Print a stack trace for debugging.
- printStack(e);
- }
- }
- } finally {
- rpcService.shutdown();
- LOG.info("Logging finished");
- sigintHandler.uninstall();
- }
+ public interface Factory {
+ RPCServer create(CommandExecutor commandExecutor, Clock clock, int port, Path serverDirectory)
+ throws IOException;
}
- private void closeServerSocket() {
- LOG.info("Closing serverSocket.");
- try {
- serverSocket.close();
- } catch (IOException e) {
- printStack(e);
- }
-
- if (!lameDuck) {
- try {
- getSocketPath().delete();
- } catch (IOException e) {
- printStack(e);
- }
- }
- }
-
- /**
- * Allow one last request to be serviced.
- */
- private void enterLameDuck() {
- lameDuck = true;
+ protected RPCServer(Path serverDirectory) throws IOException {
+ // We create the server.pid file strictly before binding the socket.
+ // The client only accesses the pid file after connecting to the socket
+ // which ensures that it gets the correct pid value.
+ Path pidFile = serverDirectory.getRelative("server.pid");
+ RPCServer.deleteAtExit(pidFile, /*deleteParent=*/ false);
try {
- getSocketPath().delete();
+ pidFile.delete();
} catch (IOException e) {
- e.printStackTrace();
- }
- serverSocket.setSoTimeout(1);
- }
-
- /**
- * Returns the path of the socket file to be used.
- */
- public Path getSocketPath() {
- return serverDirectory.getRelative("server.socket");
- }
-
- /**
- * Ensures no other server is running for the current socket file. This
- * guarantees that no two servers are running against the same output
- * directory.
- *
- * @throws IOException if another server holds the lock for the socket file.
- */
- public static void ensureExclusiveAccess(Path socketFile) throws IOException {
- LocalSocketAddress address =
- new LocalSocketAddress(socketFile.getPathFile());
- if (socketFile.exists()) {
- try {
- new LocalClientSocket(address).close();
- } catch (IOException e) {
- // The previous server process is dead--unlink the file:
- socketFile.delete();
- return;
- }
- // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message
- // and add it to the message.
- throw new IOException("Socket file " + socketFile.getPathString()
- + " is locked by another server");
+ // Ignore.
}
+ pidFile.createSymbolicLink(new PathFragment(String.valueOf(OsUtils.getpid())));
}
/**
* Schedule the specified file for (attempted) deletion at JVM exit.
*/
- private static void deleteAtExit(final Path socketFile, final boolean deleteParent) {
+ protected static void deleteAtExit(final Path socketFile, final boolean deleteParent) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@@ -345,254 +73,21 @@ public final class RPCServer {
});
}
- /**
- * Opens a UNIX local server socket.
- * @throws IOException if the socket file is used by another server or can
- * not be made exclusive.
- */
- private LocalServerSocket openServerSocket() throws IOException {
- // This is the "well known" socket path via which the server is found...
- Path socketFile = getSocketPath();
-
- // ...but it may have a name that's too long for AF_UNIX, in which case we
- // make it a symlink to /tmp/something. This typically only happens in
- // tests where the --output_base is beneath a very deep temp dir.
- // (All this extra complexity is just used in tests... *sigh*).
- if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX
- Path socketLink = socketFile;
- String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", "/tmp");
- socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)).
- getRelative("server.socket");
- LOG.info("Using symlinked socket at " + socketFile);
-
- socketLink.delete(); // Remove stale symlink, if any.
- socketLink.createSymbolicLink(socketFile);
-
- deleteAtExit(socketLink, /*deleteParent=*/false);
- deleteAtExit(socketFile, /*deleteParent=*/true);
- } else {
- deleteAtExit(socketFile, /*deleteParent=*/false);
- }
-
- ensureExclusiveAccess(socketFile);
-
- // We create the server.pid file strictly before binding the socket.
- // The client only accesses the pid file after connecting to the socket
- // which ensures that it gets the correct pid value.
- Path pidFile = serverDirectory.getRelative("server.pid");
- deleteAtExit(pidFile, /*deleteParent=*/ false);
- try {
- pidFile.delete();
- } catch (IOException e) {
- // Ignore.
- }
- pidFile.createSymbolicLink(new PathFragment(String.valueOf(OsUtils.getpid())));
-
- LocalServerSocket serverSocket = new LocalServerSocket();
- serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile()));
- NativePosixFiles.chmod(socketFile.getPathFile(), 0600); // Lock it down.
- serverSocket.listen(/*backlog=*/50);
- return serverSocket;
- }
-
- // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a
- // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it
- // succeeds.
- private static Path createTempSocketDirectory(Path tempDir) {
- Random random = new Random();
- while (true) {
- Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt()));
- try {
- if (socketDir.createDirectory()) {
- // Make sure it's private; unfortunately, createDirectory() doesn't take a mode
- // argument.
- socketDir.chmod(0700);
- return socketDir; // Created.
- }
- // Already existed; try again.
- } catch (IOException e) {
- // Failed; try again.
- }
- }
- }
-
- /**
- * Read a string in platform default encoding and split it into a list of
- * NUL-separated words.
- *
- * <p>Blaze consistently uses the platform default encoding (defined in
- * blaze.cc) to interface with Unix APIs.
- */
- private static List<String> readRequest(InputStream input) throws IOException {
- byte[] sizeBuffer = new byte[4];
- ByteStreams.readFully(input, sizeBuffer);
- int size = ((sizeBuffer[0] & 0xff) << 24)
- + ((sizeBuffer[1] & 0xff) << 16)
- + ((sizeBuffer[2] & 0xff) << 8)
- + (sizeBuffer[3] & 0xff);
- byte[] inputBytes = new byte[size];
- ByteStreams.readFully(input, inputBytes);
-
- String s = new String(inputBytes, Charset.defaultCharset());
- return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s));
- }
-
- private static List<String> extractRequest(RequestIo requestIo) throws IOException {
- List<String> request = readRequest(requestIo.in);
- if (request == null) {
- LOG.info("Short-circuiting empty request");
- return null;
- }
- return request;
- }
-
- private void executeRequest(List<String> request, RequestIo requestIo) {
- Preconditions.checkNotNull(request);
- int exitStatus = 2;
- try {
- exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr,
- requestIo.firstContactTime);
- LOG.info("Finished executing request");
- } catch (UnknownCommandException e) {
- requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage());
- LOG.severe("SERVER ERROR: " + e.getMessage());
- } catch (Exception e) {
- // Stacktrace for unknown exception.
- StringWriter trace = new StringWriter();
- e.printStackTrace(new PrintWriter(trace, true));
- requestIo.requestOutErr.printErr("SERVER ERROR: " + trace);
- LOG.severe("SERVER ERROR: " + trace);
- }
-
- if (rpcService.isShutdown()) {
- // In case of shutdown, disable the listening socket *before* we write
- // the last part of the response. Otherwise, a sufficiently fast client
- // could read the response and exit, and a new client could make a
- // connection to this server, which is still in the listening state, even
- // though it is about to shut down imminently.
- closeServerSocket();
- }
-
- requestIo.writeExitStatus(exitStatus);
- }
-
- /**
- * Because it's a little complicated, this class factors out all the IO Hook
- * up we need per request, that is, in
- * {@link RPCServer#executeRequest(List, RequestIo)}.
- * It's unfortunately complicated, so it's explained here.
- */
- private static class RequestIo {
-
- // Used by the client code
- private final InputStream in;
- private final OutErr requestOutErr;
- private final OutputStream controlChannel;
-
- // just used by this class to keep the state around
- private final Socket requestSocket;
- private final OutputStream requestOut;
- private final long firstContactTime;
-
- RequestIo(Socket requestSocket, long firstContactTime) throws IOException {
- this.requestSocket = requestSocket;
- this.firstContactTime = firstContactTime;
- this.in = requestSocket.getInputStream();
- this.requestOut = requestSocket.getOutputStream();
-
- // We encode the response sent to the client with a multiplexer so
- // we can send three streams (out / err / control) over one wire stream
- // (requestOut).
- StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut);
-
- // We'll be writing control messages (exit code + out of date message)
- // to this control channel.
- controlChannel = multiplexer.createControl();
-
- // This is the outErr part of the multiplexed output.
- requestOutErr = OutErr.create(multiplexer.createStdout(),
- multiplexer.createStderr());
- // We hook up System.out / System.err to our IO object. Stuff written to
- // System.out / System.err will show up on the user's screen, prefixed
- // with "System.out "/"System.err ".
- requestOutErr.addSystemOutErrAsSource();
- }
-
- public void writeExitStatus(int exitStatus) {
- // Make sure to flush the output / error streams prior to writing the exit status.
- // The client may stop reading that direction of the socket immediately upon reading the
- // exit code.
- flushOutErr();
- try {
- controlChannel.write((exitStatus >> 24) & 0xff);
- controlChannel.write((exitStatus >> 16) & 0xff);
- controlChannel.write((exitStatus >> 8) & 0xff);
- controlChannel.write(exitStatus & 0xff);
- controlChannel.flush();
- LOG.info("" + exitStatus);
- } catch (IOException ignored) {
- // This exception is historically ignored.
- }
- }
-
- private void flushOutErr() {
- try {
- requestOutErr.getOutputStream().flush();
- } catch (IOException e) {
- printStack(e);
- }
- try {
- requestOutErr.getErrorStream().flush();
- } catch (IOException e) {
- printStack(e);
- }
- }
-
- public void shutdown() {
- try {
- requestOut.close();
- } catch (IOException e) {
- printStack(e);
- }
- try {
- in.close();
- } catch (IOException e) {
- printStack(e);
- }
- try {
- requestSocket.close();
- } catch (IOException e) {
- printStack(e);
- }
- }
+ static void printStack(IOException e) {
+ /*
+ * Hopefully this never happens. It's not very nice to just write this
+ * to the user's console, but I'm not sure what better choice we have.
+ */
+ StringWriter err = new StringWriter();
+ PrintWriter printErr = new PrintWriter(err);
+ printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]=======");
+ e.printStackTrace(printErr);
+ printErr.println("=====================================================");
+ LOG.severe(err.toString());
}
/**
- * Creates and returns a new RPC server.
- * Use {@link RPCServer#serve()} to start the server.
- *
- * @param appCommand The application's ServerCommand implementation.
- * @param serverDirectory The directory for server-related files. The caller
- * must ensure the directory has been created.
- * @param workspaceDir The workspace, used solely to ensure it persists.
- * @param maxIdleSeconds The idle time in seconds after which the rpc
- * server will die unless it receives a request.
+ * Start serving and block until the a shutdown command is received.
*/
- public static RPCServer newServerWith(Clock clock,
- ServerCommand appCommand,
- Path serverDirectory,
- Path workspaceDir,
- int maxIdleSeconds)
- throws IOException {
- if (!serverDirectory.exists()) {
- serverDirectory.createDirectory();
- }
-
- // Creates and starts the RPC server.
- RPCService service = new RPCService(appCommand);
-
- return new RPCServer(clock, service, maxIdleSeconds * 1000L,
- serverDirectory, workspaceDir);
- }
-
+ public abstract void serve() throws IOException;
}
diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java
index f81a0384f3..9e9c7af3e9 100644
--- a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java
@@ -42,14 +42,14 @@ import java.util.List;
*/
@TestSpec(size = Suite.MEDIUM_TESTS)
@RunWith(JUnit4.class)
-public class RPCServerTest {
+public class AfUnixServerTest {
private static final long MAX_IDLE_MILLIS = 10000;
private static final long HEALTH_CHECK_MILLIS = 1000 * 3;
private static final String COMMAND_STDOUT = "Heelllloo....";
private static final String COMMAND_STDERR = "...world!";
- private RPCServer server;
+ private AfUnixServer server;
private FsApparatus scratch = FsApparatus.newNative();
private RecordingOutErr outErr = new RecordingOutErr();
private Path serverDir;
@@ -89,7 +89,7 @@ public class RPCServerTest {
client = new RPCTestingClient(
outErr, serverDir.getRelative("server.socket"));
RPCService service = new RPCService(helloWorldCommand);
- server = new RPCServer(new JavaClock(), service, MAX_IDLE_MILLIS, HEALTH_CHECK_MILLIS,
+ server = new AfUnixServer(new JavaClock(), service, MAX_IDLE_MILLIS, HEALTH_CHECK_MILLIS,
serverDir, workspaceDir);
serverThread.start();
}