From bb0dac7ea6447f6676ac14f2e5b0833fb6958655 Mon Sep 17 00:00:00 2001 From: Lukacs Berki Date: Tue, 19 Apr 2016 07:21:19 +0000 Subject: Do not start the AF_UNIX server when in gRPC mode. Work towards #930. With this, it's conceivable that server mode works on Windows to some degree (I haven't tried, though, because there are many issues that need to be fixed) -- MOS_MIGRATED_REVID=120202037 --- .../devtools/build/lib/runtime/BlazeRuntime.java | 54 +- .../devtools/build/lib/server/AfUnixServer.java | 554 ++++++++++++++++++++ .../devtools/build/lib/server/GrpcServer.java | 41 -- .../devtools/build/lib/server/GrpcServerImpl.java | 79 ++- .../devtools/build/lib/server/RPCServer.java | 567 ++------------------- .../build/lib/server/AfUnixServerTest.java | 132 +++++ .../devtools/build/lib/server/RPCServerTest.java | 132 ----- 7 files changed, 768 insertions(+), 791 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java delete mode 100644 src/main/java/com/google/devtools/build/lib/server/GrpcServer.java create mode 100644 src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java delete mode 100644 src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java (limited to 'src') diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java index 2fe7c6acac..9d612693d4 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java @@ -63,7 +63,7 @@ import com.google.devtools.build.lib.runtime.commands.ShutdownCommand; import com.google.devtools.build.lib.runtime.commands.TestCommand; import com.google.devtools.build.lib.runtime.commands.VersionCommand; import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy; -import com.google.devtools.build.lib.server.GrpcServer; +import com.google.devtools.build.lib.server.AfUnixServer; import com.google.devtools.build.lib.server.RPCServer; import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; import com.google.devtools.build.lib.skyframe.DiffAwareness; @@ -131,16 +131,6 @@ import javax.annotation.Nullable; *

The parts specific to the current command are stored in {@link CommandEnvironment}. */ public final class BlazeRuntime { - private static class BlazeServer { - private final RPCServer afUnixServer; - private final GrpcServer grpcServer; - - private BlazeServer(RPCServer afUnixServer, GrpcServer grpcServer) { - this.afUnixServer = afUnixServer; - this.grpcServer = grpcServer; - } - } - private static final Pattern suppressFromLog = Pattern.compile(".*(auth|pass|cookie).*", Pattern.CASE_INSENSITIVE); @@ -845,16 +835,8 @@ public final class BlazeRuntime { */ private static int serverMain(Iterable modules, OutErr outErr, String[] args) { try { - BlazeServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args)); - if (blazeServer.grpcServer != null) { - blazeServer.grpcServer.serve(); - } - - // TODO(lberki): Make this call non-blocking and terminate the two servers at the same time - blazeServer.afUnixServer.serve(); - if (blazeServer.grpcServer != null) { - blazeServer.grpcServer.serve(); - } + RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args)); + blazeServer.serve(); return ExitCode.SUCCESS.getNumericExitCode(); } catch (OptionsParsingException e) { outErr.printErr(e.getMessage()); @@ -880,34 +862,34 @@ public final class BlazeRuntime { /** * Creates and returns a new Blaze RPCServer. Call {@link RPCServer#serve()} to start the server. */ - private static BlazeServer createBlazeRPCServer(Iterable modules, List args) + private static RPCServer createBlazeRPCServer( + Iterable modules, List args) throws IOException, OptionsParsingException, AbruptExitException { OptionsProvider options = parseOptions(modules, args); BlazeServerStartupOptions startupOptions = options.getOptions(BlazeServerStartupOptions.class); - final BlazeRuntime runtime = newRuntime(modules, options); - final BlazeCommandDispatcher dispatcher = new BlazeCommandDispatcher(runtime); - + BlazeRuntime runtime = newRuntime(modules, options); + BlazeCommandDispatcher dispatcher = new BlazeCommandDispatcher(runtime); CommandExecutor commandExecutor = new CommandExecutor(runtime, dispatcher); - RPCServer afUnixServer = RPCServer.newServerWith(runtime.getClock(), commandExecutor, - runtime.getServerDirectory(), runtime.workspace.getWorkspace(), - startupOptions.maxIdleSeconds); - GrpcServer grpcServer = null; + + if (startupOptions.grpcPort != -1) { try { - // We don't want to directly depend on this class so that we don't need gRPC for - // bootstrapping, so we instantiate it using a factory class and reflection + // This is necessary so that Bazel kind of works during bootstrapping, at which time the + // gRPC server is not compiled in so that we don't need gRPC for bootstrapping. Class factoryClass = Class.forName( "com.google.devtools.build.lib.server.GrpcServerImpl$Factory"); - GrpcServer.Factory factory = (GrpcServer.Factory) factoryClass.newInstance(); - grpcServer = factory.create(commandExecutor, runtime.getClock(), - startupOptions.grpcPort, startupOptions.outputBase.getPathString()); + RPCServer.Factory factory = (RPCServer.Factory) factoryClass.newInstance(); + return factory.create(commandExecutor, runtime.getClock(), + startupOptions.grpcPort, runtime.getServerDirectory()); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR); } + } else { + return AfUnixServer.newServerWith(runtime.getClock(), commandExecutor, + runtime.getServerDirectory(), runtime.workspace.getWorkspace(), + startupOptions.maxIdleSeconds); } - - return new BlazeServer(afUnixServer, grpcServer); } private static Function sourceFunctionForMap(final Map map) { diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java new file mode 100644 index 0000000000..26473e2323 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java @@ -0,0 +1,554 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.server.RPCService.UnknownCommandException; +import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; +import com.google.devtools.build.lib.unix.LocalClientSocket; +import com.google.devtools.build.lib.unix.LocalServerSocket; +import com.google.devtools.build.lib.unix.LocalSocketAddress; +import com.google.devtools.build.lib.unix.NativePosixFiles; +import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.ThreadUtils; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.util.io.StreamMultiplexer; +import com.google.devtools.build.lib.vfs.Path; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** + * An RPCServer server is a Java object that sits and waits for RPC requests + * (the sit-and-wait is implemented in {@link #serve()}). These requests + * arrive via UNIX file sockets. The RPCServer then calls the application + * (which implements ServerCommand) to handle the request. (Since the Blaze + * server may need to stat hundreds of directories during initialization, this + * is a significant speedup.) The server thread will terminate after idling + * for a user-specified time. + * + * Note: If you are contemplating to call into the RPCServer from + * within Java, consider using the {@link RPCService} class instead. + */ +// TODO(bazel-team): Signal handling. +// TODO(bazel-team): Gives clients status information when the server is busy. One +// way to do this is to put the server status in a file (pid, the current +// target, etc) in the server directory. Alternatively, we can have a separate +// thread taking care of the server socket and put the information into socket +// handshakes. +// TODO(bazel-team): Use Reporter for server-side messages. +public final class AfUnixServer extends RPCServer { + private final Clock clock; + private final RPCService rpcService; + private final LocalServerSocket serverSocket; + private final long maxIdleMillis; + private final long statusCheckMillis; + private final Path serverDirectory; + private final Path workspaceDir; + private static final Logger LOG = Logger.getLogger(AfUnixServer.class.getName()); + private volatile boolean lameDuck; + + private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute. + private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0'); + + /** + * Create a new server instance. After creating the server, you can start it + * by calling the {@link #serve()} method. + * + * @param clock The clock to take time measurements + * @param rpcService The underlying service object, which takes + * care of dispatching to the {@link ServerCommand} + * instances, as requests arrive. + * @param maxIdleMillis The maximum time the server will wait idly. + * @param statusCheckPeriodMillis How long to wait between system status checks. + * @param serverDirectory Directory to put file socket and pid files, etc. + * @param workspaceDir The workspace. Used solely to ensure it persists. + * @throws IOException + */ + public AfUnixServer(Clock clock, RPCService rpcService, + long maxIdleMillis, long statusCheckPeriodMillis, + Path serverDirectory, Path workspaceDir) + throws IOException { + super(serverDirectory); + this.clock = clock; + this.rpcService = rpcService; + this.maxIdleMillis = maxIdleMillis; + this.statusCheckMillis = statusCheckPeriodMillis; + this.serverDirectory = serverDirectory; + this.workspaceDir = workspaceDir; + + this.serverSocket = openServerSocket(); + serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis)); + lameDuck = false; + } + + /** + * Create a new server instance. After creating the server, you can start it + * by calling the {@link #serve()} method. + * + * @param clock The clock to take time measurements + * @param rpcService The underlying service object, which takes + * care of dispatching to the {@link ServerCommand} + * instances, as requests arrive. + * @param maxIdleMillis The maximum time the server will wait idly. + * @param serverDirectory Directory to put file socket and pid files, etc. + * @param workspaceDir The workspace. Used solely to ensure it persists. + * @throws IOException + */ + public AfUnixServer(Clock clock, RPCService rpcService, + long maxIdleMillis, Path serverDirectory, Path workspaceDir) + throws IOException { + this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS, + serverDirectory, workspaceDir); + } + + /** + * Wait on a socket for business (answer requests). Note that this + * method won't return until the server shuts down. + */ + @Override + public void serve() { + // Register the signal handler. + final AtomicBoolean inAction = new AtomicBoolean(false); + final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); + final AtomicLong cmdNum = new AtomicLong(); + final Thread mainThread = Thread.currentThread(); + final Object interruptLock = new Object(); + + InterruptSignalHandler sigintHandler = + new InterruptSignalHandler() { + @Override + protected void onSignal() { + LOG.severe("User interrupt"); + + // Only interrupt during actions - otherwise we may end up setting the interrupt bit + // at the end of a build and responding to it at the beginning of the subsequent build. + synchronized (interruptLock) { + if (allowingInterrupt.get()) { + mainThread.interrupt(); + } + } + + if (inAction.get()) { + Runnable interruptWatcher = + new Runnable() { + @Override + public void run() { + try { + long originalCmd = cmdNum.get(); + Thread.sleep(10 * 1000); + if (inAction.get() && cmdNum.get() == originalCmd) { + // We're still operating on the same command. + // Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + } + }; + + try { + while (!lameDuck) { + try { + IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir); + idleChecker.idle(); + RequestIo requestIo; + + long startTime = clock.currentTimeMillis(); + while (true) { + try { + allowingInterrupt.set(true); + Socket socket = serverSocket.accept(); + long firstContactTime = clock.currentTimeMillis(); + requestIo = new RequestIo(socket, firstContactTime); + break; + } catch (SocketTimeoutException e) { + long idleTime = clock.currentTimeMillis() - startTime; + if (lameDuck) { + closeServerSocket(); + return; + } else if (idleTime > maxIdleMillis + || (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) { + enterLameDuck(); + } + } + } + idleChecker.busy(); + + + List request = null; + try { + request = extractRequest(requestIo); + cmdNum.incrementAndGet(); + inAction.set(true); + if (request != null) { + executeRequest(request, requestIo); + } + } finally { + inAction.set(false); + // Don't reset interruption unless we executed a request. Otherwise this is just a + // ping from the client verifying our existence, in which case we should retain the + // interrupt status for the subsequent request. + if (request != null) { + synchronized (interruptLock) { + allowingInterrupt.set(false); + Thread.interrupted(); // clears thread interrupted status + } + } + requestIo.shutdown(); + if (rpcService.isShutdown()) { + return; + } + } + } catch (EOFException e) { + LOG.info("Connection to the client lost: " + + e.getMessage()); + } catch (IOException e) { + // Something else happened. Print a stack trace for debugging. + printStack(e); + } + } + } finally { + rpcService.shutdown(); + LOG.info("Logging finished"); + sigintHandler.uninstall(); + } + } + + private void closeServerSocket() { + LOG.info("Closing serverSocket."); + try { + serverSocket.close(); + } catch (IOException e) { + printStack(e); + } + + if (!lameDuck) { + try { + getSocketPath().delete(); + } catch (IOException e) { + printStack(e); + } + } + } + + /** + * Allow one last request to be serviced. + */ + private void enterLameDuck() { + lameDuck = true; + try { + getSocketPath().delete(); + } catch (IOException e) { + e.printStackTrace(); + } + serverSocket.setSoTimeout(1); + } + + /** + * Returns the path of the socket file to be used. + */ + public Path getSocketPath() { + return serverDirectory.getRelative("server.socket"); + } + + /** + * Ensures no other server is running for the current socket file. This + * guarantees that no two servers are running against the same output + * directory. + * + * @throws IOException if another server holds the lock for the socket file. + */ + public static void ensureExclusiveAccess(Path socketFile) throws IOException { + LocalSocketAddress address = + new LocalSocketAddress(socketFile.getPathFile()); + if (socketFile.exists()) { + try { + new LocalClientSocket(address).close(); + } catch (IOException e) { + // The previous server process is dead--unlink the file: + socketFile.delete(); + return; + } + // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message + // and add it to the message. + throw new IOException("Socket file " + socketFile.getPathString() + + " is locked by another server"); + } + } + + /** + * Opens a UNIX local server socket. + * @throws IOException if the socket file is used by another server or can + * not be made exclusive. + */ + private LocalServerSocket openServerSocket() throws IOException { + // This is the "well known" socket path via which the server is found... + Path socketFile = getSocketPath(); + + // ...but it may have a name that's too long for AF_UNIX, in which case we + // make it a symlink to /tmp/something. This typically only happens in + // tests where the --output_base is beneath a very deep temp dir. + // (All this extra complexity is just used in tests... *sigh*). + if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX + Path socketLink = socketFile; + String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", "/tmp"); + socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)). + getRelative("server.socket"); + LOG.info("Using symlinked socket at " + socketFile); + + socketLink.delete(); // Remove stale symlink, if any. + socketLink.createSymbolicLink(socketFile); + + deleteAtExit(socketLink, /*deleteParent=*/false); + deleteAtExit(socketFile, /*deleteParent=*/true); + } else { + deleteAtExit(socketFile, /*deleteParent=*/false); + } + + ensureExclusiveAccess(socketFile); + + + LocalServerSocket serverSocket = new LocalServerSocket(); + serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile())); + NativePosixFiles.chmod(socketFile.getPathFile(), 0600); // Lock it down. + serverSocket.listen(/*backlog=*/50); + return serverSocket; + } + + // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a + // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it + // succeeds. + private static Path createTempSocketDirectory(Path tempDir) { + Random random = new Random(); + while (true) { + Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt())); + try { + if (socketDir.createDirectory()) { + // Make sure it's private; unfortunately, createDirectory() doesn't take a mode + // argument. + socketDir.chmod(0700); + return socketDir; // Created. + } + // Already existed; try again. + } catch (IOException e) { + // Failed; try again. + } + } + } + + /** + * Read a string in platform default encoding and split it into a list of + * NUL-separated words. + * + *

Blaze consistently uses the platform default encoding (defined in + * blaze.cc) to interface with Unix APIs. + */ + private static List readRequest(InputStream input) throws IOException { + byte[] sizeBuffer = new byte[4]; + ByteStreams.readFully(input, sizeBuffer); + int size = ((sizeBuffer[0] & 0xff) << 24) + + ((sizeBuffer[1] & 0xff) << 16) + + ((sizeBuffer[2] & 0xff) << 8) + + (sizeBuffer[3] & 0xff); + byte[] inputBytes = new byte[size]; + ByteStreams.readFully(input, inputBytes); + + String s = new String(inputBytes, Charset.defaultCharset()); + return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s)); + } + + private static List extractRequest(RequestIo requestIo) throws IOException { + List request = readRequest(requestIo.in); + if (request == null) { + LOG.info("Short-circuiting empty request"); + return null; + } + return request; + } + + private void executeRequest(List request, RequestIo requestIo) { + Preconditions.checkNotNull(request); + int exitStatus = 2; + try { + exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr, + requestIo.firstContactTime); + LOG.info("Finished executing request"); + } catch (UnknownCommandException e) { + requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage()); + LOG.severe("SERVER ERROR: " + e.getMessage()); + } catch (Exception e) { + // Stacktrace for unknown exception. + StringWriter trace = new StringWriter(); + e.printStackTrace(new PrintWriter(trace, true)); + requestIo.requestOutErr.printErr("SERVER ERROR: " + trace); + LOG.severe("SERVER ERROR: " + trace); + } + + if (rpcService.isShutdown()) { + // In case of shutdown, disable the listening socket *before* we write + // the last part of the response. Otherwise, a sufficiently fast client + // could read the response and exit, and a new client could make a + // connection to this server, which is still in the listening state, even + // though it is about to shut down imminently. + closeServerSocket(); + } + + requestIo.writeExitStatus(exitStatus); + } + + /** + * Because it's a little complicated, this class factors out all the IO Hook + * up we need per request, that is, in + * {@link AfUnixServer#executeRequest(List, RequestIo)}. + * It's unfortunately complicated, so it's explained here. + */ + private static class RequestIo { + + // Used by the client code + private final InputStream in; + private final OutErr requestOutErr; + private final OutputStream controlChannel; + + // just used by this class to keep the state around + private final Socket requestSocket; + private final OutputStream requestOut; + private final long firstContactTime; + + RequestIo(Socket requestSocket, long firstContactTime) throws IOException { + this.requestSocket = requestSocket; + this.firstContactTime = firstContactTime; + this.in = requestSocket.getInputStream(); + this.requestOut = requestSocket.getOutputStream(); + + // We encode the response sent to the client with a multiplexer so + // we can send three streams (out / err / control) over one wire stream + // (requestOut). + StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut); + + // We'll be writing control messages (exit code + out of date message) + // to this control channel. + controlChannel = multiplexer.createControl(); + + // This is the outErr part of the multiplexed output. + requestOutErr = OutErr.create(multiplexer.createStdout(), + multiplexer.createStderr()); + // We hook up System.out / System.err to our IO object. Stuff written to + // System.out / System.err will show up on the user's screen, prefixed + // with "System.out "/"System.err ". + requestOutErr.addSystemOutErrAsSource(); + } + + public void writeExitStatus(int exitStatus) { + // Make sure to flush the output / error streams prior to writing the exit status. + // The client may stop reading that direction of the socket immediately upon reading the + // exit code. + flushOutErr(); + try { + controlChannel.write((exitStatus >> 24) & 0xff); + controlChannel.write((exitStatus >> 16) & 0xff); + controlChannel.write((exitStatus >> 8) & 0xff); + controlChannel.write(exitStatus & 0xff); + controlChannel.flush(); + LOG.info("" + exitStatus); + } catch (IOException ignored) { + // This exception is historically ignored. + } + } + + private void flushOutErr() { + try { + requestOutErr.getOutputStream().flush(); + } catch (IOException e) { + printStack(e); + } + try { + requestOutErr.getErrorStream().flush(); + } catch (IOException e) { + printStack(e); + } + } + + public void shutdown() { + try { + requestOut.close(); + } catch (IOException e) { + printStack(e); + } + try { + in.close(); + } catch (IOException e) { + printStack(e); + } + try { + requestSocket.close(); + } catch (IOException e) { + printStack(e); + } + } + } + + /** + * Creates and returns a new RPC server. + * Use {@link AfUnixServer#serve()} to start the server. + * + * @param appCommand The application's ServerCommand implementation. + * @param serverDirectory The directory for server-related files. The caller + * must ensure the directory has been created. + * @param workspaceDir The workspace, used solely to ensure it persists. + * @param maxIdleSeconds The idle time in seconds after which the rpc + * server will die unless it receives a request. + */ + public static AfUnixServer newServerWith(Clock clock, + ServerCommand appCommand, + Path serverDirectory, + Path workspaceDir, + int maxIdleSeconds) + throws IOException { + if (!serverDirectory.exists()) { + serverDirectory.createDirectory(); + } + + // Creates and starts the RPC server. + RPCService service = new RPCService(appCommand); + + return new AfUnixServer(clock, service, maxIdleSeconds * 1000L, + serverDirectory, workspaceDir); + } + +} diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java deleted file mode 100644 index d93d044b14..0000000000 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServer.java +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.server; - -import com.google.devtools.build.lib.runtime.CommandExecutor; -import com.google.devtools.build.lib.util.Clock; - -import java.io.IOException; - -/** - * Interface for the gRPC server. - * - *

This is necessary so that Bazel kind of works during bootstrapping, at which time the - * gRPC server is not compiled on so that we don't need gRPC for bootstrapping. - */ -public interface GrpcServer { - - /** - * Factory class. - * - * Present so that we don't need to invoke a constructor with multiple arguments by reflection. - */ - interface Factory { - GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port, - String outputBase); - } - - void serve() throws IOException; - void terminate(); -} diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 4349d83062..143589ce2c 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -23,21 +23,19 @@ import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.io.OutputStreamWriter; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; -import java.nio.charset.StandardCharsets; import java.security.SecureRandom; /** @@ -46,15 +44,15 @@ import java.security.SecureRandom; *

Only this class should depend on gRPC so that we only need to exclude this during * bootstrapping. */ -public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServer { +public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer { /** * Factory class. Instantiated by reflection. */ - public static class Factory implements GrpcServer.Factory { + public static class Factory implements RPCServer.Factory { @Override - public GrpcServer create(CommandExecutor commandExecutor, Clock clock, int port, - String outputBase) { - return new GrpcServerImpl(commandExecutor, clock, port, outputBase); + public RPCServer create(CommandExecutor commandExecutor, Clock clock, int port, + Path serverDirectory) throws IOException { + return new GrpcServerImpl(commandExecutor, clock, port, serverDirectory); } } @@ -96,14 +94,14 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ } } - // These paths are all relative to the output base - private static final String PORT_FILE = "server/grpc_port"; - private static final String REQUEST_COOKIE_FILE = "server/request_cookie"; - private static final String RESPONSE_COOKIE_FILE = "server/response_cookie"; + // These paths are all relative to the server directory + private static final String PORT_FILE = "grpc_port"; + private static final String REQUEST_COOKIE_FILE = "request_cookie"; + private static final String RESPONSE_COOKIE_FILE = "response_cookie"; private final CommandExecutor commandExecutor; private final Clock clock; - private final String outputBase; + private final Path serverDirectory; private final String requestCookie; private final String responseCookie; @@ -112,10 +110,11 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ boolean serving; public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, - String outputBase) { + Path serverDirectory) throws IOException { + super(serverDirectory); this.commandExecutor = commandExecutor; this.clock = clock; - this.outputBase = outputBase; + this.serverDirectory = serverDirectory; this.port = port; this.serving = false; @@ -135,6 +134,7 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ return result.toString(); } + @Override public void serve() throws IOException { Preconditions.checkState(!serving); server = ServerBuilder.forPort(port) @@ -148,17 +148,22 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ port = getActualServerPort(); } - writeFile(PORT_FILE, Integer.toString(port)); - writeFile(REQUEST_COOKIE_FILE, requestCookie); - writeFile(RESPONSE_COOKIE_FILE, responseCookie); + writeServerFile(PORT_FILE, Integer.toString(port)); + writeServerFile(REQUEST_COOKIE_FILE, requestCookie); + writeServerFile(RESPONSE_COOKIE_FILE, responseCookie); + try { + server.awaitTermination(); + } catch (InterruptedException e) { + // TODO(lberki): Handle SIGINT in a reasonable way + throw new IllegalStateException(e); + } } - private void writeFile(String path, String contents) throws IOException { - OutputStreamWriter writer = new OutputStreamWriter( - new FileOutputStream(new File(outputBase + "/" + path)), StandardCharsets.UTF_8); - writer.write(contents); - writer.close(); + private void writeServerFile(String name, String contents) throws IOException { + Path file = serverDirectory.getChild(name); + FileSystemUtils.writeContentAsLatin1(file, contents); + deleteAtExit(file, false); } /** @@ -197,28 +202,6 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ return instance; } - public void terminate() { - server.shutdownNow(); - // This is Uninterruptibles#callUninterruptibly. Calling that method properly is about the same - // amount of code as implementing it ourselves. - boolean interrupted = false; - try { - while (true) { - try { - server.awaitTermination(); - serving = false; - return; - } catch (InterruptedException e) { - interrupted = true; - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - @Override public void run( RunRequest request, StreamObserver observer) { @@ -245,6 +228,10 @@ public class GrpcServerImpl implements CommandServerGrpc.CommandServer, GrpcServ observer.onNext(response); observer.onCompleted(); + + if (commandExecutor.shutdown()) { + server.shutdownNow(); + } } @Override diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java index ab07009d62..fc6b83ffc2 100644 --- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -11,325 +11,53 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package com.google.devtools.build.lib.server; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.server.RPCService.UnknownCommandException; -import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; -import com.google.devtools.build.lib.unix.LocalClientSocket; -import com.google.devtools.build.lib.unix.LocalServerSocket; -import com.google.devtools.build.lib.unix.LocalSocketAddress; -import com.google.devtools.build.lib.unix.NativePosixFiles; +import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.util.OsUtils; -import com.google.devtools.build.lib.util.ThreadUtils; -import com.google.devtools.build.lib.util.io.OutErr; -import com.google.devtools.build.lib.util.io.StreamMultiplexer; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; -import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.io.PrintWriter; import java.io.StringWriter; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; /** - * An RPCServer server is a Java object that sits and waits for RPC requests - * (the sit-and-wait is implemented in {@link #serve()}). These requests - * arrive via UNIX file sockets. The RPCServer then calls the application - * (which implements ServerCommand) to handle the request. (Since the Blaze - * server may need to stat hundreds of directories during initialization, this - * is a significant speedup.) The server thread will terminate after idling - * for a user-specified time. - * - * Note: If you are contemplating to call into the RPCServer from - * within Java, consider using the {@link RPCService} class instead. + * A server instance. Can either an AF_UNIX or a gRPC one. */ -// TODO(bazel-team): Signal handling. -// TODO(bazel-team): Gives clients status information when the server is busy. One -// way to do this is to put the server status in a file (pid, the current -// target, etc) in the server directory. Alternatively, we can have a separate -// thread taking care of the server socket and put the information into socket -// handshakes. -// TODO(bazel-team): Use Reporter for server-side messages. -public final class RPCServer { - - private final Clock clock; - private final RPCService rpcService; - private final LocalServerSocket serverSocket; - private final long maxIdleMillis; - private final long statusCheckMillis; - private final Path serverDirectory; - private final Path workspaceDir; +public abstract class RPCServer { private static final Logger LOG = Logger.getLogger(RPCServer.class.getName()); - private volatile boolean lameDuck; - - private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute. - private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0'); - - /** - * Create a new server instance. After creating the server, you can start it - * by calling the {@link #serve()} method. - * - * @param clock The clock to take time measurements - * @param rpcService The underlying service object, which takes - * care of dispatching to the {@link ServerCommand} - * instances, as requests arrive. - * @param maxIdleMillis The maximum time the server will wait idly. - * @param statusCheckPeriodMillis How long to wait between system status checks. - * @param serverDirectory Directory to put file socket and pid files, etc. - * @param workspaceDir The workspace. Used solely to ensure it persists. - * @throws IOException - */ - public RPCServer(Clock clock, RPCService rpcService, - long maxIdleMillis, long statusCheckPeriodMillis, - Path serverDirectory, Path workspaceDir) - throws IOException { - this.clock = clock; - this.rpcService = rpcService; - this.maxIdleMillis = maxIdleMillis; - this.statusCheckMillis = statusCheckPeriodMillis; - this.serverDirectory = serverDirectory; - this.workspaceDir = workspaceDir; - - this.serverSocket = openServerSocket(); - serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis)); - lameDuck = false; - } /** - * Create a new server instance. After creating the server, you can start it - * by calling the {@link #serve()} method. + * Factory class for the gRPC server. * - * @param clock The clock to take time measurements - * @param rpcService The underlying service object, which takes - * care of dispatching to the {@link ServerCommand} - * instances, as requests arrive. - * @param maxIdleMillis The maximum time the server will wait idly. - * @param serverDirectory Directory to put file socket and pid files, etc. - * @param workspaceDir The workspace. Used solely to ensure it persists. - * @throws IOException - */ - public RPCServer(Clock clock, RPCService rpcService, - long maxIdleMillis, Path serverDirectory, Path workspaceDir) - throws IOException { - this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS, - serverDirectory, workspaceDir); - } - - private static void printStack(IOException e) { - /* - * Hopefully this never happens. It's not very nice to just write this - * to the user's console, but I'm not sure what better choice we have. - */ - StringWriter err = new StringWriter(); - PrintWriter printErr = new PrintWriter(err); - printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]======="); - e.printStackTrace(printErr); - printErr.println("====================================================="); - LOG.severe(err.toString()); - } - - /** - * Wait on a socket for business (answer requests). Note that this - * method won't return until the server shuts down. + * Present so that we don't need to invoke a constructor with multiple arguments by reflection. */ - public void serve() { - // Register the signal handler. - final AtomicBoolean inAction = new AtomicBoolean(false); - final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); - final AtomicLong cmdNum = new AtomicLong(); - final Thread mainThread = Thread.currentThread(); - final Object interruptLock = new Object(); - - InterruptSignalHandler sigintHandler = - new InterruptSignalHandler() { - @Override - protected void onSignal() { - LOG.severe("User interrupt"); - - // Only interrupt during actions - otherwise we may end up setting the interrupt bit - // at the end of a build and responding to it at the beginning of the subsequent build. - synchronized (interruptLock) { - if (allowingInterrupt.get()) { - mainThread.interrupt(); - } - } - - if (inAction.get()) { - Runnable interruptWatcher = - new Runnable() { - @Override - public void run() { - try { - long originalCmd = cmdNum.get(); - Thread.sleep(10 * 1000); - if (inAction.get() && cmdNum.get() == originalCmd) { - // We're still operating on the same command. - // Interrupt took too long. - ThreadUtils.warnAboutSlowInterrupt(); - } - } catch (InterruptedException e) { - // Ignore. - } - } - }; - Thread interruptWatcherThread = - new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); - interruptWatcherThread.setDaemon(true); - interruptWatcherThread.start(); - } - } - }; - - try { - while (!lameDuck) { - try { - IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir); - idleChecker.idle(); - RequestIo requestIo; - - long startTime = clock.currentTimeMillis(); - while (true) { - try { - allowingInterrupt.set(true); - Socket socket = serverSocket.accept(); - long firstContactTime = clock.currentTimeMillis(); - requestIo = new RequestIo(socket, firstContactTime); - break; - } catch (SocketTimeoutException e) { - long idleTime = clock.currentTimeMillis() - startTime; - if (lameDuck) { - closeServerSocket(); - return; - } else if (idleTime > maxIdleMillis || - (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) { - enterLameDuck(); - } - } - } - idleChecker.busy(); - - - List request = null; - try { - request = extractRequest(requestIo); - cmdNum.incrementAndGet(); - inAction.set(true); - if (request != null) { - executeRequest(request, requestIo); - } - } finally { - inAction.set(false); - // Don't reset interruption unless we executed a request. Otherwise this is just a - // ping from the client verifying our existence, in which case we should retain the - // interrupt status for the subsequent request. - if (request != null) { - synchronized (interruptLock) { - allowingInterrupt.set(false); - Thread.interrupted(); // clears thread interrupted status - } - } - requestIo.shutdown(); - if (rpcService.isShutdown()) { - return; - } - } - } catch (EOFException e) { - LOG.info("Connection to the client lost: " - + e.getMessage()); - } catch (IOException e) { - // Something else happened. Print a stack trace for debugging. - printStack(e); - } - } - } finally { - rpcService.shutdown(); - LOG.info("Logging finished"); - sigintHandler.uninstall(); - } + public interface Factory { + RPCServer create(CommandExecutor commandExecutor, Clock clock, int port, Path serverDirectory) + throws IOException; } - private void closeServerSocket() { - LOG.info("Closing serverSocket."); - try { - serverSocket.close(); - } catch (IOException e) { - printStack(e); - } - - if (!lameDuck) { - try { - getSocketPath().delete(); - } catch (IOException e) { - printStack(e); - } - } - } - - /** - * Allow one last request to be serviced. - */ - private void enterLameDuck() { - lameDuck = true; + protected RPCServer(Path serverDirectory) throws IOException { + // We create the server.pid file strictly before binding the socket. + // The client only accesses the pid file after connecting to the socket + // which ensures that it gets the correct pid value. + Path pidFile = serverDirectory.getRelative("server.pid"); + RPCServer.deleteAtExit(pidFile, /*deleteParent=*/ false); try { - getSocketPath().delete(); + pidFile.delete(); } catch (IOException e) { - e.printStackTrace(); - } - serverSocket.setSoTimeout(1); - } - - /** - * Returns the path of the socket file to be used. - */ - public Path getSocketPath() { - return serverDirectory.getRelative("server.socket"); - } - - /** - * Ensures no other server is running for the current socket file. This - * guarantees that no two servers are running against the same output - * directory. - * - * @throws IOException if another server holds the lock for the socket file. - */ - public static void ensureExclusiveAccess(Path socketFile) throws IOException { - LocalSocketAddress address = - new LocalSocketAddress(socketFile.getPathFile()); - if (socketFile.exists()) { - try { - new LocalClientSocket(address).close(); - } catch (IOException e) { - // The previous server process is dead--unlink the file: - socketFile.delete(); - return; - } - // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message - // and add it to the message. - throw new IOException("Socket file " + socketFile.getPathString() - + " is locked by another server"); + // Ignore. } + pidFile.createSymbolicLink(new PathFragment(String.valueOf(OsUtils.getpid()))); } /** * Schedule the specified file for (attempted) deletion at JVM exit. */ - private static void deleteAtExit(final Path socketFile, final boolean deleteParent) { + protected static void deleteAtExit(final Path socketFile, final boolean deleteParent) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { @@ -345,254 +73,21 @@ public final class RPCServer { }); } - /** - * Opens a UNIX local server socket. - * @throws IOException if the socket file is used by another server or can - * not be made exclusive. - */ - private LocalServerSocket openServerSocket() throws IOException { - // This is the "well known" socket path via which the server is found... - Path socketFile = getSocketPath(); - - // ...but it may have a name that's too long for AF_UNIX, in which case we - // make it a symlink to /tmp/something. This typically only happens in - // tests where the --output_base is beneath a very deep temp dir. - // (All this extra complexity is just used in tests... *sigh*). - if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX - Path socketLink = socketFile; - String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", "/tmp"); - socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)). - getRelative("server.socket"); - LOG.info("Using symlinked socket at " + socketFile); - - socketLink.delete(); // Remove stale symlink, if any. - socketLink.createSymbolicLink(socketFile); - - deleteAtExit(socketLink, /*deleteParent=*/false); - deleteAtExit(socketFile, /*deleteParent=*/true); - } else { - deleteAtExit(socketFile, /*deleteParent=*/false); - } - - ensureExclusiveAccess(socketFile); - - // We create the server.pid file strictly before binding the socket. - // The client only accesses the pid file after connecting to the socket - // which ensures that it gets the correct pid value. - Path pidFile = serverDirectory.getRelative("server.pid"); - deleteAtExit(pidFile, /*deleteParent=*/ false); - try { - pidFile.delete(); - } catch (IOException e) { - // Ignore. - } - pidFile.createSymbolicLink(new PathFragment(String.valueOf(OsUtils.getpid()))); - - LocalServerSocket serverSocket = new LocalServerSocket(); - serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile())); - NativePosixFiles.chmod(socketFile.getPathFile(), 0600); // Lock it down. - serverSocket.listen(/*backlog=*/50); - return serverSocket; - } - - // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a - // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it - // succeeds. - private static Path createTempSocketDirectory(Path tempDir) { - Random random = new Random(); - while (true) { - Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt())); - try { - if (socketDir.createDirectory()) { - // Make sure it's private; unfortunately, createDirectory() doesn't take a mode - // argument. - socketDir.chmod(0700); - return socketDir; // Created. - } - // Already existed; try again. - } catch (IOException e) { - // Failed; try again. - } - } - } - - /** - * Read a string in platform default encoding and split it into a list of - * NUL-separated words. - * - *

Blaze consistently uses the platform default encoding (defined in - * blaze.cc) to interface with Unix APIs. - */ - private static List readRequest(InputStream input) throws IOException { - byte[] sizeBuffer = new byte[4]; - ByteStreams.readFully(input, sizeBuffer); - int size = ((sizeBuffer[0] & 0xff) << 24) - + ((sizeBuffer[1] & 0xff) << 16) - + ((sizeBuffer[2] & 0xff) << 8) - + (sizeBuffer[3] & 0xff); - byte[] inputBytes = new byte[size]; - ByteStreams.readFully(input, inputBytes); - - String s = new String(inputBytes, Charset.defaultCharset()); - return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s)); - } - - private static List extractRequest(RequestIo requestIo) throws IOException { - List request = readRequest(requestIo.in); - if (request == null) { - LOG.info("Short-circuiting empty request"); - return null; - } - return request; - } - - private void executeRequest(List request, RequestIo requestIo) { - Preconditions.checkNotNull(request); - int exitStatus = 2; - try { - exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr, - requestIo.firstContactTime); - LOG.info("Finished executing request"); - } catch (UnknownCommandException e) { - requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage()); - LOG.severe("SERVER ERROR: " + e.getMessage()); - } catch (Exception e) { - // Stacktrace for unknown exception. - StringWriter trace = new StringWriter(); - e.printStackTrace(new PrintWriter(trace, true)); - requestIo.requestOutErr.printErr("SERVER ERROR: " + trace); - LOG.severe("SERVER ERROR: " + trace); - } - - if (rpcService.isShutdown()) { - // In case of shutdown, disable the listening socket *before* we write - // the last part of the response. Otherwise, a sufficiently fast client - // could read the response and exit, and a new client could make a - // connection to this server, which is still in the listening state, even - // though it is about to shut down imminently. - closeServerSocket(); - } - - requestIo.writeExitStatus(exitStatus); - } - - /** - * Because it's a little complicated, this class factors out all the IO Hook - * up we need per request, that is, in - * {@link RPCServer#executeRequest(List, RequestIo)}. - * It's unfortunately complicated, so it's explained here. - */ - private static class RequestIo { - - // Used by the client code - private final InputStream in; - private final OutErr requestOutErr; - private final OutputStream controlChannel; - - // just used by this class to keep the state around - private final Socket requestSocket; - private final OutputStream requestOut; - private final long firstContactTime; - - RequestIo(Socket requestSocket, long firstContactTime) throws IOException { - this.requestSocket = requestSocket; - this.firstContactTime = firstContactTime; - this.in = requestSocket.getInputStream(); - this.requestOut = requestSocket.getOutputStream(); - - // We encode the response sent to the client with a multiplexer so - // we can send three streams (out / err / control) over one wire stream - // (requestOut). - StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut); - - // We'll be writing control messages (exit code + out of date message) - // to this control channel. - controlChannel = multiplexer.createControl(); - - // This is the outErr part of the multiplexed output. - requestOutErr = OutErr.create(multiplexer.createStdout(), - multiplexer.createStderr()); - // We hook up System.out / System.err to our IO object. Stuff written to - // System.out / System.err will show up on the user's screen, prefixed - // with "System.out "/"System.err ". - requestOutErr.addSystemOutErrAsSource(); - } - - public void writeExitStatus(int exitStatus) { - // Make sure to flush the output / error streams prior to writing the exit status. - // The client may stop reading that direction of the socket immediately upon reading the - // exit code. - flushOutErr(); - try { - controlChannel.write((exitStatus >> 24) & 0xff); - controlChannel.write((exitStatus >> 16) & 0xff); - controlChannel.write((exitStatus >> 8) & 0xff); - controlChannel.write(exitStatus & 0xff); - controlChannel.flush(); - LOG.info("" + exitStatus); - } catch (IOException ignored) { - // This exception is historically ignored. - } - } - - private void flushOutErr() { - try { - requestOutErr.getOutputStream().flush(); - } catch (IOException e) { - printStack(e); - } - try { - requestOutErr.getErrorStream().flush(); - } catch (IOException e) { - printStack(e); - } - } - - public void shutdown() { - try { - requestOut.close(); - } catch (IOException e) { - printStack(e); - } - try { - in.close(); - } catch (IOException e) { - printStack(e); - } - try { - requestSocket.close(); - } catch (IOException e) { - printStack(e); - } - } + static void printStack(IOException e) { + /* + * Hopefully this never happens. It's not very nice to just write this + * to the user's console, but I'm not sure what better choice we have. + */ + StringWriter err = new StringWriter(); + PrintWriter printErr = new PrintWriter(err); + printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]======="); + e.printStackTrace(printErr); + printErr.println("====================================================="); + LOG.severe(err.toString()); } /** - * Creates and returns a new RPC server. - * Use {@link RPCServer#serve()} to start the server. - * - * @param appCommand The application's ServerCommand implementation. - * @param serverDirectory The directory for server-related files. The caller - * must ensure the directory has been created. - * @param workspaceDir The workspace, used solely to ensure it persists. - * @param maxIdleSeconds The idle time in seconds after which the rpc - * server will die unless it receives a request. + * Start serving and block until the a shutdown command is received. */ - public static RPCServer newServerWith(Clock clock, - ServerCommand appCommand, - Path serverDirectory, - Path workspaceDir, - int maxIdleSeconds) - throws IOException { - if (!serverDirectory.exists()) { - serverDirectory.createDirectory(); - } - - // Creates and starts the RPC server. - RPCService service = new RPCService(appCommand); - - return new RPCServer(clock, service, maxIdleSeconds * 1000L, - serverDirectory, workspaceDir); - } - + public abstract void serve() throws IOException; } diff --git a/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java new file mode 100644 index 0000000000..9e9c7af3e9 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java @@ -0,0 +1,132 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.devtools.build.lib.testutil.Suite; +import com.google.devtools.build.lib.testutil.TestSpec; +import com.google.devtools.build.lib.util.JavaClock; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.util.io.RecordingOutErr; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.util.FsApparatus; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.File; +import java.util.List; + +/** + * Run a real RPC server on localhost, and talk to it using the testing + * client. + */ +@TestSpec(size = Suite.MEDIUM_TESTS) +@RunWith(JUnit4.class) +public class AfUnixServerTest { + + private static final long MAX_IDLE_MILLIS = 10000; + private static final long HEALTH_CHECK_MILLIS = 1000 * 3; + private static final String COMMAND_STDOUT = "Heelllloo...."; + private static final String COMMAND_STDERR = "...world!"; + + private AfUnixServer server; + private FsApparatus scratch = FsApparatus.newNative(); + private RecordingOutErr outErr = new RecordingOutErr(); + private Path serverDir; + private Path workspaceDir; + private RPCTestingClient client; + private Thread serverThread = new Thread(){ + @Override + public void run() { + server.serve(); + } + }; + + private static final ServerCommand helloWorldCommand = new ServerCommand() { + @Override + public int exec(List args, OutErr outErr, long firstContactTime) throws Exception { + outErr.printOut(COMMAND_STDOUT); + outErr.printErr(COMMAND_STDERR); + return 42; + } + @Override + public boolean shutdown() { + return false; + } + }; + + @Before + public final void startServer() throws Exception { + // Do not use `createUnixTempDir()` here since the file name that results is longer + // than 108 characters, so cannot be used as local socket address. + File file = File.createTempFile("scratch", ".tmp", new File("/tmp")); + file.delete(); + file.mkdir(); + serverDir = this.scratch.dir(file.getAbsolutePath()); + + workspaceDir = this.scratch.createUnixTempDir(); + workspaceDir.createDirectory(); + client = new RPCTestingClient( + outErr, serverDir.getRelative("server.socket")); + RPCService service = new RPCService(helloWorldCommand); + server = new AfUnixServer(new JavaClock(), service, MAX_IDLE_MILLIS, HEALTH_CHECK_MILLIS, + serverDir, workspaceDir); + serverThread.start(); + } + + @After + public final void stopServer() throws Exception { + serverThread.interrupt(); + serverThread.join(); + + FileSystemUtils.deleteTree(serverDir); + } + + private void runTestRequest(String request, int ret, String out, String err) throws Exception { + assertEquals(ret, client.sendRequest(request)); + assertEquals(out, outErr.outAsLatin1()); + assertThat(outErr.errAsLatin1()).contains(err); + } + + @Test + public void testUnknownCommand() throws Exception { + runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); + } + + @Test + public void testEmptyBlazeCommand() throws Exception { + runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); + } + + @Test + public void testWorkspaceDies() throws Exception { + assertTrue(serverThread.isAlive()); + runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR); + Thread.sleep(HEALTH_CHECK_MILLIS * 2); + assertTrue(serverThread.isAlive()); + + assertTrue(workspaceDir.delete()); + Thread.sleep(HEALTH_CHECK_MILLIS * 2); + assertFalse(serverThread.isAlive()); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java b/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java deleted file mode 100644 index f81a0384f3..0000000000 --- a/src/test/java/com/google/devtools/build/lib/server/RPCServerTest.java +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2015 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.server; -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import com.google.devtools.build.lib.testutil.Suite; -import com.google.devtools.build.lib.testutil.TestSpec; -import com.google.devtools.build.lib.util.JavaClock; -import com.google.devtools.build.lib.util.io.OutErr; -import com.google.devtools.build.lib.util.io.RecordingOutErr; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.build.lib.vfs.util.FsApparatus; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; -import java.util.List; - -/** - * Run a real RPC server on localhost, and talk to it using the testing - * client. - */ -@TestSpec(size = Suite.MEDIUM_TESTS) -@RunWith(JUnit4.class) -public class RPCServerTest { - - private static final long MAX_IDLE_MILLIS = 10000; - private static final long HEALTH_CHECK_MILLIS = 1000 * 3; - private static final String COMMAND_STDOUT = "Heelllloo...."; - private static final String COMMAND_STDERR = "...world!"; - - private RPCServer server; - private FsApparatus scratch = FsApparatus.newNative(); - private RecordingOutErr outErr = new RecordingOutErr(); - private Path serverDir; - private Path workspaceDir; - private RPCTestingClient client; - private Thread serverThread = new Thread(){ - @Override - public void run() { - server.serve(); - } - }; - - private static final ServerCommand helloWorldCommand = new ServerCommand() { - @Override - public int exec(List args, OutErr outErr, long firstContactTime) throws Exception { - outErr.printOut(COMMAND_STDOUT); - outErr.printErr(COMMAND_STDERR); - return 42; - } - @Override - public boolean shutdown() { - return false; - } - }; - - @Before - public final void startServer() throws Exception { - // Do not use `createUnixTempDir()` here since the file name that results is longer - // than 108 characters, so cannot be used as local socket address. - File file = File.createTempFile("scratch", ".tmp", new File("/tmp")); - file.delete(); - file.mkdir(); - serverDir = this.scratch.dir(file.getAbsolutePath()); - - workspaceDir = this.scratch.createUnixTempDir(); - workspaceDir.createDirectory(); - client = new RPCTestingClient( - outErr, serverDir.getRelative("server.socket")); - RPCService service = new RPCService(helloWorldCommand); - server = new RPCServer(new JavaClock(), service, MAX_IDLE_MILLIS, HEALTH_CHECK_MILLIS, - serverDir, workspaceDir); - serverThread.start(); - } - - @After - public final void stopServer() throws Exception { - serverThread.interrupt(); - serverThread.join(); - - FileSystemUtils.deleteTree(serverDir); - } - - private void runTestRequest(String request, int ret, String out, String err) throws Exception { - assertEquals(ret, client.sendRequest(request)); - assertEquals(out, outErr.outAsLatin1()); - assertThat(outErr.errAsLatin1()).contains(err); - } - - @Test - public void testUnknownCommand() throws Exception { - runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); - } - - @Test - public void testEmptyBlazeCommand() throws Exception { - runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); - } - - @Test - public void testWorkspaceDies() throws Exception { - assertTrue(serverThread.isAlive()); - runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR); - Thread.sleep(HEALTH_CHECK_MILLIS * 2); - assertTrue(serverThread.isAlive()); - - assertTrue(workspaceDir.delete()); - Thread.sleep(HEALTH_CHECK_MILLIS * 2); - assertFalse(serverThread.isAlive()); - } -} -- cgit v1.2.3