diff options
15 files changed, 73 insertions, 2220 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index 57824ace54..c14206d0b1 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc @@ -232,22 +232,6 @@ uint64_t BlazeServer::AcquireLock() { globals->options->block_for_lock, &blaze_lock_); } -// Communication method that uses an AF_UNIX socket and a custom protocol. -class AfUnixBlazeServer : public BlazeServer { - public: - AfUnixBlazeServer(); - virtual ~AfUnixBlazeServer() {} - - virtual bool Connect(); - virtual void Disconnect(); - virtual unsigned int Communicate(); - virtual void KillRunningServer(); - virtual void Cancel(); - - private: - int server_socket_; -}; - // Communication method that uses gRPC on a socket bound to localhost. More // documentation is in command_server.proto . class GrpcBlazeServer : public BlazeServer { @@ -708,254 +692,6 @@ static void StartStandalone(BlazeServer* server) { pdie(blaze_exit_code::INTERNAL_ERROR, "execv of '%s' failed", exe.c_str()); } -AfUnixBlazeServer::AfUnixBlazeServer() { - server_socket_ = -1; - connected_ = false; -} - -bool AfUnixBlazeServer::Connect() { - assert(!connected_); - - if (server_socket_ == -1) { - server_socket_ = socket(PF_UNIX, SOCK_STREAM, 0); - if (server_socket_ == -1) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "can't create AF_UNIX socket"); - } - - if (fcntl(server_socket_, F_SETFD, FD_CLOEXEC) == -1) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "fcntl(F_SETFD, FD_CLOEXEC) failed"); - } - } - - struct sockaddr_un addr; - addr.sun_family = AF_UNIX; - - string socket_file = globals->options->output_base + "/server/server.socket"; - char *resolved_path = realpath(socket_file.c_str(), NULL); - if (resolved_path != NULL) { - strncpy(addr.sun_path, resolved_path, sizeof addr.sun_path); - addr.sun_path[sizeof addr.sun_path - 1] = '\0'; - free(resolved_path); - sockaddr *paddr = reinterpret_cast<sockaddr *>(&addr); - int result = connect(server_socket_, paddr, sizeof addr); - connected_ = result == 0; - if (connected_) { - string server_dir = globals->options->output_base + "/server"; - globals->server_pid = GetServerPid(server_dir); - if (globals->server_pid <= 0) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "can't get PID of existing server (server dir=%s)", - server_dir.c_str()); - } - } - - return connected_; - } else if (errno == ENOENT) { // No socket means no server to connect to - errno = ECONNREFUSED; - return false; - } else { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "realpath('%s') failed", socket_file.c_str()); - return false; - } -} - -void AfUnixBlazeServer::Disconnect() { - assert(connected_); - - close(server_socket_); - connected_ = false; - server_socket_ = -1; -} - -static int ServerEof() { - // e.g. external SIGKILL of server, misplaced System.exit() in the server, - // or a JVM crash. Print out the jvm.out file in case there's something - // useful. - fprintf(stderr, "Error: unexpected EOF from %s server.\n" - "Contents of '%s':\n", globals->options->product_name.c_str(), - globals->jvm_log_file.c_str()); - WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str()); - return GetExitCodeForAbruptExit(*globals); -} - -// Reads a single char from the specified stream. -static int ReadServerChar(int fd, unsigned char *result) { - if (read(fd, result, 1) != 1) { - return ServerEof(); - } - return 0; -} - -static int ReadServerInt(int fd, unsigned int *result) { - unsigned char buffer[4]; - unsigned char *p = buffer; - int remaining = 4; - - while (remaining > 0) { - int bytes_read = read(fd, p, remaining); - if (bytes_read <= 0) { - return ServerEof(); - } - - remaining -= bytes_read; - p += bytes_read; - } - - *result = (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) - + buffer[3]; - return 0; -} - -static char server_output_buffer[8192]; - -// Forwards the output of the server to the specified file handle. -static int ForwardServerOutput(int socket, int output, bool* pipe_broken) { - unsigned int remaining; - int exit_code = ReadServerInt(socket, &remaining); - if (exit_code != 0) { - return exit_code; - } - while (remaining > 0) { - int bytes = remaining > 8192 ? 8192 : remaining; - bytes = read(socket, server_output_buffer, bytes); - if (bytes <= 0) { - return ServerEof(); - } - - remaining -= bytes; - if (write(output, server_output_buffer, bytes) < 0 && errno == EPIPE) { - *pipe_broken = true; - } - } - - return 0; -} - -unsigned int AfUnixBlazeServer::Communicate() { - assert(connected_); - - const string request = BuildServerRequest(); - - // Send request (Request is written in a single chunk.) - char request_size[4]; - request_size[0] = (request.size() >> 24) & 0xff; - request_size[1] = (request.size() >> 16) & 0xff; - request_size[2] = (request.size() >> 8) & 0xff; - request_size[3] = (request.size()) & 0xff; - if (write(server_socket_, request_size, 4) != 4) { - pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed"); - } - - if (write(server_socket_, request.data(), request.size()) != request.size()) { - pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed"); - } - - // Wait until we receive some response from the server. - // (We do this by calling select() with a timeout.) - // If we don't receive a response within 3 seconds, print a message, - // so that the user has some idea what is going on. - while (true) { - fd_set fdset; - FD_ZERO(&fdset); - FD_SET(server_socket_, &fdset); - struct timeval timeout; - timeout.tv_sec = 3; - timeout.tv_usec = 0; - int result = select(server_socket_ + 1, &fdset, NULL, &fdset, &timeout); - if (result > 0) { - // Data is ready on socket. Go ahead and read it. - break; - } else if (result == 0) { - // Timeout. Print a message, then go ahead and read from - // the socket (the read will usually block). - fprintf(stderr, - "INFO: Waiting for response from %s server (pid %d)...\n", - globals->options->product_name.c_str(), globals->server_pid); - break; - } else { // result < 0 - // Error. For EINTR we try again, all other errors are fatal. - if (errno != EINTR) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "select() on server socket failed"); - } - } - } - - // Read and demux the response. - const int TAG_STDOUT = 1; - const int TAG_STDERR = 2; - const int TAG_CONTROL = 3; - unsigned int exit_code; - bool pipe_broken = false; - for (;;) { - // Read the tag - unsigned char tag; - bool pipe_broken_now = false; - exit_code = ReadServerChar(server_socket_, &tag); - if (exit_code != 0) { - return exit_code; - } - - switch (tag) { - // stdout - case TAG_STDOUT: - exit_code = ForwardServerOutput(server_socket_, STDOUT_FILENO, - &pipe_broken_now); - if (exit_code != 0) { - return exit_code; - } - if (pipe_broken_now && !pipe_broken) { - pipe_broken = true; - Cancel(); - } - break; - - // stderr - case TAG_STDERR: - exit_code = ForwardServerOutput(server_socket_, STDERR_FILENO, - &pipe_broken_now); - if (exit_code != 0) { - return exit_code; - } - if (pipe_broken_now && !pipe_broken) { - pipe_broken = true; - Cancel(); - } - break; - - // Control stream. Currently only used for reporting the exit code. - case TAG_CONTROL: - unsigned int length; - exit_code = ReadServerInt(server_socket_, &length); - if (exit_code != 0) { - // We cannot read the length field. The return value of ReadSeverInt() - // is the result of ServerEof(), so we bail out early so that we don't - // call ServerEof() twice. - return exit_code; - } - - if (length != 4) { - return ServerEof(); - } - unsigned int server_exit_code; - exit_code = ReadServerInt(server_socket_, &server_exit_code); - return exit_code != 0 ? exit_code : server_exit_code; - - default: - fprintf(stderr, "bad tag %d\n", tag); - return ServerEof(); - } - } -} - -void AfUnixBlazeServer::Cancel() { - assert(connected_); - kill(globals->server_pid, SIGINT); -} - // Write the contents of file_name to stream. static void WriteFileToStreamOrDie(FILE *stream, const char *file_name) { FILE *fp = fopen(file_name, "r"); @@ -1090,40 +826,6 @@ static bool WaitForServerDeath(pid_t pid, int wait_time_secs) { return false; } -// Kills the specified running Blaze server. First we send a SIGTERM, and if -// that does not kill the process, a SIGKILL. -void AfUnixBlazeServer::KillRunningServer() { - assert(connected_); - assert(globals->server_pid > 0); - - close(server_socket_); - server_socket_ = -1; - fprintf(stderr, "Sending SIGTERM to previous %s server (pid=%d)... ", - globals->options->product_name.c_str(), globals->server_pid); - fflush(stderr); - kill(globals->server_pid, SIGTERM); - if (WaitForServerDeath(globals->server_pid, 10)) { - fprintf(stderr, "done.\n"); - connected_ = false; - return; - } - - // If the previous attempt did not suceeded, kill the whole group. - fprintf(stderr, - "Sending SIGKILL to previous %s server process group (pid=%d)... ", - globals->options->product_name.c_str(), globals->server_pid); - fflush(stderr); - killpg(globals->server_pid, SIGKILL); - if (WaitForServerDeath(globals->server_pid, 10)) { - fprintf(stderr, "killed.\n"); - connected_ = false; - return; - } - - // Process did not go away 10s after SIGKILL. Stuck in state 'Z' or 'D'? - pdie(blaze_exit_code::INTERNAL_ERROR, "SIGKILL unsuccessful after 10s"); -} - // Calls fsync() on the file (or directory) specified in 'file_path'. // pdie()'s if syncing fails. static void SyncFile(const char *file_path) { @@ -1821,13 +1523,6 @@ int Main(int argc, const char *argv[], OptionProcessor *option_processor) { CheckBinaryPath(argv[0]); ParseOptions(argc, argv); -#ifdef __CYGWIN__ - if (globals->options->command_port == -1) { - // AF_UNIX does not work on Windows, so use gRPC instead. - globals->options->command_port = 0; - } -#endif - string error; blaze_exit_code::ExitCode reexec_options_exit_code = globals->options->CheckForReExecuteOptions(argc, argv, &error); @@ -1840,9 +1535,7 @@ int Main(int argc, const char *argv[], OptionProcessor *option_processor) { const string self_path = GetSelfPath(); ComputeBaseDirectories(self_path); - blaze_server = globals->options->command_port >= 0 - ? static_cast<BlazeServer *>(new GrpcBlazeServer()) - : static_cast<BlazeServer *>(new AfUnixBlazeServer()); + blaze_server = static_cast<BlazeServer *>(new GrpcBlazeServer()); globals->command_wait_time = blaze_server->AcquireLock(); diff --git a/src/main/cpp/startup_options.cc b/src/main/cpp/startup_options.cc index 3f93b30aa2..85ec97cd09 100644 --- a/src/main/cpp/startup_options.cc +++ b/src/main/cpp/startup_options.cc @@ -221,10 +221,10 @@ blaze_exit_code::ExitCode StartupOptions::ProcessArg( } else if ((value = GetUnaryOption( arg, next_arg, "--command_port")) != NULL) { if (!blaze_util::safe_strto32(value, &command_port) || - command_port < -1 || command_port > 65535) { + command_port < 0 || command_port > 65535) { blaze_util::StringPrintf(error, - "Invalid argument to --command_port: '%s'. " - "Must be a valid port number or -1 to disable the gRPC server.\n", + "Invalid argument to --command_port: '%s'.\n" + "Must be a valid port number or 0.\n", value); return blaze_exit_code::BAD_ARGV; } diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java index e4fad28e57..27e2ecc6d6 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java @@ -50,7 +50,6 @@ import com.google.devtools.build.lib.rules.test.CoverageReportActionFactory; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; import com.google.devtools.build.lib.runtime.commands.InfoItem; import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy; -import com.google.devtools.build.lib.server.AfUnixServer; import com.google.devtools.build.lib.server.RPCServer; import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; import com.google.devtools.build.lib.shell.JavaSubprocessFactory; @@ -808,23 +807,17 @@ public final class BlazeRuntime { BlazeServerStartupOptions startupOptions = runtime.getStartupOptionsProvider().getOptions(BlazeServerStartupOptions.class); - if (startupOptions.commandPort != -1) { - try { - // This is necessary so that Bazel kind of works during bootstrapping, at which time the - // gRPC server is not compiled in so that we don't need gRPC for bootstrapping. - Class<?> factoryClass = Class.forName( - "com.google.devtools.build.lib.server.GrpcServerImpl$Factory"); - RPCServer.Factory factory = (RPCServer.Factory) factoryClass.getConstructor().newInstance(); - return factory.create(commandExecutor, runtime.getClock(), - startupOptions.commandPort, runtime.getServerDirectory(), - startupOptions.maxIdleSeconds); - } catch (ReflectiveOperationException | IllegalArgumentException e) { - throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR); - } - } else { - return AfUnixServer.newServerWith(runtime.getClock(), commandExecutor, - runtime.getServerDirectory(), runtime.workspace.getWorkspace(), - startupOptions.maxIdleSeconds); + try { + // This is necessary so that Bazel kind of works during bootstrapping, at which time the + // gRPC server is not compiled in so that we don't need gRPC for bootstrapping. + Class<?> factoryClass = Class.forName( + "com.google.devtools.build.lib.server.GrpcServerImpl$Factory"); + RPCServer.Factory factory = (RPCServer.Factory) factoryClass.getConstructor().newInstance(); + return factory.create(commandExecutor, runtime.getClock(), + startupOptions.commandPort, runtime.getServerDirectory(), + startupOptions.maxIdleSeconds); + } catch (ReflectiveOperationException | IllegalArgumentException e) { + throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR); } } diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java deleted file mode 100644 index 4444d8424e..0000000000 --- a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java +++ /dev/null @@ -1,558 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.server; - -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.ShutdownMethod; -import com.google.devtools.build.lib.server.RPCService.UnknownCommandException; -import com.google.devtools.build.lib.unix.LocalClientSocket; -import com.google.devtools.build.lib.unix.LocalServerSocket; -import com.google.devtools.build.lib.unix.LocalSocketAddress; -import com.google.devtools.build.lib.unix.NativePosixFiles; -import com.google.devtools.build.lib.util.Clock; -import com.google.devtools.build.lib.util.ThreadUtils; -import com.google.devtools.build.lib.util.io.OutErr; -import com.google.devtools.build.lib.util.io.StreamMultiplexer; -import com.google.devtools.build.lib.vfs.Path; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -/** - * An RPCServer server is a Java object that sits and waits for RPC requests - * (the sit-and-wait is implemented in {@link #serve()}). These requests - * arrive via UNIX file sockets. The RPCServer then calls the application - * (which implements ServerCommand) to handle the request. (Since the Blaze - * server may need to stat hundreds of directories during initialization, this - * is a significant speedup.) The server thread will terminate after idling - * for a user-specified time. - * - * Note: If you are contemplating to call into the RPCServer from - * within Java, consider using the {@link RPCService} class instead. - */ -// TODO(bazel-team): Signal handling. -// TODO(bazel-team): Gives clients status information when the server is busy. One -// way to do this is to put the server status in a file (pid, the current -// target, etc) in the server directory. Alternatively, we can have a separate -// thread taking care of the server socket and put the information into socket -// handshakes. -// TODO(bazel-team): Use Reporter for server-side messages. -public final class AfUnixServer extends RPCServer { - private final Clock clock; - private final RPCService rpcService; - private final LocalServerSocket serverSocket; - private final long maxIdleMillis; - private final long statusCheckMillis; - private final Path serverDirectory; - private final Path workspaceDir; - private static final Logger LOG = Logger.getLogger(AfUnixServer.class.getName()); - private volatile boolean lameDuck; - - private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute. - private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0'); - - /** - * Create a new server instance. After creating the server, you can start it - * by calling the {@link #serve()} method. - * - * @param clock The clock to take time measurements - * @param rpcService The underlying service object, which takes - * care of dispatching to the {@link ServerCommand} - * instances, as requests arrive. - * @param maxIdleMillis The maximum time the server will wait idly. - * @param statusCheckPeriodMillis How long to wait between system status checks. - * @param serverDirectory Directory to put file socket and pid files, etc. - * @param workspaceDir The workspace. Used solely to ensure it persists. - * @throws IOException - */ - public AfUnixServer(Clock clock, RPCService rpcService, - long maxIdleMillis, long statusCheckPeriodMillis, - Path serverDirectory, Path workspaceDir) - throws IOException { - super(serverDirectory); - this.clock = clock; - this.rpcService = rpcService; - this.maxIdleMillis = maxIdleMillis; - this.statusCheckMillis = statusCheckPeriodMillis; - this.serverDirectory = serverDirectory; - this.workspaceDir = workspaceDir; - - this.serverSocket = openServerSocket(); - serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis)); - lameDuck = false; - } - - /** - * Create a new server instance. After creating the server, you can start it - * by calling the {@link #serve()} method. - * - * @param clock The clock to take time measurements - * @param rpcService The underlying service object, which takes - * care of dispatching to the {@link ServerCommand} - * instances, as requests arrive. - * @param maxIdleMillis The maximum time the server will wait idly. - * @param serverDirectory Directory to put file socket and pid files, etc. - * @param workspaceDir The workspace. Used solely to ensure it persists. - * @throws IOException - */ - public AfUnixServer(Clock clock, RPCService rpcService, - long maxIdleMillis, Path serverDirectory, Path workspaceDir) - throws IOException { - this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS, - serverDirectory, workspaceDir); - } - - - private final AtomicBoolean inAction = new AtomicBoolean(false); - private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); - private final AtomicLong cmdNum = new AtomicLong(); - private final Thread mainThread = Thread.currentThread(); - private final Object interruptLock = new Object(); - - @Override - public void interrupt() { - // Only interrupt during actions - otherwise we may end up setting the interrupt bit - // at the end of a build and responding to it at the beginning of the subsequent build. - synchronized (interruptLock) { - if (allowingInterrupt.get()) { - mainThread.interrupt(); - } - } - - if (inAction.get()) { - Runnable interruptWatcher = - new Runnable() { - @Override - public void run() { - try { - long originalCmd = cmdNum.get(); - Thread.sleep(10 * 1000); - if (inAction.get() && cmdNum.get() == originalCmd) { - // We're still operating on the same command. - // Interrupt took too long. - ThreadUtils.warnAboutSlowInterrupt(); - } - } catch (InterruptedException e) { - // Ignore. - } - } - }; - Thread interruptWatcherThread = - new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); - interruptWatcherThread.setDaemon(true); - interruptWatcherThread.start(); - } - } - - /** - * Wait on a socket for business (answer requests). Note that this - * method won't return until the server shuts down. - */ - @Override - public void serve() { - try { - while (!lameDuck) { - try { - IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir); - idleChecker.idle(); - RequestIo requestIo; - - long startTime = clock.currentTimeMillis(); - while (true) { - try { - allowingInterrupt.set(true); - Socket socket = serverSocket.accept(); - long firstContactTime = clock.currentTimeMillis(); - requestIo = new RequestIo(socket, firstContactTime); - break; - } catch (SocketTimeoutException e) { - long idleTime = clock.currentTimeMillis() - startTime; - if (lameDuck) { - closeServerSocket(); - return; - } else if (idleTime > maxIdleMillis - || (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) { - enterLameDuck(); - } - } - } - idleChecker.busy(); - - - List<String> request = null; - try { - request = extractRequest(requestIo); - cmdNum.incrementAndGet(); - inAction.set(true); - if (request != null) { - executeRequest(request, requestIo); - } - } finally { - inAction.set(false); - // Don't reset interruption unless we executed a request. Otherwise this is just a - // ping from the client verifying our existence, in which case we should retain the - // interrupt status for the subsequent request. - if (request != null) { - synchronized (interruptLock) { - allowingInterrupt.set(false); - Thread.interrupted(); // clears thread interrupted status - } - } - requestIo.shutdown(); - switch (rpcService.getShutdown()) { - case NONE: - break; - - case CLEAN: - return; - - case EXPUNGE: - disableShutdownHooks(); - return; - } - } - } catch (EOFException e) { - LOG.info("Connection to the client lost: " - + e.getMessage()); - } catch (IOException e) { - // Something else happened. Print a stack trace for debugging. - printStack(e); - } - } - } finally { - rpcService.shutdown(ShutdownMethod.CLEAN); - LOG.info("Logging finished"); - } - } - - private void closeServerSocket() { - LOG.info("Closing serverSocket."); - try { - serverSocket.close(); - } catch (IOException e) { - printStack(e); - } - - if (!lameDuck) { - try { - getSocketPath().delete(); - } catch (IOException e) { - printStack(e); - } - } - } - - /** - * Allow one last request to be serviced. - */ - private void enterLameDuck() { - lameDuck = true; - try { - getSocketPath().delete(); - } catch (IOException e) { - e.printStackTrace(); - } - serverSocket.setSoTimeout(1); - } - - /** - * Returns the path of the socket file to be used. - */ - public Path getSocketPath() { - return serverDirectory.getRelative("server.socket"); - } - - /** - * Ensures no other server is running for the current socket file. This - * guarantees that no two servers are running against the same output - * directory. - * - * @throws IOException if another server holds the lock for the socket file. - */ - public static void ensureExclusiveAccess(Path socketFile) throws IOException { - LocalSocketAddress address = - new LocalSocketAddress(socketFile.getPathFile()); - if (socketFile.exists()) { - try { - new LocalClientSocket(address).close(); - } catch (IOException e) { - // The previous server process is dead--unlink the file: - socketFile.delete(); - return; - } - // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message - // and add it to the message. - throw new IOException("Socket file " + socketFile.getPathString() - + " is locked by another server"); - } - } - - /** - * Opens a UNIX local server socket. - * @throws IOException if the socket file is used by another server or can - * not be made exclusive. - */ - private LocalServerSocket openServerSocket() throws IOException { - // This is the "well known" socket path via which the server is found... - Path socketFile = getSocketPath(); - - // ...but it may have a name that's too long for AF_UNIX, in which case we - // make it a symlink to /tmp/something. This typically only happens in - // tests where the --output_base is beneath a very deep temp dir. - // (All this extra complexity is just used in tests... *sigh*). - if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX - Path socketLink = socketFile; - String tmpDirDefault = System.getenv("TMPDIR"); - if (tmpDirDefault == null - || tmpDirDefault.length() > 104 - "/blaze-4294967296/server.socket".length()) { - // Default for unset TMPDIR, or if TMPDIR is so that the resulting - // path would be too long. - tmpDirDefault = "/tmp"; - } - String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", tmpDirDefault); - socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)). - getRelative("server.socket"); - LOG.info("Using symlinked socket at " + socketFile); - - socketLink.delete(); // Remove stale symlink, if any. - socketLink.createSymbolicLink(socketFile); - - deleteAtExit(socketLink, /*deleteParent=*/false); - deleteAtExit(socketFile, /*deleteParent=*/true); - } else { - deleteAtExit(socketFile, /*deleteParent=*/false); - } - - ensureExclusiveAccess(socketFile); - - - LocalServerSocket serverSocket = new LocalServerSocket(); - serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile())); - NativePosixFiles.chmod(socketFile.getPathFile(), 0600); // Lock it down. - serverSocket.listen(/*backlog=*/50); - return serverSocket; - } - - // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a - // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it - // succeeds. - private static Path createTempSocketDirectory(Path tempDir) { - Random random = new Random(); - while (true) { - Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt())); - try { - if (socketDir.createDirectory()) { - // Make sure it's private; unfortunately, createDirectory() doesn't take a mode - // argument. - socketDir.chmod(0700); - return socketDir; // Created. - } - // Already existed; try again. - } catch (IOException e) { - // Failed; try again. - } - } - } - - /** - * Read a string in platform default encoding and split it into a list of - * NUL-separated words. - * - * <p>Blaze consistently uses the platform default encoding (defined in - * blaze.cc) to interface with Unix APIs. - */ - private static List<String> readRequest(InputStream input) throws IOException { - byte[] sizeBuffer = new byte[4]; - ByteStreams.readFully(input, sizeBuffer); - int size = ((sizeBuffer[0] & 0xff) << 24) - + ((sizeBuffer[1] & 0xff) << 16) - + ((sizeBuffer[2] & 0xff) << 8) - + (sizeBuffer[3] & 0xff); - byte[] inputBytes = new byte[size]; - ByteStreams.readFully(input, inputBytes); - - String s = new String(inputBytes, Charset.defaultCharset()); - return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s)); - } - - private static List<String> extractRequest(RequestIo requestIo) throws IOException { - List<String> request = readRequest(requestIo.in); - if (request == null) { - LOG.info("Short-circuiting empty request"); - return null; - } - return request; - } - - private void executeRequest(List<String> request, RequestIo requestIo) { - Preconditions.checkNotNull(request); - int exitStatus = 2; - try { - exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr, - requestIo.firstContactTime); - LOG.info("Finished executing request"); - } catch (UnknownCommandException e) { - requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage()); - LOG.severe("SERVER ERROR: " + e.getMessage()); - } catch (Exception e) { - // Stacktrace for unknown exception. - StringWriter trace = new StringWriter(); - e.printStackTrace(new PrintWriter(trace, true)); - requestIo.requestOutErr.printErr("SERVER ERROR: " + trace); - LOG.severe("SERVER ERROR: " + trace); - } - - if (rpcService.getShutdown() != ShutdownMethod.NONE) { - // In case of shutdown, disable the listening socket *before* we write - // the last part of the response. Otherwise, a sufficiently fast client - // could read the response and exit, and a new client could make a - // connection to this server, which is still in the listening state, even - // though it is about to shut down imminently. - closeServerSocket(); - } - - requestIo.writeExitStatus(exitStatus); - } - - /** - * Because it's a little complicated, this class factors out all the IO Hook - * up we need per request, that is, in - * {@link AfUnixServer#executeRequest(List, RequestIo)}. - * It's unfortunately complicated, so it's explained here. - */ - private static class RequestIo { - - // Used by the client code - private final InputStream in; - private final OutErr requestOutErr; - private final OutputStream controlChannel; - - // just used by this class to keep the state around - private final Socket requestSocket; - private final OutputStream requestOut; - private final long firstContactTime; - - RequestIo(Socket requestSocket, long firstContactTime) throws IOException { - this.requestSocket = requestSocket; - this.firstContactTime = firstContactTime; - this.in = requestSocket.getInputStream(); - this.requestOut = requestSocket.getOutputStream(); - - // We encode the response sent to the client with a multiplexer so - // we can send three streams (out / err / control) over one wire stream - // (requestOut). - StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut); - - // We'll be writing control messages (exit code + out of date message) - // to this control channel. - controlChannel = multiplexer.createControl(); - - // This is the outErr part of the multiplexed output. - requestOutErr = OutErr.create(multiplexer.createStdout(), - multiplexer.createStderr()); - // We hook up System.out / System.err to our IO object. Stuff written to - // System.out / System.err will show up on the user's screen, prefixed - // with "System.out "/"System.err ". - requestOutErr.addSystemOutErrAsSource(); - } - - public void writeExitStatus(int exitStatus) { - // Make sure to flush the output / error streams prior to writing the exit status. - // The client may stop reading that direction of the socket immediately upon reading the - // exit code. - flushOutErr(); - try { - controlChannel.write((exitStatus >> 24) & 0xff); - controlChannel.write((exitStatus >> 16) & 0xff); - controlChannel.write((exitStatus >> 8) & 0xff); - controlChannel.write(exitStatus & 0xff); - controlChannel.flush(); - LOG.info("" + exitStatus); - } catch (IOException ignored) { - // This exception is historically ignored. - } - } - - private void flushOutErr() { - try { - requestOutErr.getOutputStream().flush(); - } catch (IOException e) { - printStack(e); - } - try { - requestOutErr.getErrorStream().flush(); - } catch (IOException e) { - printStack(e); - } - } - - public void shutdown() { - try { - requestOut.close(); - } catch (IOException e) { - printStack(e); - } - try { - in.close(); - } catch (IOException e) { - printStack(e); - } - try { - requestSocket.close(); - } catch (IOException e) { - printStack(e); - } - } - } - - /** - * Creates and returns a new RPC server. - * Use {@link AfUnixServer#serve()} to start the server. - * - * @param appCommand The application's ServerCommand implementation. - * @param serverDirectory The directory for server-related files. The caller - * must ensure the directory has been created. - * @param workspaceDir The workspace, used solely to ensure it persists. - * @param maxIdleSeconds The idle time in seconds after which the rpc - * server will die unless it receives a request. - */ - public static AfUnixServer newServerWith(Clock clock, - ServerCommand appCommand, - Path serverDirectory, - Path workspaceDir, - int maxIdleSeconds) - throws IOException { - // Creates and starts the RPC server. - RPCService service = new RPCService(appCommand); - - return new AfUnixServer(clock, service, maxIdleSeconds * 1000L, - serverDirectory, workspaceDir); - } - -} diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index a3678c72c9..ffdf473a78 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -46,6 +46,8 @@ import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.security.SecureRandom; @@ -96,7 +98,7 @@ import javax.annotation.concurrent.GuardedBy; * to cancel it. Cancellation is done by the client sending the server a {@code cancel()} RPC call * which results in the main thread of the command being interrupted. */ -public class GrpcServerImpl extends RPCServer { +public class GrpcServerImpl implements RPCServer { private static final Logger log = Logger.getLogger(GrpcServerImpl.class.getName()); // UTF-8 won't do because we want to be able to pass arbitrary binary strings. @@ -421,6 +423,8 @@ public class GrpcServerImpl extends RPCServer { private static final String REQUEST_COOKIE_FILE = "request_cookie"; private static final String RESPONSE_COOKIE_FILE = "response_cookie"; + private static final AtomicBoolean runShutdownHooks = new AtomicBoolean(true); + @GuardedBy("runningCommands") private final Map<String, RunningCommand> runningCommands = new HashMap<>(); private final CommandExecutor commandExecutor; @@ -439,7 +443,14 @@ public class GrpcServerImpl extends RPCServer { public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, Path serverDirectory, int maxIdleSeconds) throws IOException { - super(serverDirectory); + // server.pid was written in the C++ launcher after fork() but before exec() . + // The client only accesses the pid file after connecting to the socket + // which ensures that it gets the correct pid value. + Path pidFile = serverDirectory.getRelative("server.pid.txt"); + Path pidSymlink = serverDirectory.getRelative("server.pid"); + deleteAtExit(pidFile, /*deleteParent=*/ false); + deleteAtExit(pidSymlink, /*deleteParent=*/ false); + this.commandExecutor = commandExecutor; this.clock = clock; this.serverDirectory = serverDirectory; @@ -610,6 +621,46 @@ public class GrpcServerImpl extends RPCServer { deleteAtExit(file, false); } + protected void disableShutdownHooks() { + runShutdownHooks.set(false); + } + + /** + * Schedule the specified file for (attempted) deletion at JVM exit. + */ + protected static void deleteAtExit(final Path path, final boolean deleteParent) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + if (!runShutdownHooks.get()) { + return; + } + + try { + path.delete(); + if (deleteParent) { + path.getParentDirectory().delete(); + } + } catch (IOException e) { + printStack(e); + } + } + }); + } + + static void printStack(IOException e) { + /* + * Hopefully this never happens. It's not very nice to just write this + * to the user's console, but I'm not sure what better choice we have. + */ + StringWriter err = new StringWriter(); + PrintWriter printErr = new PrintWriter(err); + printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]======="); + e.printStackTrace(printErr); + printErr.println("====================================================="); + log.severe(err.toString()); + } + private void executeCommand( RunRequest request, StreamObserver<RunResponse> observer, GrpcSink sink) { sink.setCommandThread(Thread.currentThread()); diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java index 40ccdeb915..8b7aefbef4 100644 --- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -17,85 +17,28 @@ import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.vfs.Path; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; /** - * A server instance. Can either an AF_UNIX or a gRPC one. + * A gRPC server instance. */ -public abstract class RPCServer { - private static final Logger LOG = Logger.getLogger(RPCServer.class.getName()); - private static AtomicBoolean runShutdownHooks = new AtomicBoolean(true); - +public interface RPCServer { /** * Factory class for the gRPC server. * * Present so that we don't need to invoke a constructor with multiple arguments by reflection. */ - public interface Factory { + interface Factory { RPCServer create(CommandExecutor commandExecutor, Clock clock, int port, Path serverDirectory, int maxIdleSeconds) throws IOException; } - protected RPCServer(Path serverDirectory) throws IOException { - // server.pid was written in the C++ launcher after fork() but before exec() . - // The client only accesses the pid file after connecting to the socket - // which ensures that it gets the correct pid value. - Path pidFile = serverDirectory.getRelative("server.pid.txt"); - Path pidSymlink = serverDirectory.getRelative("server.pid"); - RPCServer.deleteAtExit(pidFile, /*deleteParent=*/ false); - RPCServer.deleteAtExit(pidSymlink, /*deleteParent=*/ false); - } - - protected void disableShutdownHooks() { - runShutdownHooks.set(false); - } - - /** - * Schedule the specified file for (attempted) deletion at JVM exit. - */ - protected static void deleteAtExit(final Path path, final boolean deleteParent) { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - if (!runShutdownHooks.get()) { - return; - } - - try { - path.delete(); - if (deleteParent) { - path.getParentDirectory().delete(); - } - } catch (IOException e) { - printStack(e); - } - } - }); - } - - static void printStack(IOException e) { - /* - * Hopefully this never happens. It's not very nice to just write this - * to the user's console, but I'm not sure what better choice we have. - */ - StringWriter err = new StringWriter(); - PrintWriter printErr = new PrintWriter(err); - printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]======="); - e.printStackTrace(printErr); - printErr.println("====================================================="); - LOG.severe(err.toString()); - } - /** * Start serving and block until the a shutdown command is received. */ - public abstract void serve() throws IOException; + void serve() throws IOException; /** * Called when the server receives a SIGINT. */ - public abstract void interrupt(); + void interrupt(); } diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java deleted file mode 100644 index 10335d1a93..0000000000 --- a/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.unix; - -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.SocketException; - -/** - * <p>An implementation of client Socket for local (AF_UNIX) sockets. - * - * <p>This class intentionally doesn't extend java.net.Socket although it - * has some similarity to it. The java.net class hierarchy is a terrible mess - * and is inextricably coupled to the Internet Protocol. - * - * <p>This code is not intended to be portable to non-UNIX platforms. - */ -public class LocalClientSocket extends LocalSocket { - - /** - * Constructs an unconnected local client socket. - * - * @throws IOException if the socket could not be created. - */ - public LocalClientSocket() throws IOException { - super(); - } - - /** - * Constructs a client socket and connects it to the specified address. - * - * @throws IOException if either of the socket/connect operations failed. - */ - public LocalClientSocket(LocalSocketAddress address) throws IOException { - super(); - connect(address); - } - - /** - * Connect to the specified server. Blocks until the server accepts the - * connection. - * - * @throws IOException if the connection failed. - */ - public synchronized void connect(LocalSocketAddress address) - throws IOException { - checkNotClosed(); - if (state == State.CONNECTED) { - throw new SocketException("socket is already connected"); - } - connect(fd, address.getName().toString()); // JNI - this.address = address; - this.state = State.CONNECTED; - } - - /** - * Returns the input stream for reading from the server. - * - * @param closeSocket close the socket when this input stream is closed. - * @throws IOException if there was a problem. - */ - public synchronized InputStream getInputStream(final boolean closeSocket) throws IOException { - checkConnected(); - checkInputNotShutdown(); - return new FileInputStream(fd) { - @Override - public void close() throws IOException { - if (closeSocket) { - LocalClientSocket.this.close(); - } - } - }; - } - - /** - * Returns the input stream for reading from the server. - * - * @throws IOException if there was a problem. - */ - public synchronized InputStream getInputStream() throws IOException { - return getInputStream(false); - } - - /** - * Returns the output stream for writing to the server. - * - * @throws IOException if there was a problem. - */ - public synchronized OutputStream getOutputStream() throws IOException { - checkConnected(); - checkOutputNotShutdown(); - return new FileOutputStream(fd) { - @Override public void close() { - // Don't close the file descriptor. - } - }; - } - - @Override - public String toString() { - return "LocalClientSocket(" + address + ")"; - } -} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java deleted file mode 100644 index 0c0bd22c3f..0000000000 --- a/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.unix; - -import java.io.FileDescriptor; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; - -/** - * <p>An implementation of ServerSocket for local (AF_UNIX) sockets. - * - * <p>This class intentionally doesn't extend java.net.ServerSocket although it - * has some similarity to it. The java.net class hierarchy is a terrible mess - * and is inextricably coupled to the Internet Protocol. - * - * <p>This code is not intended to be portable to non-UNIX platforms. - */ -public class LocalServerSocket extends LocalSocket { - - // Socket timeout in milliseconds. No timeout by default. - private long soTimeoutMillis = 0; - - /** - * Constructs an unbound local server socket. - */ - public LocalServerSocket() throws IOException { - super(); - } - - /** - * Constructs a server socket, binds it to the specified address, and - * listens for incoming connections with the specified backlog. - * - * @throws IOException if any of the socket/bind/listen operations failed. - */ - public LocalServerSocket(LocalSocketAddress address, int backlog) - throws IOException { - this(); - bind(address); - listen(backlog); - } - - /** - * Constructs a server socket, binds it to the specified address, and begin - * listening for incoming connections using the default backlog. - * - * @throws IOException if any of the socket/bind/listen operations failed. - */ - public LocalServerSocket(LocalSocketAddress address) throws IOException { - this(address, 50); - } - - /** - * Specifies the timeout in milliseconds for accept(). Setting it to - * zero means an indefinite timeout. - */ - public void setSoTimeout(long timeoutMillis) { - soTimeoutMillis = timeoutMillis; - } - - /** - * Returns the current timeout in milliseconds. - */ - public long getSoTimeout() { - return soTimeoutMillis; - } - - /** - * Binds the specified address to this socket. The socket must be unbound. - * This causes the filesystem entry to appear. - * - * @throws IOException if the bind failed. - */ - public synchronized void bind(LocalSocketAddress address) - throws IOException { - if (address == null) { - throw new NullPointerException("address"); - } - checkNotClosed(); - if (state != State.NEW) { - throw new SocketException("socket is already bound to an address"); - } - bind(fd, address.getName().toString()); // JNI - this.address = address; - this.state = State.BOUND; - } - - /** - * Listen for incoming connections on a socket using the specfied backlog. - * The socket must be bound but not already listening. - * - * @throws IOException if the listen failed. - */ - public synchronized void listen(int backlog) throws IOException { - if (backlog < 1) { - throw new IllegalArgumentException("backlog=" + backlog); - } - checkNotClosed(); - if (address == null) { - throw new SocketException("socket has no address bound"); - } - if (state == State.LISTENING) { - throw new SocketException("socket is already listening"); - } - listen(fd, backlog); // JNI - this.state = State.LISTENING; - } - - /** - * Blocks until a connection is made to this socket and accepts it, returning - * a new socket connected to the client. - * - * @return the new socket connected to the client. - * @throws IOException if an error occurs when waiting for a connection. - * @throws SocketTimeoutException if a timeout was previously set with - * setSoTimeout and the timeout has been reached. - * @throws InterruptedIOException if the thread is interrupted when the - * method is blocked. - */ - public synchronized Socket accept() - throws IOException, SocketTimeoutException, InterruptedIOException { - if (state != State.LISTENING) { - throw new SocketException("socket is not in listening state"); - } - - // Throws a SocketTimeoutException if timeout. - if (soTimeoutMillis != 0) { - poll(fd, soTimeoutMillis); // JNI - } - - FileDescriptor clientFd = new FileDescriptor(); - accept(fd, clientFd); // JNI - final LocalSocketImpl impl = new LocalSocketImpl(clientFd); - return new Socket(impl) { - @Override - public boolean isConnected() { - return true; - } - @Override - public synchronized void close() throws IOException { - if (isClosed()) { - return; - } else { - super.close(); - // Workaround for the fact that super.created==false because we - // created the impl ourselves. As a result, super.close() doesn't - // call impl.close(). *Sigh*, java.net is horrendous. - // (Perhaps we should dispense with Socket/SocketImpl altogether?) - impl.close(); - } - } - }; - } - - @Override - public String toString() { - return "LocalServerSocket(" + address + ")"; - } -} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java deleted file mode 100644 index d51f2bad36..0000000000 --- a/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.unix; - -import com.google.devtools.build.lib.UnixJniLoader; -import java.io.Closeable; -import java.io.FileDescriptor; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.SocketException; -import java.net.SocketTimeoutException; - -/** - * Abstract superclass for client and server local sockets. - */ -abstract class LocalSocket implements Closeable { - - protected enum State { - NEW, - BOUND, // server only - LISTENING, // server only - CONNECTED, // client only - CLOSED, - } - - protected LocalSocketAddress address = null; - protected FileDescriptor fd = new FileDescriptor(); - protected State state; - protected boolean inputShutdown = false; - protected boolean outputShutdown = false; - - /** - * Constructs an unconnected local socket. - */ - protected LocalSocket() throws IOException { - socket(fd); - if (!fd.valid()) { - throw new IOException("Couldn't create socket!"); - } - this.state = State.NEW; - } - - /** - * Returns the address of the endpoint this socket is bound to. - * - * @return a <code>SocketAddress</code> representing the local endpoint of - * this socket. - */ - public LocalSocketAddress getLocalSocketAddress() { - return address; - } - - /** - * Closes this socket. This operation is idempotent. - * - * To be consistent with Java Socket, the shutdown states of the socket are - * not changed. This makes it easier to port applications between Socket and - * LocalSocket. - * - * @throws IOException if an I/O error occurred when closing the socket. - */ - @Override - public synchronized void close() throws IOException { - if (state == State.CLOSED) { - return; - } - // Closes the file descriptor if it has not been closed by the - // input/output streams. - if (!fd.valid()) { - throw new IllegalStateException("LocalSocket.close(-1)"); - } - close(fd); - if (fd.valid()) { - throw new IllegalStateException("LocalSocket.close() did not set fd to -1"); - } - this.state = State.CLOSED; - } - - /** - * Returns the closed state of the ServerSocket. - * - * @return true if the socket has been closed - */ - public synchronized boolean isClosed() { - // If the file descriptor has been closed by the input/output - // streams, marks the socket as closed too. - return state == State.CLOSED; - } - - /** - * Returns the connected state of the ClientSocket. - * - * @return true if the socket is currently connected. - */ - public synchronized boolean isConnected() { - return state == State.CONNECTED; - } - - protected synchronized void checkConnected() throws SocketException { - if (!isConnected()) { - throw new SocketException("Transport endpoint is not connected"); - } - } - - protected synchronized void checkNotClosed() throws SocketException { - if (isClosed()) { - throw new SocketException("socket is closed"); - } - } - - /** - * Returns the shutdown state of the input channel. - * - * @return true is the input channel of the socket is shutdown. - */ - public synchronized boolean isInputShutdown() { - return inputShutdown; - } - - /** - * Returns the shutdown state of the output channel. - * - * @return true is the input channel of the socket is shutdown. - */ - public synchronized boolean isOutputShutdown() { - return outputShutdown; - } - - protected synchronized void checkInputNotShutdown() throws SocketException { - if (isInputShutdown()) { - throw new SocketException("Socket input is shutdown"); - } - } - - protected synchronized void checkOutputNotShutdown() throws SocketException { - if (isOutputShutdown()) { - throw new SocketException("Socket output is shutdown"); - } - } - - static final int SHUT_RD = 0; // Mapped to BSD SHUT_RD in JNI. - static final int SHUT_WR = 1; // Mapped to BSD SHUT_WR in JNI. - - public synchronized void shutdownInput() throws IOException { - checkNotClosed(); - checkConnected(); - checkInputNotShutdown(); - inputShutdown = true; - shutdown(fd, SHUT_RD); - } - - public synchronized void shutdownOutput() throws IOException { - checkNotClosed(); - checkConnected(); - checkOutputNotShutdown(); - outputShutdown = true; - shutdown(fd, SHUT_WR); - } - - //////////////////////////////////////////////////////////////////////// - // JNI: - - static { - UnixJniLoader.loadJni(); - } - - // The native calls below are thin wrappers around linux system calls. The - // semantics remains the same except for poll(). See the comments for the - // method. - // - // Note: FileDescriptor is a box for a mutable integer that is visible only - // to native code. - - // Generic operations: - protected static native void socket(FileDescriptor server) - throws IOException; - static native void close(FileDescriptor server) - throws IOException; - /** - * Shut down part of a full-duplex connection - * @param code Must be either SHUT_RD or SHUT_WR - */ - static native void shutdown(FileDescriptor fd, int code) - throws IOException; - - /** - * This method checks waits for the given file descriptor to become available for read. - * If timeoutMillis passed and there is no activity, a SocketTimeoutException will be thrown. - */ - protected static native void poll(FileDescriptor read, long timeoutMillis) - throws IOException, SocketTimeoutException, InterruptedIOException; - - // Server operations: - protected static native void bind(FileDescriptor server, String filename) - throws IOException; - protected static native void listen(FileDescriptor server, int backlog) - throws IOException; - protected static native void accept(FileDescriptor server, - FileDescriptor client) - throws IOException; - - // Client operations: - protected static native void connect(FileDescriptor client, String filename) - throws IOException; -} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java deleted file mode 100644 index f9b9d43f06..0000000000 --- a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.unix; - -import java.io.File; -import java.net.SocketAddress; - -/** - * An implementation of SocketAddress for naming local sockets, i.e. files in - * the UNIX file system. - */ -public class LocalSocketAddress extends SocketAddress { - - private final File name; - - /** - * Constructs a SocketAddress for the specified file. - */ - public LocalSocketAddress(File name) { - this.name = name; - } - - /** - * Returns the filename of this local socket address. - */ - public File getName() { - return name; - } - - @Override - public String toString() { - return name.toString(); - } - - @Override - public boolean equals(Object other) { - return other instanceof LocalSocketAddress && - ((LocalSocketAddress) other).name.equals(this.name); - } - - @Override - public int hashCode() { - return name.hashCode(); - } -} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java deleted file mode 100644 index a30b450dc1..0000000000 --- a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.unix; - -import com.google.devtools.build.lib.UnixJniLoader; -import com.google.devtools.build.lib.util.OS; - -import java.io.Closeable; -import java.io.FileDescriptor; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.SocketAddress; -import java.net.SocketImpl; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * A simple implementation of SocketImpl for sockets that wrap a UNIX - * file-descriptor. This SocketImpl assumes that the socket is already - * created, bound, connected and supports no socket options or out-of-band - * features. This is used to implement server-side accepted client sockets - * (i.e. those returned by {@link LocalServerSocket#accept}). - */ -class LocalSocketImpl extends SocketImpl { - private static final Logger logger = - Logger.getLogger(LocalSocketImpl.class.getName()); - - static { - UnixJniLoader.loadJni(); - if (OS.getCurrent() != OS.WINDOWS) { - init(); - } - } - - // The logic here is a little twisted, to support JDK7 and JDK8. - - // 1) In JDK7, the FileDescriptor class keeps a reference count of - // instances using the fd, and closes it when it goes to 0. The - // reference count is only decremented by the finalizer for a - // given class. When the call to close() happens, the fd is - // closed regardless of the current state of the refcount. - // - // 2) In JDK8, every instance that uses the fd registers a Closeable - // with the FileDescriptor. Since the FileDescriptor has a - // reference to every user, only when all of the users and the - // FileDescriptor get GC'd does the finalizer run. An explicit - // call to close() calls FileDescriptor.closeAll(), which - // force-closes all of the users. - - // So, in our case: - - // 1) ref() increments the refcount in JDK7, and registers with the - // FD in JDK8. - - // 2) unref() decrements the refcount in JDK7, and does nothing in - // JDK8. - - // 3) The finalizer decrements the refcount in JDK7, and simply - // calls close() in JDK8 (where we don't have to worry about - // multiple live users of the FD). The close() method itself is - // idempotent. - - // 4) close() calls fd.closeAll in JDK8, which, in turn, calls - // closer.close(). In JDK7, close() calls closer.close() - // explicitly. - private static native void init(); - private static native void ref(FileDescriptor fd, Closeable closeable); - private static native boolean unref(FileDescriptor fd); - private static native boolean close0(FileDescriptor fd, Closeable closeable); - - private final boolean isInitialized; - private final Closeable closer = new Closeable() { - AtomicBoolean isClosed = new AtomicBoolean(false); - @Override public void close() throws IOException { - if (isClosed.compareAndSet(false, true)) { - LocalSocket.close(fd); - } - } - }; - - // Note to callers: if you pass a FD into this constructor, this - // instance is now responsible for closing it (in the sense of - // LocalSocket.close()). If some other instance tries to close it, - // then terrible things will happen. - LocalSocketImpl(FileDescriptor fd) { - this.fd = fd; // (inherited field) - ref(fd, closer); - isInitialized = true; - } - - @Override protected void finalize() { - try { - if (isInitialized) { - if (!unref(fd)) { - // JDK8 codepath - close0(fd, closer); - } - } - } catch (Exception e) { - logger.log(Level.WARNING, "Unable to access FileDescriptor class - " + - "may cause a file descriptor leak", e); - } - } - @Override protected InputStream getInputStream() { - return new FileInputStream(getFileDescriptor()); - } - @Override protected OutputStream getOutputStream() { - return new FileOutputStream(getFileDescriptor()); - } - @Override protected void close() throws IOException { - if (fd.valid()) { - if (!close0(fd, closer)) { - // JDK7 codepath - closer.close(); - } - } - } - - // Unused: - @Override - public void setOption(int optID, Object value) { - throw new UnsupportedOperationException("setOption"); - } - @Override - public Object getOption(int optID) { - throw new UnsupportedOperationException("getOption"); - } - @Override protected void create(boolean stream) { - throw new UnsupportedOperationException("create"); - } - @Override protected void connect(String host, int port) { - throw new UnsupportedOperationException("connect"); - } - @Override protected void connect(InetAddress address, int port) { - throw new UnsupportedOperationException("connect2"); - } - @Override protected void connect(SocketAddress address, int timeout) { - throw new UnsupportedOperationException("connect3"); - } - @Override protected void bind(InetAddress host, int port) { - throw new UnsupportedOperationException("bind"); - } - @Override protected void listen(int backlog) { - throw new UnsupportedOperationException("listen"); - } - @Override protected void accept(SocketImpl s) { - throw new UnsupportedOperationException("accept"); - } - @Override protected int available() { - throw new UnsupportedOperationException("available"); - } - @Override protected void sendUrgentData(int i) { - throw new UnsupportedOperationException("sendUrgentData"); - } -} diff --git a/src/main/native/BUILD b/src/main/native/BUILD index eed0d7edc6..02729148e8 100644 --- a/src/main/native/BUILD +++ b/src/main/native/BUILD @@ -36,7 +36,6 @@ filegroup( cc_binary( name = "libunix.so", srcs = [ - "localsocket.cc", "macros.h", "process.cc", "unix_jni.cc", diff --git a/src/main/native/localsocket.cc b/src/main/native/localsocket.cc deleted file mode 100644 index 3731619e52..0000000000 --- a/src/main/native/localsocket.cc +++ /dev/null @@ -1,312 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include <jni.h> - -#include <errno.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <poll.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/un.h> - -#include <string> - -#include "src/main/native/unix_jni.h" - -// Returns the field ID for FileDescriptor.fd. -static jfieldID GetFileDescriptorField(JNIEnv *env) { - // See http://java.sun.com/docs/books/jni/html/fldmeth.html#26855 - static jclass fd_class = NULL; - if (fd_class == NULL) { /* note: harmless race condition */ - jclass local = env->FindClass("java/io/FileDescriptor"); - CHECK(local != NULL); - fd_class = static_cast<jclass>(env->NewGlobalRef(local)); - } - static jfieldID fieldId = NULL; - if (fieldId == NULL) { /* note: harmless race condition */ - fieldId = env->GetFieldID(fd_class, "fd", "I"); - CHECK(fieldId != NULL); - } - return fieldId; -} - -// Returns the UNIX filedescriptor from a java.io.FileDescriptor instance. -static jint GetUnixFileDescriptor(JNIEnv *env, jobject fd_obj) { - return env->GetIntField(fd_obj, GetFileDescriptorField(env)); -} - -// Sets the UNIX filedescriptor of a java.io.FileDescriptor instance. -static void SetUnixFileDescriptor(JNIEnv *env, jobject fd_obj, jint fd) { - env->SetIntField(fd_obj, GetFileDescriptorField(env), fd); -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: socket - * Signature: (Ljava/io/FileDescriptor;)V - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_socket(JNIEnv *env, - jclass clazz, - jobject fd_sock) { - int sock; - if ((sock = ::socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - return; - } - SetUnixFileDescriptor(env, fd_sock, sock); -} - -// Initialize "addr" from "name_chars", reporting error and returning -// false on failure. -static bool InitializeSockaddr(JNIEnv *env, - struct sockaddr_un *addr, - const char* name_chars) { - memset(addr, 0, sizeof *addr); - addr->sun_family = AF_UNIX; - // Note: UNIX_PATH_MAX is quite small! - if (strlen(name_chars) >= sizeof(addr->sun_path)) { - ::PostFileException(env, ENAMETOOLONG, name_chars); - return false; - } - strcpy((char*) &addr->sun_path, name_chars); - return true; -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: bind - * Signature: (Ljava/io/FileDescriptor;Ljava/lang/String;)V - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_bind(JNIEnv *env, - jclass clazz, - jobject fd_svr, - jstring name) { - int svr_sock = GetUnixFileDescriptor(env, fd_svr); - const char* name_chars = env->GetStringUTFChars(name, NULL); - struct sockaddr_un addr; - if (InitializeSockaddr(env, &addr, name_chars) && - ::bind(svr_sock, (struct sockaddr *) &addr, sizeof addr) < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - } - env->ReleaseStringUTFChars(name, name_chars); -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: listen - * Signature: (Ljava/io/FileDescriptor;I)V - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_listen(JNIEnv *env, - jclass clazz, - jobject fd_svr, - jint backlog) { - int svr_sock = GetUnixFileDescriptor(env, fd_svr); - if (::listen(svr_sock, backlog) < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - } -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: select - * Signature: (L[java/io/FileDescriptor;[java/io/FileDescriptor;[java/io/FileDescriptor;J)Ljava/io/FileDescriptor - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_poll(JNIEnv *env, - jclass clazz, - jobject rfds_svr, - jlong timeoutMillis) { - // TODO(bazel-team): Handle Unix signals, namely SIGTERM. - - // Copy Java FD into pollfd - pollfd pollfd; - pollfd.fd = GetUnixFileDescriptor(env, rfds_svr); - pollfd.events = POLLIN; - pollfd.revents = 0; - - int count = poll(&pollfd, 1, timeoutMillis); - if (count == 0) { - // throws a timeout exception. - ::PostException(env, ETIMEDOUT, ::ErrorMessage(ETIMEDOUT)); - } else if (count < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - } -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: accept - * Signature: (Ljava/io/FileDescriptor;Ljava/io/FileDescriptor;)V - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_accept(JNIEnv *env, - jclass clazz, - jobject fd_svr, - jobject fd_cli) { - int svr_sock = GetUnixFileDescriptor(env, fd_svr); - int cli_sock; - if ((cli_sock = ::accept(svr_sock, NULL, NULL)) < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - return; - } - SetUnixFileDescriptor(env, fd_cli, cli_sock); -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: close - * Signature: (Ljava/io/FileDescriptor;)V - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_close(JNIEnv *env, - jclass clazz, - jobject fd_svr) { - int svr_sock = GetUnixFileDescriptor(env, fd_svr); - if (::close(svr_sock) < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - } - SetUnixFileDescriptor(env, fd_svr, -1); -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: connect - * Signature: (Ljava/io/FileDescriptor;Ljava/lang/String;)V - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_connect(JNIEnv *env, - jclass clazz, - jobject fd_cli, - jstring name) { - const char* name_chars = env->GetStringUTFChars(name, NULL); - jint cli_sock = GetUnixFileDescriptor(env, fd_cli); - if (cli_sock == -1) { - ::PostFileException(env, ENOTSOCK, name_chars); - } else { - struct sockaddr_un addr; - if (InitializeSockaddr(env, &addr, name_chars)) { - if (::connect(cli_sock, (struct sockaddr *) &addr, sizeof addr) < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - } - } - } - env->ReleaseStringUTFChars(name, name_chars); -} - -/* - * Class: com.google.devtools.build.lib.unix.LocalSocket - * Method: shutdown() - * Signature: (Ljava/io/FileDescriptor;I)V - * Parameters: code: 0 to shutdown input and 1 to shutdown output. - */ -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocket_shutdown(JNIEnv *env, - jclass clazz, - jobject fd, - jint code) { - int action; - if (code == 0) { - action = SHUT_RD; - } else { - CHECK(code == 1); - action = SHUT_WR; - } - - int sock = GetUnixFileDescriptor(env, fd); - if (shutdown(sock, action) < 0) { - ::PostException(env, errno, ::ErrorMessage(errno)); - } -} - -// TODO(bazel-team): These methods were removed in JDK8, so they -// can be removed when we are no longer using JDK7. See note in -// LocalSocketImpl. -static jmethodID increment_use_count_; -static jmethodID decrement_use_count_; - -// >=JDK8 -static jmethodID fd_attach_; -static jmethodID fd_close_all_; - -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocketImpl_init(JNIEnv *env, jclass ignored) { - jclass cls = env->FindClass("java/io/FileDescriptor"); - if (cls == NULL) { - cls = env->FindClass("java/lang/NoClassDefFoundError"); - env->ThrowNew(cls, "FileDescriptor class not found"); - return; - } - - // JDK7 - increment_use_count_ = - env->GetMethodID(cls, "incrementAndGetUseCount", "()I"); - if (increment_use_count_ != NULL) { - decrement_use_count_ = - env->GetMethodID(cls, "decrementAndGetUseCount", "()I"); - } else { - // JDK8 - env->ExceptionClear(); // The pending exception from increment_use_count_ - - fd_attach_ = env->GetMethodID(cls, "attach", "(Ljava/io/Closeable;)V"); - fd_close_all_ = env->GetMethodID(cls, "closeAll", "(Ljava/io/Closeable;)V"); - - if (fd_attach_ == NULL || fd_close_all_ == NULL) { - cls = env->FindClass("java/lang/NoSuchMethodError"); - env->ThrowNew(cls, "FileDescriptor methods not found"); - return; - } - } -} - -extern "C" JNIEXPORT void JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocketImpl_ref(JNIEnv *env, jclass clazz, - jobject fd, jobject closer) { - if (increment_use_count_ != NULL) { - env->CallIntMethod(fd, increment_use_count_); - } - - if (fd_attach_ != NULL) { - env->CallVoidMethod(fd, fd_attach_, closer); - } -} - -extern "C" JNIEXPORT jboolean JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocketImpl_unref(JNIEnv *env, jclass clazz, - jobject fd) { - if (decrement_use_count_ != NULL) { - env->CallIntMethod(fd, decrement_use_count_); - return true; - } - return false; -} - -extern "C" JNIEXPORT jboolean JNICALL -Java_com_google_devtools_build_lib_unix_LocalSocketImpl_close0(JNIEnv *env, jclass clazz, - jobject fd, - jobject closeable) { - if (fd_close_all_ != NULL) { - env->CallVoidMethod(fd, fd_close_all_, closeable); - return true; - } - // This will happen if fd_close_all_ is NULL, which means we are running in - // <=JDK7, which means that the caller needs to invoke close() explicitly. - return false; -} diff --git a/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java deleted file mode 100644 index 58948e1720..0000000000 --- a/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2015 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.server; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; -import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.ShutdownMethod; -import com.google.devtools.build.lib.testutil.Suite; -import com.google.devtools.build.lib.testutil.TestSpec; -import com.google.devtools.build.lib.util.JavaClock; -import com.google.devtools.build.lib.util.io.OutErr; -import com.google.devtools.build.lib.util.io.RecordingOutErr; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.build.lib.vfs.util.FsApparatus; -import java.io.File; -import java.util.List; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Run a real RPC server on localhost, and talk to it using the testing - * client. - */ -@TestSpec(size = Suite.MEDIUM_TESTS) -@RunWith(JUnit4.class) -public class AfUnixServerTest { - - private static final long MAX_IDLE_MILLIS = 10000; - private static final long HEALTH_CHECK_MILLIS = 1000 * 3; - private static final String COMMAND_STDOUT = "Heelllloo...."; - private static final String COMMAND_STDERR = "...world!"; - - private AfUnixServer server; - private FsApparatus scratch = FsApparatus.newNative(); - private RecordingOutErr outErr = new RecordingOutErr(); - private Path serverDir; - private Path workspaceDir; - private RPCTestingClient client; - private Thread serverThread = new Thread(){ - @Override - public void run() { - server.serve(); - } - }; - - private static final ServerCommand helloWorldCommand = new ServerCommand() { - @Override - public int exec(List<String> args, OutErr outErr, LockingMode lockingMode, - String clientDescription, long firstContactTime) { - outErr.printOut(COMMAND_STDOUT); - outErr.printErr(COMMAND_STDERR); - return 42; - } - - @Override - public ShutdownMethod shutdown() { - return ShutdownMethod.NONE; - } - }; - - @Before - public final void startServer() throws Exception { - // Do not use `createUnixTempDir()` here since the file name that results is longer - // than 108 characters, so cannot be used as local socket address. - File file = File.createTempFile("scratch", ".tmp", new File("/tmp")); - file.delete(); - file.mkdir(); - serverDir = this.scratch.dir(file.getAbsolutePath()); - - workspaceDir = this.scratch.createUnixTempDir(); - workspaceDir.createDirectory(); - client = new RPCTestingClient( - outErr, serverDir.getRelative("server.socket")); - RPCService service = new RPCService(helloWorldCommand); - server = new AfUnixServer(new JavaClock(), service, MAX_IDLE_MILLIS, HEALTH_CHECK_MILLIS, - serverDir, workspaceDir); - serverThread.start(); - } - - @After - public final void stopServer() throws Exception { - serverThread.interrupt(); - serverThread.join(); - - FileSystemUtils.deleteTree(serverDir); - } - - private void runTestRequest(String request, int ret, String out, String err) throws Exception { - assertEquals(ret, client.sendRequest(request)); - assertEquals(out, outErr.outAsLatin1()); - assertThat(outErr.errAsLatin1()).contains(err); - } - - @Test - public void testUnknownCommand() throws Exception { - runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); - } - - @Test - public void testEmptyBlazeCommand() throws Exception { - runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); - } - - @Test - public void testWorkspaceDies() throws Exception { - assertTrue(serverThread.isAlive()); - runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR); - Thread.sleep(HEALTH_CHECK_MILLIS * 2); - assertTrue(serverThread.isAlive()); - - assertTrue(workspaceDir.delete()); - Thread.sleep(HEALTH_CHECK_MILLIS * 2); - assertFalse(serverThread.isAlive()); - } -} diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java deleted file mode 100644 index 3b1515d2e2..0000000000 --- a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2015 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.server; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.unix.LocalClientSocket; -import com.google.devtools.build.lib.unix.LocalSocketAddress; -import com.google.devtools.build.lib.util.io.RecordingOutErr; -import com.google.devtools.build.lib.util.io.StreamDemultiplexer; -import com.google.devtools.build.lib.vfs.Path; -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; - -/** - * A client to test RPCServer. - */ -public class RPCTestingClient { - - private final RecordingOutErr outErr; - private final Path socketFile; - - /** - * Create a client to RPCServer. {@code socketFile} must be a file - * on disk; this will not work with the in-memory file system. - */ - public RPCTestingClient(RecordingOutErr outErr, Path socketFile) { - this.socketFile = socketFile; - this.outErr = outErr; - } - - public int sendRequest(String command, String... params) - throws Exception { - String request = command; - for (String param : params) { - request += "\0" + param; - } - return sendRequest(request); - } - - public int sendRequest(String request) throws Exception { - LocalClientSocket connection = new LocalClientSocket(); - connection.connect(new LocalSocketAddress(socketFile.getPathFile())); - try { - OutputStream out = connection.getOutputStream(); - byte[] requestBytes = request.getBytes(UTF_8); - byte[] requestLength = new byte[4]; - requestLength[0] = (byte) (requestBytes.length << 24); - requestLength[1] = (byte) ((requestBytes.length << 16) & 0xff); - requestLength[2] = (byte) ((requestBytes.length << 8) & 0xff); - requestLength[3] = (byte) (requestBytes.length & 0xff); - out.write(requestLength); - out.write(requestBytes); - out.flush(); - connection.shutdownOutput(); - - OutputStream stdout = outErr.getOutputStream(); - OutputStream stderr = outErr.getErrorStream(); - ByteArrayOutputStream control = new ByteArrayOutputStream(); - StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, - stdout, stderr, control); - ByteStreams.copy(connection.getInputStream(), demux); - demux.flush(); - - byte[] controlBytes = control.toByteArray(); - return (((int) controlBytes[0]) << 24) - + (((int) controlBytes[1]) << 16) - + (((int) controlBytes[2]) << 8) - + ((int) controlBytes[3]); - } finally { - connection.close(); - } - } - -} |