diff options
author | 2016-08-11 20:46:20 +0000 | |
---|---|---|
committer | 2016-08-12 08:52:58 +0000 | |
commit | bab0d481dea8be7568cd593460c26111bf302175 (patch) | |
tree | 30b454482bcded54cf216b30fa40a98d56fa679a | |
parent | 7e33704e7546bb676e9052089c30f1dd625fd082 (diff) |
Rollback of commit f107debac45ddf5859b1eb963379769b5815b18f. Also includes the logical rollback of commit 67ad82a319ff8959e69e774e7c15d3af904ec23d.
RELNOTES[INC]: Bazel supports Unix domain sockets for communication between its client and server again, temporarily, while we diagnose a memory leak.
--
MOS_MIGRATED_REVID=130027009
15 files changed, 2217 insertions, 78 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index 116efd5314..7fb3511a0a 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc @@ -241,6 +241,22 @@ uint64_t BlazeServer::AcquireLock() { globals->options.block_for_lock, &blaze_lock_); } +// Communication method that uses an AF_UNIX socket and a custom protocol. +class AfUnixBlazeServer : public BlazeServer { + public: + AfUnixBlazeServer(); + virtual ~AfUnixBlazeServer() {} + + virtual bool Connect(); + virtual void Disconnect(); + virtual unsigned int Communicate(); + virtual void KillRunningServer(); + virtual void Cancel(); + + private: + int server_socket_; +}; + // Communication method that uses gRPC on a socket bound to localhost. More // documentation is in command_server.proto . class GrpcBlazeServer : public BlazeServer { @@ -695,6 +711,242 @@ static void StartStandalone(BlazeServer* server) { pdie(blaze_exit_code::INTERNAL_ERROR, "execv of '%s' failed", exe.c_str()); } +AfUnixBlazeServer::AfUnixBlazeServer() { + server_socket_ = -1; + connected_ = false; +} + +bool AfUnixBlazeServer::Connect() { + assert(!connected_); + + if (server_socket_ == -1) { + server_socket_ = socket(PF_UNIX, SOCK_STREAM, 0); + if (server_socket_ == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "can't create AF_UNIX socket"); + } + + if (fcntl(server_socket_, F_SETFD, FD_CLOEXEC) == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "fcntl(F_SETFD, FD_CLOEXEC) failed"); + } + } + + struct sockaddr_un addr; + addr.sun_family = AF_UNIX; + + string socket_file = globals->options.output_base + "/server/server.socket"; + char *resolved_path = realpath(socket_file.c_str(), NULL); + if (resolved_path != NULL) { + strncpy(addr.sun_path, resolved_path, sizeof addr.sun_path); + addr.sun_path[sizeof addr.sun_path - 1] = '\0'; + free(resolved_path); + sockaddr *paddr = reinterpret_cast<sockaddr *>(&addr); + int result = connect(server_socket_, paddr, sizeof addr); + connected_ = result == 0; + if (connected_) { + string server_dir = globals->options.output_base + "/server"; + globals->server_pid = GetServerPid(server_dir); + if (globals->server_pid <= 0) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "can't get PID of existing server (server dir=%s)", + server_dir.c_str()); + } + } + + return connected_; + } else if (errno == ENOENT) { // No socket means no server to connect to + errno = ECONNREFUSED; + return false; + } else { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "realpath('%s') failed", socket_file.c_str()); + return false; + } +} + +void AfUnixBlazeServer::Disconnect() { + assert(connected_); + + close(server_socket_); + connected_ = false; + server_socket_ = -1; +} + +static int ServerEof() { + // e.g. external SIGKILL of server, misplaced System.exit() in the server, + // or a JVM crash. Print out the jvm.out file in case there's something + // useful. + fprintf(stderr, "Error: unexpected EOF from %s server.\n" + "Contents of '%s':\n", globals->options.product_name.c_str(), + globals->jvm_log_file.c_str()); + WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str()); + return GetExitCodeForAbruptExit(*globals); +} + +// Reads a single char from the specified stream. +static int ReadServerChar(int fd, unsigned char *result) { + if (read(fd, result, 1) != 1) { + return ServerEof(); + } + return 0; +} + +static int ReadServerInt(int fd, unsigned int *result) { + unsigned char buffer[4]; + unsigned char *p = buffer; + int remaining = 4; + + while (remaining > 0) { + int bytes_read = read(fd, p, remaining); + if (bytes_read <= 0) { + return ServerEof(); + } + + remaining -= bytes_read; + p += bytes_read; + } + + *result = (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8) + + buffer[3]; + return 0; +} + +static char server_output_buffer[8192]; + +// Forwards the output of the server to the specified file handle. +static int ForwardServerOutput(int socket, int output) { + unsigned int remaining; + int exit_code = ReadServerInt(socket, &remaining); + if (exit_code != 0) { + return exit_code; + } + while (remaining > 0) { + int bytes = remaining > 8192 ? 8192 : remaining; + bytes = read(socket, server_output_buffer, bytes); + if (bytes <= 0) { + return ServerEof(); + } + + remaining -= bytes; + if (write(output, server_output_buffer, bytes) != bytes) { + // Not much we can do if this doesn't work, just placate the compiler. + } + } + + return 0; +} + +unsigned int AfUnixBlazeServer::Communicate() { + assert(connected_); + + const string request = BuildServerRequest(); + + // Send request (Request is written in a single chunk.) + char request_size[4]; + request_size[0] = (request.size() >> 24) & 0xff; + request_size[1] = (request.size() >> 16) & 0xff; + request_size[2] = (request.size() >> 8) & 0xff; + request_size[3] = (request.size()) & 0xff; + if (write(server_socket_, request_size, 4) != 4) { + pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed"); + } + + if (write(server_socket_, request.data(), request.size()) != request.size()) { + pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed"); + } + + // Wait until we receive some response from the server. + // (We do this by calling select() with a timeout.) + // If we don't receive a response within 3 seconds, print a message, + // so that the user has some idea what is going on. + while (true) { + fd_set fdset; + FD_ZERO(&fdset); + FD_SET(server_socket_, &fdset); + struct timeval timeout; + timeout.tv_sec = 3; + timeout.tv_usec = 0; + int result = select(server_socket_ + 1, &fdset, NULL, &fdset, &timeout); + if (result > 0) { + // Data is ready on socket. Go ahead and read it. + break; + } else if (result == 0) { + // Timeout. Print a message, then go ahead and read from + // the socket (the read will usually block). + fprintf(stderr, + "INFO: Waiting for response from %s server (pid %d)...\n", + globals->options.product_name.c_str(), globals->server_pid); + break; + } else { // result < 0 + // Error. For EINTR we try again, all other errors are fatal. + if (errno != EINTR) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "select() on server socket failed"); + } + } + } + + // Read and demux the response. + const int TAG_STDOUT = 1; + const int TAG_STDERR = 2; + const int TAG_CONTROL = 3; + unsigned int exit_code; + for (;;) { + // Read the tag + unsigned char tag; + exit_code = ReadServerChar(server_socket_, &tag); + if (exit_code != 0) { + return exit_code; + } + + switch (tag) { + // stdout + case TAG_STDOUT: + exit_code = ForwardServerOutput(server_socket_, STDOUT_FILENO); + if (exit_code != 0) { + return exit_code; + } + break; + + // stderr + case TAG_STDERR: + exit_code = ForwardServerOutput(server_socket_, STDERR_FILENO); + if (exit_code != 0) { + return exit_code; + } + break; + + // Control stream. Currently only used for reporting the exit code. + case TAG_CONTROL: + unsigned int length; + exit_code = ReadServerInt(server_socket_, &length); + if (exit_code != 0) { + // We cannot read the length field. The return value of ReadSeverInt() + // is the result of ServerEof(), so we bail out early so that we don't + // call ServerEof() twice. + return exit_code; + } + + if (length != 4) { + return ServerEof(); + } + unsigned int server_exit_code; + exit_code = ReadServerInt(server_socket_, &server_exit_code); + return exit_code != 0 ? exit_code : server_exit_code; + + default: + fprintf(stderr, "bad tag %d\n", tag); + return ServerEof(); + } + } +} + +void AfUnixBlazeServer::Cancel() { + assert(connected_); + kill(globals->server_pid, SIGINT); +} + // Write the contents of file_name to stream. static void WriteFileToStreamOrDie(FILE *stream, const char *file_name) { FILE *fp = fopen(file_name, "r"); @@ -809,6 +1061,55 @@ static void StartServerAndConnect(BlazeServer *server) { socket_file.c_str()); } +// Poll until the given process denoted by pid goes away. Return false if this +// does not occur within wait_time_secs. +static bool WaitForServerDeath(pid_t pid, int wait_time_secs) { + for (int ii = 0; ii < wait_time_secs * 10; ++ii) { + if (kill(pid, 0) == -1) { + if (errno == ESRCH) { + return true; + } + pdie(blaze_exit_code::INTERNAL_ERROR, "could not be killed"); + } + poll(NULL, 0, 100); // sleep 100ms. (usleep(3) is obsolete.) + } + return false; +} + +// Kills the specified running Blaze server. First we send a SIGTERM, and if +// that does not kill the process, a SIGKILL. +void AfUnixBlazeServer::KillRunningServer() { + assert(connected_); + assert(globals->server_pid > 0); + + close(server_socket_); + server_socket_ = -1; + fprintf(stderr, "Sending SIGTERM to previous %s server (pid=%d)... ", + globals->options.product_name.c_str(), globals->server_pid); + fflush(stderr); + kill(globals->server_pid, SIGTERM); + if (WaitForServerDeath(globals->server_pid, 10)) { + fprintf(stderr, "done.\n"); + connected_ = false; + return; + } + + // If the previous attempt did not suceeded, kill the whole group. + fprintf(stderr, + "Sending SIGKILL to previous %s server process group (pid=%d)... ", + globals->options.product_name.c_str(), globals->server_pid); + fflush(stderr); + killpg(globals->server_pid, SIGKILL); + if (WaitForServerDeath(globals->server_pid, 10)) { + fprintf(stderr, "killed.\n"); + connected_ = false; + return; + } + + // Process did not go away 10s after SIGKILL. Stuck in state 'Z' or 'D'? + pdie(blaze_exit_code::INTERNAL_ERROR, "SIGKILL unsuccessful after 10s"); +} + // Calls fsync() on the file (or directory) specified in 'file_path'. // pdie()'s if syncing fails. static void SyncFile(const char *file_path) { @@ -1530,6 +1831,13 @@ int main(int argc, const char *argv[]) { CheckBinaryPath(argv[0]); ParseOptions(argc, argv); +#ifdef __CYGWIN__ + if (globals->options.command_port == -1) { + // AF_UNIX does not work on Windows, so use gRPC instead. + globals->options.command_port = 0; + } +#endif + string error; blaze_exit_code::ExitCode reexec_options_exit_code = globals->options.CheckForReExecuteOptions(argc, argv, &error); @@ -1542,7 +1850,9 @@ int main(int argc, const char *argv[]) { const string self_path = GetSelfPath(); ComputeBaseDirectories(self_path); - blaze_server = static_cast<BlazeServer *>(new GrpcBlazeServer()); + blaze_server = globals->options.command_port >= 0 + ? static_cast<BlazeServer *>(new GrpcBlazeServer()) + : static_cast<BlazeServer *>(new AfUnixBlazeServer()); globals->command_wait_time = blaze_server->AcquireLock(); diff --git a/src/main/cpp/blaze_startup_options_common.cc b/src/main/cpp/blaze_startup_options_common.cc index b3cbaadbe6..449907b4a2 100644 --- a/src/main/cpp/blaze_startup_options_common.cc +++ b/src/main/cpp/blaze_startup_options_common.cc @@ -242,10 +242,10 @@ blaze_exit_code::ExitCode BlazeStartupOptions::ProcessArg( } else if ((value = GetUnaryOption( arg, next_arg, "--command_port")) != NULL) { if (!blaze_util::safe_strto32(value, &command_port) || - command_port < 0 || command_port > 65535) { + command_port < -1 || command_port > 65535) { blaze_util::StringPrintf(error, "Invalid argument to --command_port: '%s'. " - "Must be a valid port number or 0.\n", + "Must be a valid port number or -1 to disable the gRPC server.\n", value); return blaze_exit_code::BAD_ARGV; } diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java index 5a4ebd0875..808759628a 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java @@ -47,6 +47,7 @@ import com.google.devtools.build.lib.query2.output.OutputFormatter; import com.google.devtools.build.lib.rules.test.CoverageReportActionFactory; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy; +import com.google.devtools.build.lib.server.AfUnixServer; import com.google.devtools.build.lib.server.RPCServer; import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; import com.google.devtools.build.lib.shell.JavaSubprocessFactory; @@ -777,17 +778,23 @@ public final class BlazeRuntime { BlazeServerStartupOptions startupOptions = runtime.getStartupOptionsProvider().getOptions(BlazeServerStartupOptions.class); - try { - // This is necessary so that Bazel kind of works during bootstrapping, at which time the - // gRPC server is not compiled in so that we don't need gRPC for bootstrapping. - Class<?> factoryClass = Class.forName( - "com.google.devtools.build.lib.server.GrpcServerImpl$Factory"); + if (startupOptions.commandPort != -1) { + try { + // This is necessary so that Bazel kind of works during bootstrapping, at which time the + // gRPC server is not compiled in so that we don't need gRPC for bootstrapping. + Class<?> factoryClass = Class.forName( + "com.google.devtools.build.lib.server.GrpcServerImpl$Factory"); RPCServer.Factory factory = (RPCServer.Factory) factoryClass.getConstructor().newInstance(); return factory.create(commandExecutor, runtime.getClock(), startupOptions.commandPort, runtime.getServerDirectory(), startupOptions.maxIdleSeconds); - } catch (ReflectiveOperationException | IllegalArgumentException e) { - throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR); + } catch (ReflectiveOperationException | IllegalArgumentException e) { + throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR); + } + } else { + return AfUnixServer.newServerWith(runtime.getClock(), commandExecutor, + runtime.getServerDirectory(), runtime.workspace.getWorkspace(), + startupOptions.maxIdleSeconds); } } diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java new file mode 100644 index 0000000000..4444d8424e --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java @@ -0,0 +1,558 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.ShutdownMethod; +import com.google.devtools.build.lib.server.RPCService.UnknownCommandException; +import com.google.devtools.build.lib.unix.LocalClientSocket; +import com.google.devtools.build.lib.unix.LocalServerSocket; +import com.google.devtools.build.lib.unix.LocalSocketAddress; +import com.google.devtools.build.lib.unix.NativePosixFiles; +import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.ThreadUtils; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.util.io.StreamMultiplexer; +import com.google.devtools.build.lib.vfs.Path; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** + * An RPCServer server is a Java object that sits and waits for RPC requests + * (the sit-and-wait is implemented in {@link #serve()}). These requests + * arrive via UNIX file sockets. The RPCServer then calls the application + * (which implements ServerCommand) to handle the request. (Since the Blaze + * server may need to stat hundreds of directories during initialization, this + * is a significant speedup.) The server thread will terminate after idling + * for a user-specified time. + * + * Note: If you are contemplating to call into the RPCServer from + * within Java, consider using the {@link RPCService} class instead. + */ +// TODO(bazel-team): Signal handling. +// TODO(bazel-team): Gives clients status information when the server is busy. One +// way to do this is to put the server status in a file (pid, the current +// target, etc) in the server directory. Alternatively, we can have a separate +// thread taking care of the server socket and put the information into socket +// handshakes. +// TODO(bazel-team): Use Reporter for server-side messages. +public final class AfUnixServer extends RPCServer { + private final Clock clock; + private final RPCService rpcService; + private final LocalServerSocket serverSocket; + private final long maxIdleMillis; + private final long statusCheckMillis; + private final Path serverDirectory; + private final Path workspaceDir; + private static final Logger LOG = Logger.getLogger(AfUnixServer.class.getName()); + private volatile boolean lameDuck; + + private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute. + private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0'); + + /** + * Create a new server instance. After creating the server, you can start it + * by calling the {@link #serve()} method. + * + * @param clock The clock to take time measurements + * @param rpcService The underlying service object, which takes + * care of dispatching to the {@link ServerCommand} + * instances, as requests arrive. + * @param maxIdleMillis The maximum time the server will wait idly. + * @param statusCheckPeriodMillis How long to wait between system status checks. + * @param serverDirectory Directory to put file socket and pid files, etc. + * @param workspaceDir The workspace. Used solely to ensure it persists. + * @throws IOException + */ + public AfUnixServer(Clock clock, RPCService rpcService, + long maxIdleMillis, long statusCheckPeriodMillis, + Path serverDirectory, Path workspaceDir) + throws IOException { + super(serverDirectory); + this.clock = clock; + this.rpcService = rpcService; + this.maxIdleMillis = maxIdleMillis; + this.statusCheckMillis = statusCheckPeriodMillis; + this.serverDirectory = serverDirectory; + this.workspaceDir = workspaceDir; + + this.serverSocket = openServerSocket(); + serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis)); + lameDuck = false; + } + + /** + * Create a new server instance. After creating the server, you can start it + * by calling the {@link #serve()} method. + * + * @param clock The clock to take time measurements + * @param rpcService The underlying service object, which takes + * care of dispatching to the {@link ServerCommand} + * instances, as requests arrive. + * @param maxIdleMillis The maximum time the server will wait idly. + * @param serverDirectory Directory to put file socket and pid files, etc. + * @param workspaceDir The workspace. Used solely to ensure it persists. + * @throws IOException + */ + public AfUnixServer(Clock clock, RPCService rpcService, + long maxIdleMillis, Path serverDirectory, Path workspaceDir) + throws IOException { + this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS, + serverDirectory, workspaceDir); + } + + + private final AtomicBoolean inAction = new AtomicBoolean(false); + private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); + private final AtomicLong cmdNum = new AtomicLong(); + private final Thread mainThread = Thread.currentThread(); + private final Object interruptLock = new Object(); + + @Override + public void interrupt() { + // Only interrupt during actions - otherwise we may end up setting the interrupt bit + // at the end of a build and responding to it at the beginning of the subsequent build. + synchronized (interruptLock) { + if (allowingInterrupt.get()) { + mainThread.interrupt(); + } + } + + if (inAction.get()) { + Runnable interruptWatcher = + new Runnable() { + @Override + public void run() { + try { + long originalCmd = cmdNum.get(); + Thread.sleep(10 * 1000); + if (inAction.get() && cmdNum.get() == originalCmd) { + // We're still operating on the same command. + // Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + } + + /** + * Wait on a socket for business (answer requests). Note that this + * method won't return until the server shuts down. + */ + @Override + public void serve() { + try { + while (!lameDuck) { + try { + IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir); + idleChecker.idle(); + RequestIo requestIo; + + long startTime = clock.currentTimeMillis(); + while (true) { + try { + allowingInterrupt.set(true); + Socket socket = serverSocket.accept(); + long firstContactTime = clock.currentTimeMillis(); + requestIo = new RequestIo(socket, firstContactTime); + break; + } catch (SocketTimeoutException e) { + long idleTime = clock.currentTimeMillis() - startTime; + if (lameDuck) { + closeServerSocket(); + return; + } else if (idleTime > maxIdleMillis + || (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) { + enterLameDuck(); + } + } + } + idleChecker.busy(); + + + List<String> request = null; + try { + request = extractRequest(requestIo); + cmdNum.incrementAndGet(); + inAction.set(true); + if (request != null) { + executeRequest(request, requestIo); + } + } finally { + inAction.set(false); + // Don't reset interruption unless we executed a request. Otherwise this is just a + // ping from the client verifying our existence, in which case we should retain the + // interrupt status for the subsequent request. + if (request != null) { + synchronized (interruptLock) { + allowingInterrupt.set(false); + Thread.interrupted(); // clears thread interrupted status + } + } + requestIo.shutdown(); + switch (rpcService.getShutdown()) { + case NONE: + break; + + case CLEAN: + return; + + case EXPUNGE: + disableShutdownHooks(); + return; + } + } + } catch (EOFException e) { + LOG.info("Connection to the client lost: " + + e.getMessage()); + } catch (IOException e) { + // Something else happened. Print a stack trace for debugging. + printStack(e); + } + } + } finally { + rpcService.shutdown(ShutdownMethod.CLEAN); + LOG.info("Logging finished"); + } + } + + private void closeServerSocket() { + LOG.info("Closing serverSocket."); + try { + serverSocket.close(); + } catch (IOException e) { + printStack(e); + } + + if (!lameDuck) { + try { + getSocketPath().delete(); + } catch (IOException e) { + printStack(e); + } + } + } + + /** + * Allow one last request to be serviced. + */ + private void enterLameDuck() { + lameDuck = true; + try { + getSocketPath().delete(); + } catch (IOException e) { + e.printStackTrace(); + } + serverSocket.setSoTimeout(1); + } + + /** + * Returns the path of the socket file to be used. + */ + public Path getSocketPath() { + return serverDirectory.getRelative("server.socket"); + } + + /** + * Ensures no other server is running for the current socket file. This + * guarantees that no two servers are running against the same output + * directory. + * + * @throws IOException if another server holds the lock for the socket file. + */ + public static void ensureExclusiveAccess(Path socketFile) throws IOException { + LocalSocketAddress address = + new LocalSocketAddress(socketFile.getPathFile()); + if (socketFile.exists()) { + try { + new LocalClientSocket(address).close(); + } catch (IOException e) { + // The previous server process is dead--unlink the file: + socketFile.delete(); + return; + } + // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message + // and add it to the message. + throw new IOException("Socket file " + socketFile.getPathString() + + " is locked by another server"); + } + } + + /** + * Opens a UNIX local server socket. + * @throws IOException if the socket file is used by another server or can + * not be made exclusive. + */ + private LocalServerSocket openServerSocket() throws IOException { + // This is the "well known" socket path via which the server is found... + Path socketFile = getSocketPath(); + + // ...but it may have a name that's too long for AF_UNIX, in which case we + // make it a symlink to /tmp/something. This typically only happens in + // tests where the --output_base is beneath a very deep temp dir. + // (All this extra complexity is just used in tests... *sigh*). + if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX + Path socketLink = socketFile; + String tmpDirDefault = System.getenv("TMPDIR"); + if (tmpDirDefault == null + || tmpDirDefault.length() > 104 - "/blaze-4294967296/server.socket".length()) { + // Default for unset TMPDIR, or if TMPDIR is so that the resulting + // path would be too long. + tmpDirDefault = "/tmp"; + } + String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", tmpDirDefault); + socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)). + getRelative("server.socket"); + LOG.info("Using symlinked socket at " + socketFile); + + socketLink.delete(); // Remove stale symlink, if any. + socketLink.createSymbolicLink(socketFile); + + deleteAtExit(socketLink, /*deleteParent=*/false); + deleteAtExit(socketFile, /*deleteParent=*/true); + } else { + deleteAtExit(socketFile, /*deleteParent=*/false); + } + + ensureExclusiveAccess(socketFile); + + + LocalServerSocket serverSocket = new LocalServerSocket(); + serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile())); + NativePosixFiles.chmod(socketFile.getPathFile(), 0600); // Lock it down. + serverSocket.listen(/*backlog=*/50); + return serverSocket; + } + + // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a + // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it + // succeeds. + private static Path createTempSocketDirectory(Path tempDir) { + Random random = new Random(); + while (true) { + Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt())); + try { + if (socketDir.createDirectory()) { + // Make sure it's private; unfortunately, createDirectory() doesn't take a mode + // argument. + socketDir.chmod(0700); + return socketDir; // Created. + } + // Already existed; try again. + } catch (IOException e) { + // Failed; try again. + } + } + } + + /** + * Read a string in platform default encoding and split it into a list of + * NUL-separated words. + * + * <p>Blaze consistently uses the platform default encoding (defined in + * blaze.cc) to interface with Unix APIs. + */ + private static List<String> readRequest(InputStream input) throws IOException { + byte[] sizeBuffer = new byte[4]; + ByteStreams.readFully(input, sizeBuffer); + int size = ((sizeBuffer[0] & 0xff) << 24) + + ((sizeBuffer[1] & 0xff) << 16) + + ((sizeBuffer[2] & 0xff) << 8) + + (sizeBuffer[3] & 0xff); + byte[] inputBytes = new byte[size]; + ByteStreams.readFully(input, inputBytes); + + String s = new String(inputBytes, Charset.defaultCharset()); + return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s)); + } + + private static List<String> extractRequest(RequestIo requestIo) throws IOException { + List<String> request = readRequest(requestIo.in); + if (request == null) { + LOG.info("Short-circuiting empty request"); + return null; + } + return request; + } + + private void executeRequest(List<String> request, RequestIo requestIo) { + Preconditions.checkNotNull(request); + int exitStatus = 2; + try { + exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr, + requestIo.firstContactTime); + LOG.info("Finished executing request"); + } catch (UnknownCommandException e) { + requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage()); + LOG.severe("SERVER ERROR: " + e.getMessage()); + } catch (Exception e) { + // Stacktrace for unknown exception. + StringWriter trace = new StringWriter(); + e.printStackTrace(new PrintWriter(trace, true)); + requestIo.requestOutErr.printErr("SERVER ERROR: " + trace); + LOG.severe("SERVER ERROR: " + trace); + } + + if (rpcService.getShutdown() != ShutdownMethod.NONE) { + // In case of shutdown, disable the listening socket *before* we write + // the last part of the response. Otherwise, a sufficiently fast client + // could read the response and exit, and a new client could make a + // connection to this server, which is still in the listening state, even + // though it is about to shut down imminently. + closeServerSocket(); + } + + requestIo.writeExitStatus(exitStatus); + } + + /** + * Because it's a little complicated, this class factors out all the IO Hook + * up we need per request, that is, in + * {@link AfUnixServer#executeRequest(List, RequestIo)}. + * It's unfortunately complicated, so it's explained here. + */ + private static class RequestIo { + + // Used by the client code + private final InputStream in; + private final OutErr requestOutErr; + private final OutputStream controlChannel; + + // just used by this class to keep the state around + private final Socket requestSocket; + private final OutputStream requestOut; + private final long firstContactTime; + + RequestIo(Socket requestSocket, long firstContactTime) throws IOException { + this.requestSocket = requestSocket; + this.firstContactTime = firstContactTime; + this.in = requestSocket.getInputStream(); + this.requestOut = requestSocket.getOutputStream(); + + // We encode the response sent to the client with a multiplexer so + // we can send three streams (out / err / control) over one wire stream + // (requestOut). + StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut); + + // We'll be writing control messages (exit code + out of date message) + // to this control channel. + controlChannel = multiplexer.createControl(); + + // This is the outErr part of the multiplexed output. + requestOutErr = OutErr.create(multiplexer.createStdout(), + multiplexer.createStderr()); + // We hook up System.out / System.err to our IO object. Stuff written to + // System.out / System.err will show up on the user's screen, prefixed + // with "System.out "/"System.err ". + requestOutErr.addSystemOutErrAsSource(); + } + + public void writeExitStatus(int exitStatus) { + // Make sure to flush the output / error streams prior to writing the exit status. + // The client may stop reading that direction of the socket immediately upon reading the + // exit code. + flushOutErr(); + try { + controlChannel.write((exitStatus >> 24) & 0xff); + controlChannel.write((exitStatus >> 16) & 0xff); + controlChannel.write((exitStatus >> 8) & 0xff); + controlChannel.write(exitStatus & 0xff); + controlChannel.flush(); + LOG.info("" + exitStatus); + } catch (IOException ignored) { + // This exception is historically ignored. + } + } + + private void flushOutErr() { + try { + requestOutErr.getOutputStream().flush(); + } catch (IOException e) { + printStack(e); + } + try { + requestOutErr.getErrorStream().flush(); + } catch (IOException e) { + printStack(e); + } + } + + public void shutdown() { + try { + requestOut.close(); + } catch (IOException e) { + printStack(e); + } + try { + in.close(); + } catch (IOException e) { + printStack(e); + } + try { + requestSocket.close(); + } catch (IOException e) { + printStack(e); + } + } + } + + /** + * Creates and returns a new RPC server. + * Use {@link AfUnixServer#serve()} to start the server. + * + * @param appCommand The application's ServerCommand implementation. + * @param serverDirectory The directory for server-related files. The caller + * must ensure the directory has been created. + * @param workspaceDir The workspace, used solely to ensure it persists. + * @param maxIdleSeconds The idle time in seconds after which the rpc + * server will die unless it receives a request. + */ + public static AfUnixServer newServerWith(Clock clock, + ServerCommand appCommand, + Path serverDirectory, + Path workspaceDir, + int maxIdleSeconds) + throws IOException { + // Creates and starts the RPC server. + RPCService service = new RPCService(appCommand); + + return new AfUnixServer(clock, service, maxIdleSeconds * 1000L, + serverDirectory, workspaceDir); + } + +} diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 6e1555ca00..6d22550e8a 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -40,8 +40,6 @@ import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.io.OutputStream; -import java.io.PrintWriter; -import java.io.StringWriter; import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.security.SecureRandom; @@ -50,10 +48,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - import javax.annotation.concurrent.GuardedBy; /** @@ -62,7 +57,7 @@ import javax.annotation.concurrent.GuardedBy; * <p>Only this class should depend on gRPC so that we only need to exclude this during * bootstrapping. */ -public class GrpcServerImpl implements RPCServer { +public class GrpcServerImpl extends RPCServer { // UTF-8 won't do because we want to be able to pass arbitrary binary strings. // Not that the internals of Bazel handle that correctly, but why not make at least this little // part correct? @@ -70,8 +65,6 @@ public class GrpcServerImpl implements RPCServer { private static final long NANOSECONDS_IN_MS = TimeUnit.MILLISECONDS.toNanos(1); - private static final Logger LOG = Logger.getLogger(RPCServer.class.getName()); - private class RunningCommand implements AutoCloseable { private final Thread thread; private final String id; @@ -152,8 +145,6 @@ public class GrpcServerImpl implements RPCServer { private static final String REQUEST_COOKIE_FILE = "request_cookie"; private static final String RESPONSE_COOKIE_FILE = "response_cookie"; - private static final AtomicBoolean runShutdownHooks = new AtomicBoolean(true); - @GuardedBy("runningCommands") private final Map<String, RunningCommand> runningCommands = new HashMap<>(); private final CommandExecutor commandExecutor; @@ -170,14 +161,7 @@ public class GrpcServerImpl implements RPCServer { public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port, Path serverDirectory, int maxIdleSeconds) throws IOException { - // server.pid was written in the C++ launcher after fork() but before exec() . - // The client only accesses the pid file after connecting to the socket - // which ensures that it gets the correct pid value. - Path pidFile = serverDirectory.getRelative("server.pid.txt"); - Path pidSymlink = serverDirectory.getRelative("server.pid"); - deleteAtExit(pidFile, /*deleteParent=*/ false); - deleteAtExit(pidSymlink, /*deleteParent=*/ false); - + super(serverDirectory); this.commandExecutor = commandExecutor; this.clock = clock; this.serverDirectory = serverDirectory; @@ -331,46 +315,6 @@ public class GrpcServerImpl implements RPCServer { } - protected void disableShutdownHooks() { - runShutdownHooks.set(false); - } - - /** - * Schedule the specified file for (attempted) deletion at JVM exit. - */ - protected static void deleteAtExit(final Path path, final boolean deleteParent) { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - if (!runShutdownHooks.get()) { - return; - } - - try { - path.delete(); - if (deleteParent) { - path.getParentDirectory().delete(); - } - } catch (IOException e) { - printStack(e); - } - } - }); - } - - static void printStack(IOException e) { - /* - * Hopefully this never happens. It's not very nice to just write this - * to the user's console, but I'm not sure what better choice we have. - */ - StringWriter err = new StringWriter(); - PrintWriter printErr = new PrintWriter(err); - printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]======="); - e.printStackTrace(printErr); - printErr.println("====================================================="); - LOG.severe(err.toString()); - } - private final CommandServerGrpc.CommandServerImplBase commandServer = new CommandServerGrpc.CommandServerImplBase() { @Override diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java index 4880349e15..40ccdeb915 100644 --- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -16,35 +16,86 @@ package com.google.devtools.build.lib.server; import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.vfs.Path; - import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; /** - * A Bazel server instance. - * - * <p>Even though it only has one implementation, that implementation cannot be compiled during - * bootstrapping Bazel because it depends on the gRPC Java stubs, so we add a layer of abstraction - * so that we can still use its functionality without resorting to reflection every time. + * A server instance. Can either an AF_UNIX or a gRPC one. */ -public interface RPCServer { +public abstract class RPCServer { + private static final Logger LOG = Logger.getLogger(RPCServer.class.getName()); + private static AtomicBoolean runShutdownHooks = new AtomicBoolean(true); /** * Factory class for the gRPC server. * * Present so that we don't need to invoke a constructor with multiple arguments by reflection. */ - interface Factory { + public interface Factory { RPCServer create(CommandExecutor commandExecutor, Clock clock, int port, Path serverDirectory, int maxIdleSeconds) throws IOException; } + protected RPCServer(Path serverDirectory) throws IOException { + // server.pid was written in the C++ launcher after fork() but before exec() . + // The client only accesses the pid file after connecting to the socket + // which ensures that it gets the correct pid value. + Path pidFile = serverDirectory.getRelative("server.pid.txt"); + Path pidSymlink = serverDirectory.getRelative("server.pid"); + RPCServer.deleteAtExit(pidFile, /*deleteParent=*/ false); + RPCServer.deleteAtExit(pidSymlink, /*deleteParent=*/ false); + } + + protected void disableShutdownHooks() { + runShutdownHooks.set(false); + } + + /** + * Schedule the specified file for (attempted) deletion at JVM exit. + */ + protected static void deleteAtExit(final Path path, final boolean deleteParent) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + if (!runShutdownHooks.get()) { + return; + } + + try { + path.delete(); + if (deleteParent) { + path.getParentDirectory().delete(); + } + } catch (IOException e) { + printStack(e); + } + } + }); + } + + static void printStack(IOException e) { + /* + * Hopefully this never happens. It's not very nice to just write this + * to the user's console, but I'm not sure what better choice we have. + */ + StringWriter err = new StringWriter(); + PrintWriter printErr = new PrintWriter(err); + printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]======="); + e.printStackTrace(printErr); + printErr.println("====================================================="); + LOG.severe(err.toString()); + } + /** * Start serving and block until the a shutdown command is received. */ - void serve() throws IOException; + public abstract void serve() throws IOException; /** * Called when the server receives a SIGINT. */ - void interrupt(); + public abstract void interrupt(); } diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java new file mode 100644 index 0000000000..10335d1a93 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java @@ -0,0 +1,117 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.unix; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketException; + +/** + * <p>An implementation of client Socket for local (AF_UNIX) sockets. + * + * <p>This class intentionally doesn't extend java.net.Socket although it + * has some similarity to it. The java.net class hierarchy is a terrible mess + * and is inextricably coupled to the Internet Protocol. + * + * <p>This code is not intended to be portable to non-UNIX platforms. + */ +public class LocalClientSocket extends LocalSocket { + + /** + * Constructs an unconnected local client socket. + * + * @throws IOException if the socket could not be created. + */ + public LocalClientSocket() throws IOException { + super(); + } + + /** + * Constructs a client socket and connects it to the specified address. + * + * @throws IOException if either of the socket/connect operations failed. + */ + public LocalClientSocket(LocalSocketAddress address) throws IOException { + super(); + connect(address); + } + + /** + * Connect to the specified server. Blocks until the server accepts the + * connection. + * + * @throws IOException if the connection failed. + */ + public synchronized void connect(LocalSocketAddress address) + throws IOException { + checkNotClosed(); + if (state == State.CONNECTED) { + throw new SocketException("socket is already connected"); + } + connect(fd, address.getName().toString()); // JNI + this.address = address; + this.state = State.CONNECTED; + } + + /** + * Returns the input stream for reading from the server. + * + * @param closeSocket close the socket when this input stream is closed. + * @throws IOException if there was a problem. + */ + public synchronized InputStream getInputStream(final boolean closeSocket) throws IOException { + checkConnected(); + checkInputNotShutdown(); + return new FileInputStream(fd) { + @Override + public void close() throws IOException { + if (closeSocket) { + LocalClientSocket.this.close(); + } + } + }; + } + + /** + * Returns the input stream for reading from the server. + * + * @throws IOException if there was a problem. + */ + public synchronized InputStream getInputStream() throws IOException { + return getInputStream(false); + } + + /** + * Returns the output stream for writing to the server. + * + * @throws IOException if there was a problem. + */ + public synchronized OutputStream getOutputStream() throws IOException { + checkConnected(); + checkOutputNotShutdown(); + return new FileOutputStream(fd) { + @Override public void close() { + // Don't close the file descriptor. + } + }; + } + + @Override + public String toString() { + return "LocalClientSocket(" + address + ")"; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java new file mode 100644 index 0000000000..0c0bd22c3f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java @@ -0,0 +1,173 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.unix; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +/** + * <p>An implementation of ServerSocket for local (AF_UNIX) sockets. + * + * <p>This class intentionally doesn't extend java.net.ServerSocket although it + * has some similarity to it. The java.net class hierarchy is a terrible mess + * and is inextricably coupled to the Internet Protocol. + * + * <p>This code is not intended to be portable to non-UNIX platforms. + */ +public class LocalServerSocket extends LocalSocket { + + // Socket timeout in milliseconds. No timeout by default. + private long soTimeoutMillis = 0; + + /** + * Constructs an unbound local server socket. + */ + public LocalServerSocket() throws IOException { + super(); + } + + /** + * Constructs a server socket, binds it to the specified address, and + * listens for incoming connections with the specified backlog. + * + * @throws IOException if any of the socket/bind/listen operations failed. + */ + public LocalServerSocket(LocalSocketAddress address, int backlog) + throws IOException { + this(); + bind(address); + listen(backlog); + } + + /** + * Constructs a server socket, binds it to the specified address, and begin + * listening for incoming connections using the default backlog. + * + * @throws IOException if any of the socket/bind/listen operations failed. + */ + public LocalServerSocket(LocalSocketAddress address) throws IOException { + this(address, 50); + } + + /** + * Specifies the timeout in milliseconds for accept(). Setting it to + * zero means an indefinite timeout. + */ + public void setSoTimeout(long timeoutMillis) { + soTimeoutMillis = timeoutMillis; + } + + /** + * Returns the current timeout in milliseconds. + */ + public long getSoTimeout() { + return soTimeoutMillis; + } + + /** + * Binds the specified address to this socket. The socket must be unbound. + * This causes the filesystem entry to appear. + * + * @throws IOException if the bind failed. + */ + public synchronized void bind(LocalSocketAddress address) + throws IOException { + if (address == null) { + throw new NullPointerException("address"); + } + checkNotClosed(); + if (state != State.NEW) { + throw new SocketException("socket is already bound to an address"); + } + bind(fd, address.getName().toString()); // JNI + this.address = address; + this.state = State.BOUND; + } + + /** + * Listen for incoming connections on a socket using the specfied backlog. + * The socket must be bound but not already listening. + * + * @throws IOException if the listen failed. + */ + public synchronized void listen(int backlog) throws IOException { + if (backlog < 1) { + throw new IllegalArgumentException("backlog=" + backlog); + } + checkNotClosed(); + if (address == null) { + throw new SocketException("socket has no address bound"); + } + if (state == State.LISTENING) { + throw new SocketException("socket is already listening"); + } + listen(fd, backlog); // JNI + this.state = State.LISTENING; + } + + /** + * Blocks until a connection is made to this socket and accepts it, returning + * a new socket connected to the client. + * + * @return the new socket connected to the client. + * @throws IOException if an error occurs when waiting for a connection. + * @throws SocketTimeoutException if a timeout was previously set with + * setSoTimeout and the timeout has been reached. + * @throws InterruptedIOException if the thread is interrupted when the + * method is blocked. + */ + public synchronized Socket accept() + throws IOException, SocketTimeoutException, InterruptedIOException { + if (state != State.LISTENING) { + throw new SocketException("socket is not in listening state"); + } + + // Throws a SocketTimeoutException if timeout. + if (soTimeoutMillis != 0) { + poll(fd, soTimeoutMillis); // JNI + } + + FileDescriptor clientFd = new FileDescriptor(); + accept(fd, clientFd); // JNI + final LocalSocketImpl impl = new LocalSocketImpl(clientFd); + return new Socket(impl) { + @Override + public boolean isConnected() { + return true; + } + @Override + public synchronized void close() throws IOException { + if (isClosed()) { + return; + } else { + super.close(); + // Workaround for the fact that super.created==false because we + // created the impl ourselves. As a result, super.close() doesn't + // call impl.close(). *Sigh*, java.net is horrendous. + // (Perhaps we should dispense with Socket/SocketImpl altogether?) + impl.close(); + } + } + }; + } + + @Override + public String toString() { + return "LocalServerSocket(" + address + ")"; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java new file mode 100644 index 0000000000..d51f2bad36 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java @@ -0,0 +1,216 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.unix; + +import com.google.devtools.build.lib.UnixJniLoader; +import java.io.Closeable; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +/** + * Abstract superclass for client and server local sockets. + */ +abstract class LocalSocket implements Closeable { + + protected enum State { + NEW, + BOUND, // server only + LISTENING, // server only + CONNECTED, // client only + CLOSED, + } + + protected LocalSocketAddress address = null; + protected FileDescriptor fd = new FileDescriptor(); + protected State state; + protected boolean inputShutdown = false; + protected boolean outputShutdown = false; + + /** + * Constructs an unconnected local socket. + */ + protected LocalSocket() throws IOException { + socket(fd); + if (!fd.valid()) { + throw new IOException("Couldn't create socket!"); + } + this.state = State.NEW; + } + + /** + * Returns the address of the endpoint this socket is bound to. + * + * @return a <code>SocketAddress</code> representing the local endpoint of + * this socket. + */ + public LocalSocketAddress getLocalSocketAddress() { + return address; + } + + /** + * Closes this socket. This operation is idempotent. + * + * To be consistent with Java Socket, the shutdown states of the socket are + * not changed. This makes it easier to port applications between Socket and + * LocalSocket. + * + * @throws IOException if an I/O error occurred when closing the socket. + */ + @Override + public synchronized void close() throws IOException { + if (state == State.CLOSED) { + return; + } + // Closes the file descriptor if it has not been closed by the + // input/output streams. + if (!fd.valid()) { + throw new IllegalStateException("LocalSocket.close(-1)"); + } + close(fd); + if (fd.valid()) { + throw new IllegalStateException("LocalSocket.close() did not set fd to -1"); + } + this.state = State.CLOSED; + } + + /** + * Returns the closed state of the ServerSocket. + * + * @return true if the socket has been closed + */ + public synchronized boolean isClosed() { + // If the file descriptor has been closed by the input/output + // streams, marks the socket as closed too. + return state == State.CLOSED; + } + + /** + * Returns the connected state of the ClientSocket. + * + * @return true if the socket is currently connected. + */ + public synchronized boolean isConnected() { + return state == State.CONNECTED; + } + + protected synchronized void checkConnected() throws SocketException { + if (!isConnected()) { + throw new SocketException("Transport endpoint is not connected"); + } + } + + protected synchronized void checkNotClosed() throws SocketException { + if (isClosed()) { + throw new SocketException("socket is closed"); + } + } + + /** + * Returns the shutdown state of the input channel. + * + * @return true is the input channel of the socket is shutdown. + */ + public synchronized boolean isInputShutdown() { + return inputShutdown; + } + + /** + * Returns the shutdown state of the output channel. + * + * @return true is the input channel of the socket is shutdown. + */ + public synchronized boolean isOutputShutdown() { + return outputShutdown; + } + + protected synchronized void checkInputNotShutdown() throws SocketException { + if (isInputShutdown()) { + throw new SocketException("Socket input is shutdown"); + } + } + + protected synchronized void checkOutputNotShutdown() throws SocketException { + if (isOutputShutdown()) { + throw new SocketException("Socket output is shutdown"); + } + } + + static final int SHUT_RD = 0; // Mapped to BSD SHUT_RD in JNI. + static final int SHUT_WR = 1; // Mapped to BSD SHUT_WR in JNI. + + public synchronized void shutdownInput() throws IOException { + checkNotClosed(); + checkConnected(); + checkInputNotShutdown(); + inputShutdown = true; + shutdown(fd, SHUT_RD); + } + + public synchronized void shutdownOutput() throws IOException { + checkNotClosed(); + checkConnected(); + checkOutputNotShutdown(); + outputShutdown = true; + shutdown(fd, SHUT_WR); + } + + //////////////////////////////////////////////////////////////////////// + // JNI: + + static { + UnixJniLoader.loadJni(); + } + + // The native calls below are thin wrappers around linux system calls. The + // semantics remains the same except for poll(). See the comments for the + // method. + // + // Note: FileDescriptor is a box for a mutable integer that is visible only + // to native code. + + // Generic operations: + protected static native void socket(FileDescriptor server) + throws IOException; + static native void close(FileDescriptor server) + throws IOException; + /** + * Shut down part of a full-duplex connection + * @param code Must be either SHUT_RD or SHUT_WR + */ + static native void shutdown(FileDescriptor fd, int code) + throws IOException; + + /** + * This method checks waits for the given file descriptor to become available for read. + * If timeoutMillis passed and there is no activity, a SocketTimeoutException will be thrown. + */ + protected static native void poll(FileDescriptor read, long timeoutMillis) + throws IOException, SocketTimeoutException, InterruptedIOException; + + // Server operations: + protected static native void bind(FileDescriptor server, String filename) + throws IOException; + protected static native void listen(FileDescriptor server, int backlog) + throws IOException; + protected static native void accept(FileDescriptor server, + FileDescriptor client) + throws IOException; + + // Client operations: + protected static native void connect(FileDescriptor client, String filename) + throws IOException; +} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java new file mode 100644 index 0000000000..f9b9d43f06 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java @@ -0,0 +1,56 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.unix; + +import java.io.File; +import java.net.SocketAddress; + +/** + * An implementation of SocketAddress for naming local sockets, i.e. files in + * the UNIX file system. + */ +public class LocalSocketAddress extends SocketAddress { + + private final File name; + + /** + * Constructs a SocketAddress for the specified file. + */ + public LocalSocketAddress(File name) { + this.name = name; + } + + /** + * Returns the filename of this local socket address. + */ + public File getName() { + return name; + } + + @Override + public String toString() { + return name.toString(); + } + + @Override + public boolean equals(Object other) { + return other instanceof LocalSocketAddress && + ((LocalSocketAddress) other).name.equals(this.name); + } + + @Override + public int hashCode() { + return name.hashCode(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java new file mode 100644 index 0000000000..a30b450dc1 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java @@ -0,0 +1,171 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.unix; + +import com.google.devtools.build.lib.UnixJniLoader; +import com.google.devtools.build.lib.util.OS; + +import java.io.Closeable; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketImpl; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A simple implementation of SocketImpl for sockets that wrap a UNIX + * file-descriptor. This SocketImpl assumes that the socket is already + * created, bound, connected and supports no socket options or out-of-band + * features. This is used to implement server-side accepted client sockets + * (i.e. those returned by {@link LocalServerSocket#accept}). + */ +class LocalSocketImpl extends SocketImpl { + private static final Logger logger = + Logger.getLogger(LocalSocketImpl.class.getName()); + + static { + UnixJniLoader.loadJni(); + if (OS.getCurrent() != OS.WINDOWS) { + init(); + } + } + + // The logic here is a little twisted, to support JDK7 and JDK8. + + // 1) In JDK7, the FileDescriptor class keeps a reference count of + // instances using the fd, and closes it when it goes to 0. The + // reference count is only decremented by the finalizer for a + // given class. When the call to close() happens, the fd is + // closed regardless of the current state of the refcount. + // + // 2) In JDK8, every instance that uses the fd registers a Closeable + // with the FileDescriptor. Since the FileDescriptor has a + // reference to every user, only when all of the users and the + // FileDescriptor get GC'd does the finalizer run. An explicit + // call to close() calls FileDescriptor.closeAll(), which + // force-closes all of the users. + + // So, in our case: + + // 1) ref() increments the refcount in JDK7, and registers with the + // FD in JDK8. + + // 2) unref() decrements the refcount in JDK7, and does nothing in + // JDK8. + + // 3) The finalizer decrements the refcount in JDK7, and simply + // calls close() in JDK8 (where we don't have to worry about + // multiple live users of the FD). The close() method itself is + // idempotent. + + // 4) close() calls fd.closeAll in JDK8, which, in turn, calls + // closer.close(). In JDK7, close() calls closer.close() + // explicitly. + private static native void init(); + private static native void ref(FileDescriptor fd, Closeable closeable); + private static native boolean unref(FileDescriptor fd); + private static native boolean close0(FileDescriptor fd, Closeable closeable); + + private final boolean isInitialized; + private final Closeable closer = new Closeable() { + AtomicBoolean isClosed = new AtomicBoolean(false); + @Override public void close() throws IOException { + if (isClosed.compareAndSet(false, true)) { + LocalSocket.close(fd); + } + } + }; + + // Note to callers: if you pass a FD into this constructor, this + // instance is now responsible for closing it (in the sense of + // LocalSocket.close()). If some other instance tries to close it, + // then terrible things will happen. + LocalSocketImpl(FileDescriptor fd) { + this.fd = fd; // (inherited field) + ref(fd, closer); + isInitialized = true; + } + + @Override protected void finalize() { + try { + if (isInitialized) { + if (!unref(fd)) { + // JDK8 codepath + close0(fd, closer); + } + } + } catch (Exception e) { + logger.log(Level.WARNING, "Unable to access FileDescriptor class - " + + "may cause a file descriptor leak", e); + } + } + @Override protected InputStream getInputStream() { + return new FileInputStream(getFileDescriptor()); + } + @Override protected OutputStream getOutputStream() { + return new FileOutputStream(getFileDescriptor()); + } + @Override protected void close() throws IOException { + if (fd.valid()) { + if (!close0(fd, closer)) { + // JDK7 codepath + closer.close(); + } + } + } + + // Unused: + @Override + public void setOption(int optID, Object value) { + throw new UnsupportedOperationException("setOption"); + } + @Override + public Object getOption(int optID) { + throw new UnsupportedOperationException("getOption"); + } + @Override protected void create(boolean stream) { + throw new UnsupportedOperationException("create"); + } + @Override protected void connect(String host, int port) { + throw new UnsupportedOperationException("connect"); + } + @Override protected void connect(InetAddress address, int port) { + throw new UnsupportedOperationException("connect2"); + } + @Override protected void connect(SocketAddress address, int timeout) { + throw new UnsupportedOperationException("connect3"); + } + @Override protected void bind(InetAddress host, int port) { + throw new UnsupportedOperationException("bind"); + } + @Override protected void listen(int backlog) { + throw new UnsupportedOperationException("listen"); + } + @Override protected void accept(SocketImpl s) { + throw new UnsupportedOperationException("accept"); + } + @Override protected int available() { + throw new UnsupportedOperationException("available"); + } + @Override protected void sendUrgentData(int i) { + throw new UnsupportedOperationException("sendUrgentData"); + } +} diff --git a/src/main/native/BUILD b/src/main/native/BUILD index 3df6f78f12..8780377ef4 100644 --- a/src/main/native/BUILD +++ b/src/main/native/BUILD @@ -36,6 +36,7 @@ filegroup( cc_binary( name = "libunix.so", srcs = [ + "localsocket.cc", "macros.h", "process.cc", "unix_jni.cc", diff --git a/src/main/native/localsocket.cc b/src/main/native/localsocket.cc new file mode 100644 index 0000000000..3731619e52 --- /dev/null +++ b/src/main/native/localsocket.cc @@ -0,0 +1,312 @@ +// Copyright 2014 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <jni.h> + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <poll.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> + +#include <string> + +#include "src/main/native/unix_jni.h" + +// Returns the field ID for FileDescriptor.fd. +static jfieldID GetFileDescriptorField(JNIEnv *env) { + // See http://java.sun.com/docs/books/jni/html/fldmeth.html#26855 + static jclass fd_class = NULL; + if (fd_class == NULL) { /* note: harmless race condition */ + jclass local = env->FindClass("java/io/FileDescriptor"); + CHECK(local != NULL); + fd_class = static_cast<jclass>(env->NewGlobalRef(local)); + } + static jfieldID fieldId = NULL; + if (fieldId == NULL) { /* note: harmless race condition */ + fieldId = env->GetFieldID(fd_class, "fd", "I"); + CHECK(fieldId != NULL); + } + return fieldId; +} + +// Returns the UNIX filedescriptor from a java.io.FileDescriptor instance. +static jint GetUnixFileDescriptor(JNIEnv *env, jobject fd_obj) { + return env->GetIntField(fd_obj, GetFileDescriptorField(env)); +} + +// Sets the UNIX filedescriptor of a java.io.FileDescriptor instance. +static void SetUnixFileDescriptor(JNIEnv *env, jobject fd_obj, jint fd) { + env->SetIntField(fd_obj, GetFileDescriptorField(env), fd); +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: socket + * Signature: (Ljava/io/FileDescriptor;)V + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_socket(JNIEnv *env, + jclass clazz, + jobject fd_sock) { + int sock; + if ((sock = ::socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + return; + } + SetUnixFileDescriptor(env, fd_sock, sock); +} + +// Initialize "addr" from "name_chars", reporting error and returning +// false on failure. +static bool InitializeSockaddr(JNIEnv *env, + struct sockaddr_un *addr, + const char* name_chars) { + memset(addr, 0, sizeof *addr); + addr->sun_family = AF_UNIX; + // Note: UNIX_PATH_MAX is quite small! + if (strlen(name_chars) >= sizeof(addr->sun_path)) { + ::PostFileException(env, ENAMETOOLONG, name_chars); + return false; + } + strcpy((char*) &addr->sun_path, name_chars); + return true; +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: bind + * Signature: (Ljava/io/FileDescriptor;Ljava/lang/String;)V + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_bind(JNIEnv *env, + jclass clazz, + jobject fd_svr, + jstring name) { + int svr_sock = GetUnixFileDescriptor(env, fd_svr); + const char* name_chars = env->GetStringUTFChars(name, NULL); + struct sockaddr_un addr; + if (InitializeSockaddr(env, &addr, name_chars) && + ::bind(svr_sock, (struct sockaddr *) &addr, sizeof addr) < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + } + env->ReleaseStringUTFChars(name, name_chars); +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: listen + * Signature: (Ljava/io/FileDescriptor;I)V + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_listen(JNIEnv *env, + jclass clazz, + jobject fd_svr, + jint backlog) { + int svr_sock = GetUnixFileDescriptor(env, fd_svr); + if (::listen(svr_sock, backlog) < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + } +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: select + * Signature: (L[java/io/FileDescriptor;[java/io/FileDescriptor;[java/io/FileDescriptor;J)Ljava/io/FileDescriptor + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_poll(JNIEnv *env, + jclass clazz, + jobject rfds_svr, + jlong timeoutMillis) { + // TODO(bazel-team): Handle Unix signals, namely SIGTERM. + + // Copy Java FD into pollfd + pollfd pollfd; + pollfd.fd = GetUnixFileDescriptor(env, rfds_svr); + pollfd.events = POLLIN; + pollfd.revents = 0; + + int count = poll(&pollfd, 1, timeoutMillis); + if (count == 0) { + // throws a timeout exception. + ::PostException(env, ETIMEDOUT, ::ErrorMessage(ETIMEDOUT)); + } else if (count < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + } +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: accept + * Signature: (Ljava/io/FileDescriptor;Ljava/io/FileDescriptor;)V + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_accept(JNIEnv *env, + jclass clazz, + jobject fd_svr, + jobject fd_cli) { + int svr_sock = GetUnixFileDescriptor(env, fd_svr); + int cli_sock; + if ((cli_sock = ::accept(svr_sock, NULL, NULL)) < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + return; + } + SetUnixFileDescriptor(env, fd_cli, cli_sock); +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: close + * Signature: (Ljava/io/FileDescriptor;)V + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_close(JNIEnv *env, + jclass clazz, + jobject fd_svr) { + int svr_sock = GetUnixFileDescriptor(env, fd_svr); + if (::close(svr_sock) < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + } + SetUnixFileDescriptor(env, fd_svr, -1); +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: connect + * Signature: (Ljava/io/FileDescriptor;Ljava/lang/String;)V + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_connect(JNIEnv *env, + jclass clazz, + jobject fd_cli, + jstring name) { + const char* name_chars = env->GetStringUTFChars(name, NULL); + jint cli_sock = GetUnixFileDescriptor(env, fd_cli); + if (cli_sock == -1) { + ::PostFileException(env, ENOTSOCK, name_chars); + } else { + struct sockaddr_un addr; + if (InitializeSockaddr(env, &addr, name_chars)) { + if (::connect(cli_sock, (struct sockaddr *) &addr, sizeof addr) < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + } + } + } + env->ReleaseStringUTFChars(name, name_chars); +} + +/* + * Class: com.google.devtools.build.lib.unix.LocalSocket + * Method: shutdown() + * Signature: (Ljava/io/FileDescriptor;I)V + * Parameters: code: 0 to shutdown input and 1 to shutdown output. + */ +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocket_shutdown(JNIEnv *env, + jclass clazz, + jobject fd, + jint code) { + int action; + if (code == 0) { + action = SHUT_RD; + } else { + CHECK(code == 1); + action = SHUT_WR; + } + + int sock = GetUnixFileDescriptor(env, fd); + if (shutdown(sock, action) < 0) { + ::PostException(env, errno, ::ErrorMessage(errno)); + } +} + +// TODO(bazel-team): These methods were removed in JDK8, so they +// can be removed when we are no longer using JDK7. See note in +// LocalSocketImpl. +static jmethodID increment_use_count_; +static jmethodID decrement_use_count_; + +// >=JDK8 +static jmethodID fd_attach_; +static jmethodID fd_close_all_; + +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocketImpl_init(JNIEnv *env, jclass ignored) { + jclass cls = env->FindClass("java/io/FileDescriptor"); + if (cls == NULL) { + cls = env->FindClass("java/lang/NoClassDefFoundError"); + env->ThrowNew(cls, "FileDescriptor class not found"); + return; + } + + // JDK7 + increment_use_count_ = + env->GetMethodID(cls, "incrementAndGetUseCount", "()I"); + if (increment_use_count_ != NULL) { + decrement_use_count_ = + env->GetMethodID(cls, "decrementAndGetUseCount", "()I"); + } else { + // JDK8 + env->ExceptionClear(); // The pending exception from increment_use_count_ + + fd_attach_ = env->GetMethodID(cls, "attach", "(Ljava/io/Closeable;)V"); + fd_close_all_ = env->GetMethodID(cls, "closeAll", "(Ljava/io/Closeable;)V"); + + if (fd_attach_ == NULL || fd_close_all_ == NULL) { + cls = env->FindClass("java/lang/NoSuchMethodError"); + env->ThrowNew(cls, "FileDescriptor methods not found"); + return; + } + } +} + +extern "C" JNIEXPORT void JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocketImpl_ref(JNIEnv *env, jclass clazz, + jobject fd, jobject closer) { + if (increment_use_count_ != NULL) { + env->CallIntMethod(fd, increment_use_count_); + } + + if (fd_attach_ != NULL) { + env->CallVoidMethod(fd, fd_attach_, closer); + } +} + +extern "C" JNIEXPORT jboolean JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocketImpl_unref(JNIEnv *env, jclass clazz, + jobject fd) { + if (decrement_use_count_ != NULL) { + env->CallIntMethod(fd, decrement_use_count_); + return true; + } + return false; +} + +extern "C" JNIEXPORT jboolean JNICALL +Java_com_google_devtools_build_lib_unix_LocalSocketImpl_close0(JNIEnv *env, jclass clazz, + jobject fd, + jobject closeable) { + if (fd_close_all_ != NULL) { + env->CallVoidMethod(fd, fd_close_all_, closeable); + return true; + } + // This will happen if fd_close_all_ is NULL, which means we are running in + // <=JDK7, which means that the caller needs to invoke close() explicitly. + return false; +} diff --git a/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java new file mode 100644 index 0000000000..58948e1720 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java @@ -0,0 +1,135 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; +import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.ShutdownMethod; +import com.google.devtools.build.lib.testutil.Suite; +import com.google.devtools.build.lib.testutil.TestSpec; +import com.google.devtools.build.lib.util.JavaClock; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.build.lib.util.io.RecordingOutErr; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.util.FsApparatus; +import java.io.File; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Run a real RPC server on localhost, and talk to it using the testing + * client. + */ +@TestSpec(size = Suite.MEDIUM_TESTS) +@RunWith(JUnit4.class) +public class AfUnixServerTest { + + private static final long MAX_IDLE_MILLIS = 10000; + private static final long HEALTH_CHECK_MILLIS = 1000 * 3; + private static final String COMMAND_STDOUT = "Heelllloo...."; + private static final String COMMAND_STDERR = "...world!"; + + private AfUnixServer server; + private FsApparatus scratch = FsApparatus.newNative(); + private RecordingOutErr outErr = new RecordingOutErr(); + private Path serverDir; + private Path workspaceDir; + private RPCTestingClient client; + private Thread serverThread = new Thread(){ + @Override + public void run() { + server.serve(); + } + }; + + private static final ServerCommand helloWorldCommand = new ServerCommand() { + @Override + public int exec(List<String> args, OutErr outErr, LockingMode lockingMode, + String clientDescription, long firstContactTime) { + outErr.printOut(COMMAND_STDOUT); + outErr.printErr(COMMAND_STDERR); + return 42; + } + + @Override + public ShutdownMethod shutdown() { + return ShutdownMethod.NONE; + } + }; + + @Before + public final void startServer() throws Exception { + // Do not use `createUnixTempDir()` here since the file name that results is longer + // than 108 characters, so cannot be used as local socket address. + File file = File.createTempFile("scratch", ".tmp", new File("/tmp")); + file.delete(); + file.mkdir(); + serverDir = this.scratch.dir(file.getAbsolutePath()); + + workspaceDir = this.scratch.createUnixTempDir(); + workspaceDir.createDirectory(); + client = new RPCTestingClient( + outErr, serverDir.getRelative("server.socket")); + RPCService service = new RPCService(helloWorldCommand); + server = new AfUnixServer(new JavaClock(), service, MAX_IDLE_MILLIS, HEALTH_CHECK_MILLIS, + serverDir, workspaceDir); + serverThread.start(); + } + + @After + public final void stopServer() throws Exception { + serverThread.interrupt(); + serverThread.join(); + + FileSystemUtils.deleteTree(serverDir); + } + + private void runTestRequest(String request, int ret, String out, String err) throws Exception { + assertEquals(ret, client.sendRequest(request)); + assertEquals(out, outErr.outAsLatin1()); + assertThat(outErr.errAsLatin1()).contains(err); + } + + @Test + public void testUnknownCommand() throws Exception { + runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); + } + + @Test + public void testEmptyBlazeCommand() throws Exception { + runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n"); + } + + @Test + public void testWorkspaceDies() throws Exception { + assertTrue(serverThread.isAlive()); + runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR); + Thread.sleep(HEALTH_CHECK_MILLIS * 2); + assertTrue(serverThread.isAlive()); + + assertTrue(workspaceDir.delete()); + Thread.sleep(HEALTH_CHECK_MILLIS * 2); + assertFalse(serverThread.isAlive()); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java new file mode 100644 index 0000000000..3b1515d2e2 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java @@ -0,0 +1,88 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.server; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.unix.LocalClientSocket; +import com.google.devtools.build.lib.unix.LocalSocketAddress; +import com.google.devtools.build.lib.util.io.RecordingOutErr; +import com.google.devtools.build.lib.util.io.StreamDemultiplexer; +import com.google.devtools.build.lib.vfs.Path; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; + +/** + * A client to test RPCServer. + */ +public class RPCTestingClient { + + private final RecordingOutErr outErr; + private final Path socketFile; + + /** + * Create a client to RPCServer. {@code socketFile} must be a file + * on disk; this will not work with the in-memory file system. + */ + public RPCTestingClient(RecordingOutErr outErr, Path socketFile) { + this.socketFile = socketFile; + this.outErr = outErr; + } + + public int sendRequest(String command, String... params) + throws Exception { + String request = command; + for (String param : params) { + request += "\0" + param; + } + return sendRequest(request); + } + + public int sendRequest(String request) throws Exception { + LocalClientSocket connection = new LocalClientSocket(); + connection.connect(new LocalSocketAddress(socketFile.getPathFile())); + try { + OutputStream out = connection.getOutputStream(); + byte[] requestBytes = request.getBytes(UTF_8); + byte[] requestLength = new byte[4]; + requestLength[0] = (byte) (requestBytes.length << 24); + requestLength[1] = (byte) ((requestBytes.length << 16) & 0xff); + requestLength[2] = (byte) ((requestBytes.length << 8) & 0xff); + requestLength[3] = (byte) (requestBytes.length & 0xff); + out.write(requestLength); + out.write(requestBytes); + out.flush(); + connection.shutdownOutput(); + + OutputStream stdout = outErr.getOutputStream(); + OutputStream stderr = outErr.getErrorStream(); + ByteArrayOutputStream control = new ByteArrayOutputStream(); + StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1, + stdout, stderr, control); + ByteStreams.copy(connection.getInputStream(), demux); + demux.flush(); + + byte[] controlBytes = control.toByteArray(); + return (((int) controlBytes[0]) << 24) + + (((int) controlBytes[1]) << 16) + + (((int) controlBytes[2]) << 8) + + ((int) controlBytes[3]); + } finally { + connection.close(); + } + } + +} |