aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Janak Ramakrishnan <janakr@google.com>2016-08-11 20:46:20 +0000
committerGravatar Yue Gan <yueg@google.com>2016-08-12 08:52:58 +0000
commitbab0d481dea8be7568cd593460c26111bf302175 (patch)
tree30b454482bcded54cf216b30fa40a98d56fa679a
parent7e33704e7546bb676e9052089c30f1dd625fd082 (diff)
Rollback of commit f107debac45ddf5859b1eb963379769b5815b18f. Also includes the logical rollback of commit 67ad82a319ff8959e69e774e7c15d3af904ec23d.
RELNOTES[INC]: Bazel supports Unix domain sockets for communication between its client and server again, temporarily, while we diagnose a memory leak. -- MOS_MIGRATED_REVID=130027009
-rw-r--r--src/main/cpp/blaze.cc312
-rw-r--r--src/main/cpp/blaze_startup_options_common.cc4
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java21
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java558
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java60
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCServer.java71
-rw-r--r--src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java117
-rw-r--r--src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java173
-rw-r--r--src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java216
-rw-r--r--src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java56
-rw-r--r--src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java171
-rw-r--r--src/main/native/BUILD1
-rw-r--r--src/main/native/localsocket.cc312
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java135
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java88
15 files changed, 2217 insertions, 78 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 116efd5314..7fb3511a0a 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -241,6 +241,22 @@ uint64_t BlazeServer::AcquireLock() {
globals->options.block_for_lock, &blaze_lock_);
}
+// Communication method that uses an AF_UNIX socket and a custom protocol.
+class AfUnixBlazeServer : public BlazeServer {
+ public:
+ AfUnixBlazeServer();
+ virtual ~AfUnixBlazeServer() {}
+
+ virtual bool Connect();
+ virtual void Disconnect();
+ virtual unsigned int Communicate();
+ virtual void KillRunningServer();
+ virtual void Cancel();
+
+ private:
+ int server_socket_;
+};
+
// Communication method that uses gRPC on a socket bound to localhost. More
// documentation is in command_server.proto .
class GrpcBlazeServer : public BlazeServer {
@@ -695,6 +711,242 @@ static void StartStandalone(BlazeServer* server) {
pdie(blaze_exit_code::INTERNAL_ERROR, "execv of '%s' failed", exe.c_str());
}
+AfUnixBlazeServer::AfUnixBlazeServer() {
+ server_socket_ = -1;
+ connected_ = false;
+}
+
+bool AfUnixBlazeServer::Connect() {
+ assert(!connected_);
+
+ if (server_socket_ == -1) {
+ server_socket_ = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (server_socket_ == -1) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "can't create AF_UNIX socket");
+ }
+
+ if (fcntl(server_socket_, F_SETFD, FD_CLOEXEC) == -1) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "fcntl(F_SETFD, FD_CLOEXEC) failed");
+ }
+ }
+
+ struct sockaddr_un addr;
+ addr.sun_family = AF_UNIX;
+
+ string socket_file = globals->options.output_base + "/server/server.socket";
+ char *resolved_path = realpath(socket_file.c_str(), NULL);
+ if (resolved_path != NULL) {
+ strncpy(addr.sun_path, resolved_path, sizeof addr.sun_path);
+ addr.sun_path[sizeof addr.sun_path - 1] = '\0';
+ free(resolved_path);
+ sockaddr *paddr = reinterpret_cast<sockaddr *>(&addr);
+ int result = connect(server_socket_, paddr, sizeof addr);
+ connected_ = result == 0;
+ if (connected_) {
+ string server_dir = globals->options.output_base + "/server";
+ globals->server_pid = GetServerPid(server_dir);
+ if (globals->server_pid <= 0) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "can't get PID of existing server (server dir=%s)",
+ server_dir.c_str());
+ }
+ }
+
+ return connected_;
+ } else if (errno == ENOENT) { // No socket means no server to connect to
+ errno = ECONNREFUSED;
+ return false;
+ } else {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "realpath('%s') failed", socket_file.c_str());
+ return false;
+ }
+}
+
+void AfUnixBlazeServer::Disconnect() {
+ assert(connected_);
+
+ close(server_socket_);
+ connected_ = false;
+ server_socket_ = -1;
+}
+
+static int ServerEof() {
+ // e.g. external SIGKILL of server, misplaced System.exit() in the server,
+ // or a JVM crash. Print out the jvm.out file in case there's something
+ // useful.
+ fprintf(stderr, "Error: unexpected EOF from %s server.\n"
+ "Contents of '%s':\n", globals->options.product_name.c_str(),
+ globals->jvm_log_file.c_str());
+ WriteFileToStreamOrDie(stderr, globals->jvm_log_file.c_str());
+ return GetExitCodeForAbruptExit(*globals);
+}
+
+// Reads a single char from the specified stream.
+static int ReadServerChar(int fd, unsigned char *result) {
+ if (read(fd, result, 1) != 1) {
+ return ServerEof();
+ }
+ return 0;
+}
+
+static int ReadServerInt(int fd, unsigned int *result) {
+ unsigned char buffer[4];
+ unsigned char *p = buffer;
+ int remaining = 4;
+
+ while (remaining > 0) {
+ int bytes_read = read(fd, p, remaining);
+ if (bytes_read <= 0) {
+ return ServerEof();
+ }
+
+ remaining -= bytes_read;
+ p += bytes_read;
+ }
+
+ *result = (buffer[0] << 24) + (buffer[1] << 16) + (buffer[2] << 8)
+ + buffer[3];
+ return 0;
+}
+
+static char server_output_buffer[8192];
+
+// Forwards the output of the server to the specified file handle.
+static int ForwardServerOutput(int socket, int output) {
+ unsigned int remaining;
+ int exit_code = ReadServerInt(socket, &remaining);
+ if (exit_code != 0) {
+ return exit_code;
+ }
+ while (remaining > 0) {
+ int bytes = remaining > 8192 ? 8192 : remaining;
+ bytes = read(socket, server_output_buffer, bytes);
+ if (bytes <= 0) {
+ return ServerEof();
+ }
+
+ remaining -= bytes;
+ if (write(output, server_output_buffer, bytes) != bytes) {
+ // Not much we can do if this doesn't work, just placate the compiler.
+ }
+ }
+
+ return 0;
+}
+
+unsigned int AfUnixBlazeServer::Communicate() {
+ assert(connected_);
+
+ const string request = BuildServerRequest();
+
+ // Send request (Request is written in a single chunk.)
+ char request_size[4];
+ request_size[0] = (request.size() >> 24) & 0xff;
+ request_size[1] = (request.size() >> 16) & 0xff;
+ request_size[2] = (request.size() >> 8) & 0xff;
+ request_size[3] = (request.size()) & 0xff;
+ if (write(server_socket_, request_size, 4) != 4) {
+ pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed");
+ }
+
+ if (write(server_socket_, request.data(), request.size()) != request.size()) {
+ pdie(blaze_exit_code::INTERNAL_ERROR, "write() to server failed");
+ }
+
+ // Wait until we receive some response from the server.
+ // (We do this by calling select() with a timeout.)
+ // If we don't receive a response within 3 seconds, print a message,
+ // so that the user has some idea what is going on.
+ while (true) {
+ fd_set fdset;
+ FD_ZERO(&fdset);
+ FD_SET(server_socket_, &fdset);
+ struct timeval timeout;
+ timeout.tv_sec = 3;
+ timeout.tv_usec = 0;
+ int result = select(server_socket_ + 1, &fdset, NULL, &fdset, &timeout);
+ if (result > 0) {
+ // Data is ready on socket. Go ahead and read it.
+ break;
+ } else if (result == 0) {
+ // Timeout. Print a message, then go ahead and read from
+ // the socket (the read will usually block).
+ fprintf(stderr,
+ "INFO: Waiting for response from %s server (pid %d)...\n",
+ globals->options.product_name.c_str(), globals->server_pid);
+ break;
+ } else { // result < 0
+ // Error. For EINTR we try again, all other errors are fatal.
+ if (errno != EINTR) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "select() on server socket failed");
+ }
+ }
+ }
+
+ // Read and demux the response.
+ const int TAG_STDOUT = 1;
+ const int TAG_STDERR = 2;
+ const int TAG_CONTROL = 3;
+ unsigned int exit_code;
+ for (;;) {
+ // Read the tag
+ unsigned char tag;
+ exit_code = ReadServerChar(server_socket_, &tag);
+ if (exit_code != 0) {
+ return exit_code;
+ }
+
+ switch (tag) {
+ // stdout
+ case TAG_STDOUT:
+ exit_code = ForwardServerOutput(server_socket_, STDOUT_FILENO);
+ if (exit_code != 0) {
+ return exit_code;
+ }
+ break;
+
+ // stderr
+ case TAG_STDERR:
+ exit_code = ForwardServerOutput(server_socket_, STDERR_FILENO);
+ if (exit_code != 0) {
+ return exit_code;
+ }
+ break;
+
+ // Control stream. Currently only used for reporting the exit code.
+ case TAG_CONTROL:
+ unsigned int length;
+ exit_code = ReadServerInt(server_socket_, &length);
+ if (exit_code != 0) {
+ // We cannot read the length field. The return value of ReadSeverInt()
+ // is the result of ServerEof(), so we bail out early so that we don't
+ // call ServerEof() twice.
+ return exit_code;
+ }
+
+ if (length != 4) {
+ return ServerEof();
+ }
+ unsigned int server_exit_code;
+ exit_code = ReadServerInt(server_socket_, &server_exit_code);
+ return exit_code != 0 ? exit_code : server_exit_code;
+
+ default:
+ fprintf(stderr, "bad tag %d\n", tag);
+ return ServerEof();
+ }
+ }
+}
+
+void AfUnixBlazeServer::Cancel() {
+ assert(connected_);
+ kill(globals->server_pid, SIGINT);
+}
+
// Write the contents of file_name to stream.
static void WriteFileToStreamOrDie(FILE *stream, const char *file_name) {
FILE *fp = fopen(file_name, "r");
@@ -809,6 +1061,55 @@ static void StartServerAndConnect(BlazeServer *server) {
socket_file.c_str());
}
+// Poll until the given process denoted by pid goes away. Return false if this
+// does not occur within wait_time_secs.
+static bool WaitForServerDeath(pid_t pid, int wait_time_secs) {
+ for (int ii = 0; ii < wait_time_secs * 10; ++ii) {
+ if (kill(pid, 0) == -1) {
+ if (errno == ESRCH) {
+ return true;
+ }
+ pdie(blaze_exit_code::INTERNAL_ERROR, "could not be killed");
+ }
+ poll(NULL, 0, 100); // sleep 100ms. (usleep(3) is obsolete.)
+ }
+ return false;
+}
+
+// Kills the specified running Blaze server. First we send a SIGTERM, and if
+// that does not kill the process, a SIGKILL.
+void AfUnixBlazeServer::KillRunningServer() {
+ assert(connected_);
+ assert(globals->server_pid > 0);
+
+ close(server_socket_);
+ server_socket_ = -1;
+ fprintf(stderr, "Sending SIGTERM to previous %s server (pid=%d)... ",
+ globals->options.product_name.c_str(), globals->server_pid);
+ fflush(stderr);
+ kill(globals->server_pid, SIGTERM);
+ if (WaitForServerDeath(globals->server_pid, 10)) {
+ fprintf(stderr, "done.\n");
+ connected_ = false;
+ return;
+ }
+
+ // If the previous attempt did not suceeded, kill the whole group.
+ fprintf(stderr,
+ "Sending SIGKILL to previous %s server process group (pid=%d)... ",
+ globals->options.product_name.c_str(), globals->server_pid);
+ fflush(stderr);
+ killpg(globals->server_pid, SIGKILL);
+ if (WaitForServerDeath(globals->server_pid, 10)) {
+ fprintf(stderr, "killed.\n");
+ connected_ = false;
+ return;
+ }
+
+ // Process did not go away 10s after SIGKILL. Stuck in state 'Z' or 'D'?
+ pdie(blaze_exit_code::INTERNAL_ERROR, "SIGKILL unsuccessful after 10s");
+}
+
// Calls fsync() on the file (or directory) specified in 'file_path'.
// pdie()'s if syncing fails.
static void SyncFile(const char *file_path) {
@@ -1530,6 +1831,13 @@ int main(int argc, const char *argv[]) {
CheckBinaryPath(argv[0]);
ParseOptions(argc, argv);
+#ifdef __CYGWIN__
+ if (globals->options.command_port == -1) {
+ // AF_UNIX does not work on Windows, so use gRPC instead.
+ globals->options.command_port = 0;
+ }
+#endif
+
string error;
blaze_exit_code::ExitCode reexec_options_exit_code =
globals->options.CheckForReExecuteOptions(argc, argv, &error);
@@ -1542,7 +1850,9 @@ int main(int argc, const char *argv[]) {
const string self_path = GetSelfPath();
ComputeBaseDirectories(self_path);
- blaze_server = static_cast<BlazeServer *>(new GrpcBlazeServer());
+ blaze_server = globals->options.command_port >= 0
+ ? static_cast<BlazeServer *>(new GrpcBlazeServer())
+ : static_cast<BlazeServer *>(new AfUnixBlazeServer());
globals->command_wait_time = blaze_server->AcquireLock();
diff --git a/src/main/cpp/blaze_startup_options_common.cc b/src/main/cpp/blaze_startup_options_common.cc
index b3cbaadbe6..449907b4a2 100644
--- a/src/main/cpp/blaze_startup_options_common.cc
+++ b/src/main/cpp/blaze_startup_options_common.cc
@@ -242,10 +242,10 @@ blaze_exit_code::ExitCode BlazeStartupOptions::ProcessArg(
} else if ((value = GetUnaryOption(
arg, next_arg, "--command_port")) != NULL) {
if (!blaze_util::safe_strto32(value, &command_port) ||
- command_port < 0 || command_port > 65535) {
+ command_port < -1 || command_port > 65535) {
blaze_util::StringPrintf(error,
"Invalid argument to --command_port: '%s'. "
- "Must be a valid port number or 0.\n",
+ "Must be a valid port number or -1 to disable the gRPC server.\n",
value);
return blaze_exit_code::BAD_ARGV;
}
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
index 5a4ebd0875..808759628a 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
@@ -47,6 +47,7 @@ import com.google.devtools.build.lib.query2.output.OutputFormatter;
import com.google.devtools.build.lib.rules.test.CoverageReportActionFactory;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
import com.google.devtools.build.lib.runtime.proto.InvocationPolicyOuterClass.InvocationPolicy;
+import com.google.devtools.build.lib.server.AfUnixServer;
import com.google.devtools.build.lib.server.RPCServer;
import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
import com.google.devtools.build.lib.shell.JavaSubprocessFactory;
@@ -777,17 +778,23 @@ public final class BlazeRuntime {
BlazeServerStartupOptions startupOptions =
runtime.getStartupOptionsProvider().getOptions(BlazeServerStartupOptions.class);
- try {
- // This is necessary so that Bazel kind of works during bootstrapping, at which time the
- // gRPC server is not compiled in so that we don't need gRPC for bootstrapping.
- Class<?> factoryClass = Class.forName(
- "com.google.devtools.build.lib.server.GrpcServerImpl$Factory");
+ if (startupOptions.commandPort != -1) {
+ try {
+ // This is necessary so that Bazel kind of works during bootstrapping, at which time the
+ // gRPC server is not compiled in so that we don't need gRPC for bootstrapping.
+ Class<?> factoryClass = Class.forName(
+ "com.google.devtools.build.lib.server.GrpcServerImpl$Factory");
RPCServer.Factory factory = (RPCServer.Factory) factoryClass.getConstructor().newInstance();
return factory.create(commandExecutor, runtime.getClock(),
startupOptions.commandPort, runtime.getServerDirectory(),
startupOptions.maxIdleSeconds);
- } catch (ReflectiveOperationException | IllegalArgumentException e) {
- throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR);
+ } catch (ReflectiveOperationException | IllegalArgumentException e) {
+ throw new AbruptExitException("gRPC server not compiled in", ExitCode.BLAZE_INTERNAL_ERROR);
+ }
+ } else {
+ return AfUnixServer.newServerWith(runtime.getClock(), commandExecutor,
+ runtime.getServerDirectory(), runtime.workspace.getWorkspace(),
+ startupOptions.maxIdleSeconds);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
new file mode 100644
index 0000000000..4444d8424e
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
@@ -0,0 +1,558 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.ShutdownMethod;
+import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
+import com.google.devtools.build.lib.unix.LocalClientSocket;
+import com.google.devtools.build.lib.unix.LocalServerSocket;
+import com.google.devtools.build.lib.unix.LocalSocketAddress;
+import com.google.devtools.build.lib.unix.NativePosixFiles;
+import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.ThreadUtils;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.util.io.StreamMultiplexer;
+import com.google.devtools.build.lib.vfs.Path;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/**
+ * An RPCServer server is a Java object that sits and waits for RPC requests
+ * (the sit-and-wait is implemented in {@link #serve()}). These requests
+ * arrive via UNIX file sockets. The RPCServer then calls the application
+ * (which implements ServerCommand) to handle the request. (Since the Blaze
+ * server may need to stat hundreds of directories during initialization, this
+ * is a significant speedup.) The server thread will terminate after idling
+ * for a user-specified time.
+ *
+ * Note: If you are contemplating to call into the RPCServer from
+ * within Java, consider using the {@link RPCService} class instead.
+ */
+// TODO(bazel-team): Signal handling.
+// TODO(bazel-team): Gives clients status information when the server is busy. One
+// way to do this is to put the server status in a file (pid, the current
+// target, etc) in the server directory. Alternatively, we can have a separate
+// thread taking care of the server socket and put the information into socket
+// handshakes.
+// TODO(bazel-team): Use Reporter for server-side messages.
+public final class AfUnixServer extends RPCServer {
+ private final Clock clock;
+ private final RPCService rpcService;
+ private final LocalServerSocket serverSocket;
+ private final long maxIdleMillis;
+ private final long statusCheckMillis;
+ private final Path serverDirectory;
+ private final Path workspaceDir;
+ private static final Logger LOG = Logger.getLogger(AfUnixServer.class.getName());
+ private volatile boolean lameDuck;
+
+ private static final long STATUS_CHECK_PERIOD_MILLIS = 1000 * 60; // 1 minute.
+ private static final Splitter NULLTERMINATOR_SPLITTER = Splitter.on('\0');
+
+ /**
+ * Create a new server instance. After creating the server, you can start it
+ * by calling the {@link #serve()} method.
+ *
+ * @param clock The clock to take time measurements
+ * @param rpcService The underlying service object, which takes
+ * care of dispatching to the {@link ServerCommand}
+ * instances, as requests arrive.
+ * @param maxIdleMillis The maximum time the server will wait idly.
+ * @param statusCheckPeriodMillis How long to wait between system status checks.
+ * @param serverDirectory Directory to put file socket and pid files, etc.
+ * @param workspaceDir The workspace. Used solely to ensure it persists.
+ * @throws IOException
+ */
+ public AfUnixServer(Clock clock, RPCService rpcService,
+ long maxIdleMillis, long statusCheckPeriodMillis,
+ Path serverDirectory, Path workspaceDir)
+ throws IOException {
+ super(serverDirectory);
+ this.clock = clock;
+ this.rpcService = rpcService;
+ this.maxIdleMillis = maxIdleMillis;
+ this.statusCheckMillis = statusCheckPeriodMillis;
+ this.serverDirectory = serverDirectory;
+ this.workspaceDir = workspaceDir;
+
+ this.serverSocket = openServerSocket();
+ serverSocket.setSoTimeout(Math.min(maxIdleMillis, statusCheckMillis));
+ lameDuck = false;
+ }
+
+ /**
+ * Create a new server instance. After creating the server, you can start it
+ * by calling the {@link #serve()} method.
+ *
+ * @param clock The clock to take time measurements
+ * @param rpcService The underlying service object, which takes
+ * care of dispatching to the {@link ServerCommand}
+ * instances, as requests arrive.
+ * @param maxIdleMillis The maximum time the server will wait idly.
+ * @param serverDirectory Directory to put file socket and pid files, etc.
+ * @param workspaceDir The workspace. Used solely to ensure it persists.
+ * @throws IOException
+ */
+ public AfUnixServer(Clock clock, RPCService rpcService,
+ long maxIdleMillis, Path serverDirectory, Path workspaceDir)
+ throws IOException {
+ this(clock, rpcService, maxIdleMillis, STATUS_CHECK_PERIOD_MILLIS,
+ serverDirectory, workspaceDir);
+ }
+
+
+ private final AtomicBoolean inAction = new AtomicBoolean(false);
+ private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
+ private final AtomicLong cmdNum = new AtomicLong();
+ private final Thread mainThread = Thread.currentThread();
+ private final Object interruptLock = new Object();
+
+ @Override
+ public void interrupt() {
+ // Only interrupt during actions - otherwise we may end up setting the interrupt bit
+ // at the end of a build and responding to it at the beginning of the subsequent build.
+ synchronized (interruptLock) {
+ if (allowingInterrupt.get()) {
+ mainThread.interrupt();
+ }
+ }
+
+ if (inAction.get()) {
+ Runnable interruptWatcher =
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long originalCmd = cmdNum.get();
+ Thread.sleep(10 * 1000);
+ if (inAction.get() && cmdNum.get() == originalCmd) {
+ // We're still operating on the same command.
+ // Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+ }
+
+ /**
+ * Wait on a socket for business (answer requests). Note that this
+ * method won't return until the server shuts down.
+ */
+ @Override
+ public void serve() {
+ try {
+ while (!lameDuck) {
+ try {
+ IdleServerTasks idleChecker = new IdleServerTasks(workspaceDir);
+ idleChecker.idle();
+ RequestIo requestIo;
+
+ long startTime = clock.currentTimeMillis();
+ while (true) {
+ try {
+ allowingInterrupt.set(true);
+ Socket socket = serverSocket.accept();
+ long firstContactTime = clock.currentTimeMillis();
+ requestIo = new RequestIo(socket, firstContactTime);
+ break;
+ } catch (SocketTimeoutException e) {
+ long idleTime = clock.currentTimeMillis() - startTime;
+ if (lameDuck) {
+ closeServerSocket();
+ return;
+ } else if (idleTime > maxIdleMillis
+ || (idleTime > statusCheckMillis && !idleChecker.continueProcessing(idleTime))) {
+ enterLameDuck();
+ }
+ }
+ }
+ idleChecker.busy();
+
+
+ List<String> request = null;
+ try {
+ request = extractRequest(requestIo);
+ cmdNum.incrementAndGet();
+ inAction.set(true);
+ if (request != null) {
+ executeRequest(request, requestIo);
+ }
+ } finally {
+ inAction.set(false);
+ // Don't reset interruption unless we executed a request. Otherwise this is just a
+ // ping from the client verifying our existence, in which case we should retain the
+ // interrupt status for the subsequent request.
+ if (request != null) {
+ synchronized (interruptLock) {
+ allowingInterrupt.set(false);
+ Thread.interrupted(); // clears thread interrupted status
+ }
+ }
+ requestIo.shutdown();
+ switch (rpcService.getShutdown()) {
+ case NONE:
+ break;
+
+ case CLEAN:
+ return;
+
+ case EXPUNGE:
+ disableShutdownHooks();
+ return;
+ }
+ }
+ } catch (EOFException e) {
+ LOG.info("Connection to the client lost: "
+ + e.getMessage());
+ } catch (IOException e) {
+ // Something else happened. Print a stack trace for debugging.
+ printStack(e);
+ }
+ }
+ } finally {
+ rpcService.shutdown(ShutdownMethod.CLEAN);
+ LOG.info("Logging finished");
+ }
+ }
+
+ private void closeServerSocket() {
+ LOG.info("Closing serverSocket.");
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+
+ if (!lameDuck) {
+ try {
+ getSocketPath().delete();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ }
+
+ /**
+ * Allow one last request to be serviced.
+ */
+ private void enterLameDuck() {
+ lameDuck = true;
+ try {
+ getSocketPath().delete();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ serverSocket.setSoTimeout(1);
+ }
+
+ /**
+ * Returns the path of the socket file to be used.
+ */
+ public Path getSocketPath() {
+ return serverDirectory.getRelative("server.socket");
+ }
+
+ /**
+ * Ensures no other server is running for the current socket file. This
+ * guarantees that no two servers are running against the same output
+ * directory.
+ *
+ * @throws IOException if another server holds the lock for the socket file.
+ */
+ public static void ensureExclusiveAccess(Path socketFile) throws IOException {
+ LocalSocketAddress address =
+ new LocalSocketAddress(socketFile.getPathFile());
+ if (socketFile.exists()) {
+ try {
+ new LocalClientSocket(address).close();
+ } catch (IOException e) {
+ // The previous server process is dead--unlink the file:
+ socketFile.delete();
+ return;
+ }
+ // TODO(bazel-team): (2009) Read the previous server's pid from the "hello" message
+ // and add it to the message.
+ throw new IOException("Socket file " + socketFile.getPathString()
+ + " is locked by another server");
+ }
+ }
+
+ /**
+ * Opens a UNIX local server socket.
+ * @throws IOException if the socket file is used by another server or can
+ * not be made exclusive.
+ */
+ private LocalServerSocket openServerSocket() throws IOException {
+ // This is the "well known" socket path via which the server is found...
+ Path socketFile = getSocketPath();
+
+ // ...but it may have a name that's too long for AF_UNIX, in which case we
+ // make it a symlink to /tmp/something. This typically only happens in
+ // tests where the --output_base is beneath a very deep temp dir.
+ // (All this extra complexity is just used in tests... *sigh*).
+ if (socketFile.toString().length() >= 104) { // = UNIX_PATH_MAX
+ Path socketLink = socketFile;
+ String tmpDirDefault = System.getenv("TMPDIR");
+ if (tmpDirDefault == null
+ || tmpDirDefault.length() > 104 - "/blaze-4294967296/server.socket".length()) {
+ // Default for unset TMPDIR, or if TMPDIR is so that the resulting
+ // path would be too long.
+ tmpDirDefault = "/tmp";
+ }
+ String tmpDir = System.getProperty("blaze.rpcserver.tmpdir", tmpDirDefault);
+ socketFile = createTempSocketDirectory(socketFile.getRelative(tmpDir)).
+ getRelative("server.socket");
+ LOG.info("Using symlinked socket at " + socketFile);
+
+ socketLink.delete(); // Remove stale symlink, if any.
+ socketLink.createSymbolicLink(socketFile);
+
+ deleteAtExit(socketLink, /*deleteParent=*/false);
+ deleteAtExit(socketFile, /*deleteParent=*/true);
+ } else {
+ deleteAtExit(socketFile, /*deleteParent=*/false);
+ }
+
+ ensureExclusiveAccess(socketFile);
+
+
+ LocalServerSocket serverSocket = new LocalServerSocket();
+ serverSocket.bind(new LocalSocketAddress(socketFile.getPathFile()));
+ NativePosixFiles.chmod(socketFile.getPathFile(), 0600); // Lock it down.
+ serverSocket.listen(/*backlog=*/50);
+ return serverSocket;
+ }
+
+ // Atomically create a new directory in the (assumed sticky) /tmp directory for use with a
+ // Unix domain socket. The directory will be mode 0700. Retries indefinitely until it
+ // succeeds.
+ private static Path createTempSocketDirectory(Path tempDir) {
+ Random random = new Random();
+ while (true) {
+ Path socketDir = tempDir.getRelative(String.format("blaze-%d", random.nextInt()));
+ try {
+ if (socketDir.createDirectory()) {
+ // Make sure it's private; unfortunately, createDirectory() doesn't take a mode
+ // argument.
+ socketDir.chmod(0700);
+ return socketDir; // Created.
+ }
+ // Already existed; try again.
+ } catch (IOException e) {
+ // Failed; try again.
+ }
+ }
+ }
+
+ /**
+ * Read a string in platform default encoding and split it into a list of
+ * NUL-separated words.
+ *
+ * <p>Blaze consistently uses the platform default encoding (defined in
+ * blaze.cc) to interface with Unix APIs.
+ */
+ private static List<String> readRequest(InputStream input) throws IOException {
+ byte[] sizeBuffer = new byte[4];
+ ByteStreams.readFully(input, sizeBuffer);
+ int size = ((sizeBuffer[0] & 0xff) << 24)
+ + ((sizeBuffer[1] & 0xff) << 16)
+ + ((sizeBuffer[2] & 0xff) << 8)
+ + (sizeBuffer[3] & 0xff);
+ byte[] inputBytes = new byte[size];
+ ByteStreams.readFully(input, inputBytes);
+
+ String s = new String(inputBytes, Charset.defaultCharset());
+ return ImmutableList.copyOf(NULLTERMINATOR_SPLITTER.split(s));
+ }
+
+ private static List<String> extractRequest(RequestIo requestIo) throws IOException {
+ List<String> request = readRequest(requestIo.in);
+ if (request == null) {
+ LOG.info("Short-circuiting empty request");
+ return null;
+ }
+ return request;
+ }
+
+ private void executeRequest(List<String> request, RequestIo requestIo) {
+ Preconditions.checkNotNull(request);
+ int exitStatus = 2;
+ try {
+ exitStatus = rpcService.executeRequest(request, requestIo.requestOutErr,
+ requestIo.firstContactTime);
+ LOG.info("Finished executing request");
+ } catch (UnknownCommandException e) {
+ requestIo.requestOutErr.printErrLn("SERVER ERROR: " + e.getMessage());
+ LOG.severe("SERVER ERROR: " + e.getMessage());
+ } catch (Exception e) {
+ // Stacktrace for unknown exception.
+ StringWriter trace = new StringWriter();
+ e.printStackTrace(new PrintWriter(trace, true));
+ requestIo.requestOutErr.printErr("SERVER ERROR: " + trace);
+ LOG.severe("SERVER ERROR: " + trace);
+ }
+
+ if (rpcService.getShutdown() != ShutdownMethod.NONE) {
+ // In case of shutdown, disable the listening socket *before* we write
+ // the last part of the response. Otherwise, a sufficiently fast client
+ // could read the response and exit, and a new client could make a
+ // connection to this server, which is still in the listening state, even
+ // though it is about to shut down imminently.
+ closeServerSocket();
+ }
+
+ requestIo.writeExitStatus(exitStatus);
+ }
+
+ /**
+ * Because it's a little complicated, this class factors out all the IO Hook
+ * up we need per request, that is, in
+ * {@link AfUnixServer#executeRequest(List, RequestIo)}.
+ * It's unfortunately complicated, so it's explained here.
+ */
+ private static class RequestIo {
+
+ // Used by the client code
+ private final InputStream in;
+ private final OutErr requestOutErr;
+ private final OutputStream controlChannel;
+
+ // just used by this class to keep the state around
+ private final Socket requestSocket;
+ private final OutputStream requestOut;
+ private final long firstContactTime;
+
+ RequestIo(Socket requestSocket, long firstContactTime) throws IOException {
+ this.requestSocket = requestSocket;
+ this.firstContactTime = firstContactTime;
+ this.in = requestSocket.getInputStream();
+ this.requestOut = requestSocket.getOutputStream();
+
+ // We encode the response sent to the client with a multiplexer so
+ // we can send three streams (out / err / control) over one wire stream
+ // (requestOut).
+ StreamMultiplexer multiplexer = new StreamMultiplexer(requestOut);
+
+ // We'll be writing control messages (exit code + out of date message)
+ // to this control channel.
+ controlChannel = multiplexer.createControl();
+
+ // This is the outErr part of the multiplexed output.
+ requestOutErr = OutErr.create(multiplexer.createStdout(),
+ multiplexer.createStderr());
+ // We hook up System.out / System.err to our IO object. Stuff written to
+ // System.out / System.err will show up on the user's screen, prefixed
+ // with "System.out "/"System.err ".
+ requestOutErr.addSystemOutErrAsSource();
+ }
+
+ public void writeExitStatus(int exitStatus) {
+ // Make sure to flush the output / error streams prior to writing the exit status.
+ // The client may stop reading that direction of the socket immediately upon reading the
+ // exit code.
+ flushOutErr();
+ try {
+ controlChannel.write((exitStatus >> 24) & 0xff);
+ controlChannel.write((exitStatus >> 16) & 0xff);
+ controlChannel.write((exitStatus >> 8) & 0xff);
+ controlChannel.write(exitStatus & 0xff);
+ controlChannel.flush();
+ LOG.info("" + exitStatus);
+ } catch (IOException ignored) {
+ // This exception is historically ignored.
+ }
+ }
+
+ private void flushOutErr() {
+ try {
+ requestOutErr.getOutputStream().flush();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ requestOutErr.getErrorStream().flush();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+
+ public void shutdown() {
+ try {
+ requestOut.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ in.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ try {
+ requestSocket.close();
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ }
+
+ /**
+ * Creates and returns a new RPC server.
+ * Use {@link AfUnixServer#serve()} to start the server.
+ *
+ * @param appCommand The application's ServerCommand implementation.
+ * @param serverDirectory The directory for server-related files. The caller
+ * must ensure the directory has been created.
+ * @param workspaceDir The workspace, used solely to ensure it persists.
+ * @param maxIdleSeconds The idle time in seconds after which the rpc
+ * server will die unless it receives a request.
+ */
+ public static AfUnixServer newServerWith(Clock clock,
+ ServerCommand appCommand,
+ Path serverDirectory,
+ Path workspaceDir,
+ int maxIdleSeconds)
+ throws IOException {
+ // Creates and starts the RPC server.
+ RPCService service = new RPCService(appCommand);
+
+ return new AfUnixServer(clock, service, maxIdleSeconds * 1000L,
+ serverDirectory, workspaceDir);
+ }
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 6e1555ca00..6d22550e8a 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -40,8 +40,6 @@ import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.security.SecureRandom;
@@ -50,10 +48,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
import javax.annotation.concurrent.GuardedBy;
/**
@@ -62,7 +57,7 @@ import javax.annotation.concurrent.GuardedBy;
* <p>Only this class should depend on gRPC so that we only need to exclude this during
* bootstrapping.
*/
-public class GrpcServerImpl implements RPCServer {
+public class GrpcServerImpl extends RPCServer {
// UTF-8 won't do because we want to be able to pass arbitrary binary strings.
// Not that the internals of Bazel handle that correctly, but why not make at least this little
// part correct?
@@ -70,8 +65,6 @@ public class GrpcServerImpl implements RPCServer {
private static final long NANOSECONDS_IN_MS = TimeUnit.MILLISECONDS.toNanos(1);
- private static final Logger LOG = Logger.getLogger(RPCServer.class.getName());
-
private class RunningCommand implements AutoCloseable {
private final Thread thread;
private final String id;
@@ -152,8 +145,6 @@ public class GrpcServerImpl implements RPCServer {
private static final String REQUEST_COOKIE_FILE = "request_cookie";
private static final String RESPONSE_COOKIE_FILE = "response_cookie";
- private static final AtomicBoolean runShutdownHooks = new AtomicBoolean(true);
-
@GuardedBy("runningCommands")
private final Map<String, RunningCommand> runningCommands = new HashMap<>();
private final CommandExecutor commandExecutor;
@@ -170,14 +161,7 @@ public class GrpcServerImpl implements RPCServer {
public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
Path serverDirectory, int maxIdleSeconds) throws IOException {
- // server.pid was written in the C++ launcher after fork() but before exec() .
- // The client only accesses the pid file after connecting to the socket
- // which ensures that it gets the correct pid value.
- Path pidFile = serverDirectory.getRelative("server.pid.txt");
- Path pidSymlink = serverDirectory.getRelative("server.pid");
- deleteAtExit(pidFile, /*deleteParent=*/ false);
- deleteAtExit(pidSymlink, /*deleteParent=*/ false);
-
+ super(serverDirectory);
this.commandExecutor = commandExecutor;
this.clock = clock;
this.serverDirectory = serverDirectory;
@@ -331,46 +315,6 @@ public class GrpcServerImpl implements RPCServer {
}
- protected void disableShutdownHooks() {
- runShutdownHooks.set(false);
- }
-
- /**
- * Schedule the specified file for (attempted) deletion at JVM exit.
- */
- protected static void deleteAtExit(final Path path, final boolean deleteParent) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- if (!runShutdownHooks.get()) {
- return;
- }
-
- try {
- path.delete();
- if (deleteParent) {
- path.getParentDirectory().delete();
- }
- } catch (IOException e) {
- printStack(e);
- }
- }
- });
- }
-
- static void printStack(IOException e) {
- /*
- * Hopefully this never happens. It's not very nice to just write this
- * to the user's console, but I'm not sure what better choice we have.
- */
- StringWriter err = new StringWriter();
- PrintWriter printErr = new PrintWriter(err);
- printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]=======");
- e.printStackTrace(printErr);
- printErr.println("=====================================================");
- LOG.severe(err.toString());
- }
-
private final CommandServerGrpc.CommandServerImplBase commandServer =
new CommandServerGrpc.CommandServerImplBase() {
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
index 4880349e15..40ccdeb915 100644
--- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -16,35 +16,86 @@ package com.google.devtools.build.lib.server;
import com.google.devtools.build.lib.runtime.CommandExecutor;
import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.vfs.Path;
-
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
/**
- * A Bazel server instance.
- *
- * <p>Even though it only has one implementation, that implementation cannot be compiled during
- * bootstrapping Bazel because it depends on the gRPC Java stubs, so we add a layer of abstraction
- * so that we can still use its functionality without resorting to reflection every time.
+ * A server instance. Can either an AF_UNIX or a gRPC one.
*/
-public interface RPCServer {
+public abstract class RPCServer {
+ private static final Logger LOG = Logger.getLogger(RPCServer.class.getName());
+ private static AtomicBoolean runShutdownHooks = new AtomicBoolean(true);
/**
* Factory class for the gRPC server.
*
* Present so that we don't need to invoke a constructor with multiple arguments by reflection.
*/
- interface Factory {
+ public interface Factory {
RPCServer create(CommandExecutor commandExecutor, Clock clock, int port, Path serverDirectory,
int maxIdleSeconds) throws IOException;
}
+ protected RPCServer(Path serverDirectory) throws IOException {
+ // server.pid was written in the C++ launcher after fork() but before exec() .
+ // The client only accesses the pid file after connecting to the socket
+ // which ensures that it gets the correct pid value.
+ Path pidFile = serverDirectory.getRelative("server.pid.txt");
+ Path pidSymlink = serverDirectory.getRelative("server.pid");
+ RPCServer.deleteAtExit(pidFile, /*deleteParent=*/ false);
+ RPCServer.deleteAtExit(pidSymlink, /*deleteParent=*/ false);
+ }
+
+ protected void disableShutdownHooks() {
+ runShutdownHooks.set(false);
+ }
+
+ /**
+ * Schedule the specified file for (attempted) deletion at JVM exit.
+ */
+ protected static void deleteAtExit(final Path path, final boolean deleteParent) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ if (!runShutdownHooks.get()) {
+ return;
+ }
+
+ try {
+ path.delete();
+ if (deleteParent) {
+ path.getParentDirectory().delete();
+ }
+ } catch (IOException e) {
+ printStack(e);
+ }
+ }
+ });
+ }
+
+ static void printStack(IOException e) {
+ /*
+ * Hopefully this never happens. It's not very nice to just write this
+ * to the user's console, but I'm not sure what better choice we have.
+ */
+ StringWriter err = new StringWriter();
+ PrintWriter printErr = new PrintWriter(err);
+ printErr.println("=======[BLAZE SERVER: ENCOUNTERED IO EXCEPTION]=======");
+ e.printStackTrace(printErr);
+ printErr.println("=====================================================");
+ LOG.severe(err.toString());
+ }
+
/**
* Start serving and block until the a shutdown command is received.
*/
- void serve() throws IOException;
+ public abstract void serve() throws IOException;
/**
* Called when the server receives a SIGINT.
*/
- void interrupt();
+ public abstract void interrupt();
}
diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java
new file mode 100644
index 0000000000..10335d1a93
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/unix/LocalClientSocket.java
@@ -0,0 +1,117 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.unix;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+
+/**
+ * <p>An implementation of client Socket for local (AF_UNIX) sockets.
+ *
+ * <p>This class intentionally doesn't extend java.net.Socket although it
+ * has some similarity to it. The java.net class hierarchy is a terrible mess
+ * and is inextricably coupled to the Internet Protocol.
+ *
+ * <p>This code is not intended to be portable to non-UNIX platforms.
+ */
+public class LocalClientSocket extends LocalSocket {
+
+ /**
+ * Constructs an unconnected local client socket.
+ *
+ * @throws IOException if the socket could not be created.
+ */
+ public LocalClientSocket() throws IOException {
+ super();
+ }
+
+ /**
+ * Constructs a client socket and connects it to the specified address.
+ *
+ * @throws IOException if either of the socket/connect operations failed.
+ */
+ public LocalClientSocket(LocalSocketAddress address) throws IOException {
+ super();
+ connect(address);
+ }
+
+ /**
+ * Connect to the specified server. Blocks until the server accepts the
+ * connection.
+ *
+ * @throws IOException if the connection failed.
+ */
+ public synchronized void connect(LocalSocketAddress address)
+ throws IOException {
+ checkNotClosed();
+ if (state == State.CONNECTED) {
+ throw new SocketException("socket is already connected");
+ }
+ connect(fd, address.getName().toString()); // JNI
+ this.address = address;
+ this.state = State.CONNECTED;
+ }
+
+ /**
+ * Returns the input stream for reading from the server.
+ *
+ * @param closeSocket close the socket when this input stream is closed.
+ * @throws IOException if there was a problem.
+ */
+ public synchronized InputStream getInputStream(final boolean closeSocket) throws IOException {
+ checkConnected();
+ checkInputNotShutdown();
+ return new FileInputStream(fd) {
+ @Override
+ public void close() throws IOException {
+ if (closeSocket) {
+ LocalClientSocket.this.close();
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns the input stream for reading from the server.
+ *
+ * @throws IOException if there was a problem.
+ */
+ public synchronized InputStream getInputStream() throws IOException {
+ return getInputStream(false);
+ }
+
+ /**
+ * Returns the output stream for writing to the server.
+ *
+ * @throws IOException if there was a problem.
+ */
+ public synchronized OutputStream getOutputStream() throws IOException {
+ checkConnected();
+ checkOutputNotShutdown();
+ return new FileOutputStream(fd) {
+ @Override public void close() {
+ // Don't close the file descriptor.
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return "LocalClientSocket(" + address + ")";
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java
new file mode 100644
index 0000000000..0c0bd22c3f
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/unix/LocalServerSocket.java
@@ -0,0 +1,173 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.unix;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+/**
+ * <p>An implementation of ServerSocket for local (AF_UNIX) sockets.
+ *
+ * <p>This class intentionally doesn't extend java.net.ServerSocket although it
+ * has some similarity to it. The java.net class hierarchy is a terrible mess
+ * and is inextricably coupled to the Internet Protocol.
+ *
+ * <p>This code is not intended to be portable to non-UNIX platforms.
+ */
+public class LocalServerSocket extends LocalSocket {
+
+ // Socket timeout in milliseconds. No timeout by default.
+ private long soTimeoutMillis = 0;
+
+ /**
+ * Constructs an unbound local server socket.
+ */
+ public LocalServerSocket() throws IOException {
+ super();
+ }
+
+ /**
+ * Constructs a server socket, binds it to the specified address, and
+ * listens for incoming connections with the specified backlog.
+ *
+ * @throws IOException if any of the socket/bind/listen operations failed.
+ */
+ public LocalServerSocket(LocalSocketAddress address, int backlog)
+ throws IOException {
+ this();
+ bind(address);
+ listen(backlog);
+ }
+
+ /**
+ * Constructs a server socket, binds it to the specified address, and begin
+ * listening for incoming connections using the default backlog.
+ *
+ * @throws IOException if any of the socket/bind/listen operations failed.
+ */
+ public LocalServerSocket(LocalSocketAddress address) throws IOException {
+ this(address, 50);
+ }
+
+ /**
+ * Specifies the timeout in milliseconds for accept(). Setting it to
+ * zero means an indefinite timeout.
+ */
+ public void setSoTimeout(long timeoutMillis) {
+ soTimeoutMillis = timeoutMillis;
+ }
+
+ /**
+ * Returns the current timeout in milliseconds.
+ */
+ public long getSoTimeout() {
+ return soTimeoutMillis;
+ }
+
+ /**
+ * Binds the specified address to this socket. The socket must be unbound.
+ * This causes the filesystem entry to appear.
+ *
+ * @throws IOException if the bind failed.
+ */
+ public synchronized void bind(LocalSocketAddress address)
+ throws IOException {
+ if (address == null) {
+ throw new NullPointerException("address");
+ }
+ checkNotClosed();
+ if (state != State.NEW) {
+ throw new SocketException("socket is already bound to an address");
+ }
+ bind(fd, address.getName().toString()); // JNI
+ this.address = address;
+ this.state = State.BOUND;
+ }
+
+ /**
+ * Listen for incoming connections on a socket using the specfied backlog.
+ * The socket must be bound but not already listening.
+ *
+ * @throws IOException if the listen failed.
+ */
+ public synchronized void listen(int backlog) throws IOException {
+ if (backlog < 1) {
+ throw new IllegalArgumentException("backlog=" + backlog);
+ }
+ checkNotClosed();
+ if (address == null) {
+ throw new SocketException("socket has no address bound");
+ }
+ if (state == State.LISTENING) {
+ throw new SocketException("socket is already listening");
+ }
+ listen(fd, backlog); // JNI
+ this.state = State.LISTENING;
+ }
+
+ /**
+ * Blocks until a connection is made to this socket and accepts it, returning
+ * a new socket connected to the client.
+ *
+ * @return the new socket connected to the client.
+ * @throws IOException if an error occurs when waiting for a connection.
+ * @throws SocketTimeoutException if a timeout was previously set with
+ * setSoTimeout and the timeout has been reached.
+ * @throws InterruptedIOException if the thread is interrupted when the
+ * method is blocked.
+ */
+ public synchronized Socket accept()
+ throws IOException, SocketTimeoutException, InterruptedIOException {
+ if (state != State.LISTENING) {
+ throw new SocketException("socket is not in listening state");
+ }
+
+ // Throws a SocketTimeoutException if timeout.
+ if (soTimeoutMillis != 0) {
+ poll(fd, soTimeoutMillis); // JNI
+ }
+
+ FileDescriptor clientFd = new FileDescriptor();
+ accept(fd, clientFd); // JNI
+ final LocalSocketImpl impl = new LocalSocketImpl(clientFd);
+ return new Socket(impl) {
+ @Override
+ public boolean isConnected() {
+ return true;
+ }
+ @Override
+ public synchronized void close() throws IOException {
+ if (isClosed()) {
+ return;
+ } else {
+ super.close();
+ // Workaround for the fact that super.created==false because we
+ // created the impl ourselves. As a result, super.close() doesn't
+ // call impl.close(). *Sigh*, java.net is horrendous.
+ // (Perhaps we should dispense with Socket/SocketImpl altogether?)
+ impl.close();
+ }
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return "LocalServerSocket(" + address + ")";
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java
new file mode 100644
index 0000000000..d51f2bad36
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/unix/LocalSocket.java
@@ -0,0 +1,216 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.unix;
+
+import com.google.devtools.build.lib.UnixJniLoader;
+import java.io.Closeable;
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+/**
+ * Abstract superclass for client and server local sockets.
+ */
+abstract class LocalSocket implements Closeable {
+
+ protected enum State {
+ NEW,
+ BOUND, // server only
+ LISTENING, // server only
+ CONNECTED, // client only
+ CLOSED,
+ }
+
+ protected LocalSocketAddress address = null;
+ protected FileDescriptor fd = new FileDescriptor();
+ protected State state;
+ protected boolean inputShutdown = false;
+ protected boolean outputShutdown = false;
+
+ /**
+ * Constructs an unconnected local socket.
+ */
+ protected LocalSocket() throws IOException {
+ socket(fd);
+ if (!fd.valid()) {
+ throw new IOException("Couldn't create socket!");
+ }
+ this.state = State.NEW;
+ }
+
+ /**
+ * Returns the address of the endpoint this socket is bound to.
+ *
+ * @return a <code>SocketAddress</code> representing the local endpoint of
+ * this socket.
+ */
+ public LocalSocketAddress getLocalSocketAddress() {
+ return address;
+ }
+
+ /**
+ * Closes this socket. This operation is idempotent.
+ *
+ * To be consistent with Java Socket, the shutdown states of the socket are
+ * not changed. This makes it easier to port applications between Socket and
+ * LocalSocket.
+ *
+ * @throws IOException if an I/O error occurred when closing the socket.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (state == State.CLOSED) {
+ return;
+ }
+ // Closes the file descriptor if it has not been closed by the
+ // input/output streams.
+ if (!fd.valid()) {
+ throw new IllegalStateException("LocalSocket.close(-1)");
+ }
+ close(fd);
+ if (fd.valid()) {
+ throw new IllegalStateException("LocalSocket.close() did not set fd to -1");
+ }
+ this.state = State.CLOSED;
+ }
+
+ /**
+ * Returns the closed state of the ServerSocket.
+ *
+ * @return true if the socket has been closed
+ */
+ public synchronized boolean isClosed() {
+ // If the file descriptor has been closed by the input/output
+ // streams, marks the socket as closed too.
+ return state == State.CLOSED;
+ }
+
+ /**
+ * Returns the connected state of the ClientSocket.
+ *
+ * @return true if the socket is currently connected.
+ */
+ public synchronized boolean isConnected() {
+ return state == State.CONNECTED;
+ }
+
+ protected synchronized void checkConnected() throws SocketException {
+ if (!isConnected()) {
+ throw new SocketException("Transport endpoint is not connected");
+ }
+ }
+
+ protected synchronized void checkNotClosed() throws SocketException {
+ if (isClosed()) {
+ throw new SocketException("socket is closed");
+ }
+ }
+
+ /**
+ * Returns the shutdown state of the input channel.
+ *
+ * @return true is the input channel of the socket is shutdown.
+ */
+ public synchronized boolean isInputShutdown() {
+ return inputShutdown;
+ }
+
+ /**
+ * Returns the shutdown state of the output channel.
+ *
+ * @return true is the input channel of the socket is shutdown.
+ */
+ public synchronized boolean isOutputShutdown() {
+ return outputShutdown;
+ }
+
+ protected synchronized void checkInputNotShutdown() throws SocketException {
+ if (isInputShutdown()) {
+ throw new SocketException("Socket input is shutdown");
+ }
+ }
+
+ protected synchronized void checkOutputNotShutdown() throws SocketException {
+ if (isOutputShutdown()) {
+ throw new SocketException("Socket output is shutdown");
+ }
+ }
+
+ static final int SHUT_RD = 0; // Mapped to BSD SHUT_RD in JNI.
+ static final int SHUT_WR = 1; // Mapped to BSD SHUT_WR in JNI.
+
+ public synchronized void shutdownInput() throws IOException {
+ checkNotClosed();
+ checkConnected();
+ checkInputNotShutdown();
+ inputShutdown = true;
+ shutdown(fd, SHUT_RD);
+ }
+
+ public synchronized void shutdownOutput() throws IOException {
+ checkNotClosed();
+ checkConnected();
+ checkOutputNotShutdown();
+ outputShutdown = true;
+ shutdown(fd, SHUT_WR);
+ }
+
+ ////////////////////////////////////////////////////////////////////////
+ // JNI:
+
+ static {
+ UnixJniLoader.loadJni();
+ }
+
+ // The native calls below are thin wrappers around linux system calls. The
+ // semantics remains the same except for poll(). See the comments for the
+ // method.
+ //
+ // Note: FileDescriptor is a box for a mutable integer that is visible only
+ // to native code.
+
+ // Generic operations:
+ protected static native void socket(FileDescriptor server)
+ throws IOException;
+ static native void close(FileDescriptor server)
+ throws IOException;
+ /**
+ * Shut down part of a full-duplex connection
+ * @param code Must be either SHUT_RD or SHUT_WR
+ */
+ static native void shutdown(FileDescriptor fd, int code)
+ throws IOException;
+
+ /**
+ * This method checks waits for the given file descriptor to become available for read.
+ * If timeoutMillis passed and there is no activity, a SocketTimeoutException will be thrown.
+ */
+ protected static native void poll(FileDescriptor read, long timeoutMillis)
+ throws IOException, SocketTimeoutException, InterruptedIOException;
+
+ // Server operations:
+ protected static native void bind(FileDescriptor server, String filename)
+ throws IOException;
+ protected static native void listen(FileDescriptor server, int backlog)
+ throws IOException;
+ protected static native void accept(FileDescriptor server,
+ FileDescriptor client)
+ throws IOException;
+
+ // Client operations:
+ protected static native void connect(FileDescriptor client, String filename)
+ throws IOException;
+}
diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java
new file mode 100644
index 0000000000..f9b9d43f06
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketAddress.java
@@ -0,0 +1,56 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.unix;
+
+import java.io.File;
+import java.net.SocketAddress;
+
+/**
+ * An implementation of SocketAddress for naming local sockets, i.e. files in
+ * the UNIX file system.
+ */
+public class LocalSocketAddress extends SocketAddress {
+
+ private final File name;
+
+ /**
+ * Constructs a SocketAddress for the specified file.
+ */
+ public LocalSocketAddress(File name) {
+ this.name = name;
+ }
+
+ /**
+ * Returns the filename of this local socket address.
+ */
+ public File getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return name.toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof LocalSocketAddress &&
+ ((LocalSocketAddress) other).name.equals(this.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java
new file mode 100644
index 0000000000..a30b450dc1
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/unix/LocalSocketImpl.java
@@ -0,0 +1,171 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.unix;
+
+import com.google.devtools.build.lib.UnixJniLoader;
+import com.google.devtools.build.lib.util.OS;
+
+import java.io.Closeable;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+import java.net.SocketImpl;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A simple implementation of SocketImpl for sockets that wrap a UNIX
+ * file-descriptor. This SocketImpl assumes that the socket is already
+ * created, bound, connected and supports no socket options or out-of-band
+ * features. This is used to implement server-side accepted client sockets
+ * (i.e. those returned by {@link LocalServerSocket#accept}).
+ */
+class LocalSocketImpl extends SocketImpl {
+ private static final Logger logger =
+ Logger.getLogger(LocalSocketImpl.class.getName());
+
+ static {
+ UnixJniLoader.loadJni();
+ if (OS.getCurrent() != OS.WINDOWS) {
+ init();
+ }
+ }
+
+ // The logic here is a little twisted, to support JDK7 and JDK8.
+
+ // 1) In JDK7, the FileDescriptor class keeps a reference count of
+ // instances using the fd, and closes it when it goes to 0. The
+ // reference count is only decremented by the finalizer for a
+ // given class. When the call to close() happens, the fd is
+ // closed regardless of the current state of the refcount.
+ //
+ // 2) In JDK8, every instance that uses the fd registers a Closeable
+ // with the FileDescriptor. Since the FileDescriptor has a
+ // reference to every user, only when all of the users and the
+ // FileDescriptor get GC'd does the finalizer run. An explicit
+ // call to close() calls FileDescriptor.closeAll(), which
+ // force-closes all of the users.
+
+ // So, in our case:
+
+ // 1) ref() increments the refcount in JDK7, and registers with the
+ // FD in JDK8.
+
+ // 2) unref() decrements the refcount in JDK7, and does nothing in
+ // JDK8.
+
+ // 3) The finalizer decrements the refcount in JDK7, and simply
+ // calls close() in JDK8 (where we don't have to worry about
+ // multiple live users of the FD). The close() method itself is
+ // idempotent.
+
+ // 4) close() calls fd.closeAll in JDK8, which, in turn, calls
+ // closer.close(). In JDK7, close() calls closer.close()
+ // explicitly.
+ private static native void init();
+ private static native void ref(FileDescriptor fd, Closeable closeable);
+ private static native boolean unref(FileDescriptor fd);
+ private static native boolean close0(FileDescriptor fd, Closeable closeable);
+
+ private final boolean isInitialized;
+ private final Closeable closer = new Closeable() {
+ AtomicBoolean isClosed = new AtomicBoolean(false);
+ @Override public void close() throws IOException {
+ if (isClosed.compareAndSet(false, true)) {
+ LocalSocket.close(fd);
+ }
+ }
+ };
+
+ // Note to callers: if you pass a FD into this constructor, this
+ // instance is now responsible for closing it (in the sense of
+ // LocalSocket.close()). If some other instance tries to close it,
+ // then terrible things will happen.
+ LocalSocketImpl(FileDescriptor fd) {
+ this.fd = fd; // (inherited field)
+ ref(fd, closer);
+ isInitialized = true;
+ }
+
+ @Override protected void finalize() {
+ try {
+ if (isInitialized) {
+ if (!unref(fd)) {
+ // JDK8 codepath
+ close0(fd, closer);
+ }
+ }
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Unable to access FileDescriptor class - " +
+ "may cause a file descriptor leak", e);
+ }
+ }
+ @Override protected InputStream getInputStream() {
+ return new FileInputStream(getFileDescriptor());
+ }
+ @Override protected OutputStream getOutputStream() {
+ return new FileOutputStream(getFileDescriptor());
+ }
+ @Override protected void close() throws IOException {
+ if (fd.valid()) {
+ if (!close0(fd, closer)) {
+ // JDK7 codepath
+ closer.close();
+ }
+ }
+ }
+
+ // Unused:
+ @Override
+ public void setOption(int optID, Object value) {
+ throw new UnsupportedOperationException("setOption");
+ }
+ @Override
+ public Object getOption(int optID) {
+ throw new UnsupportedOperationException("getOption");
+ }
+ @Override protected void create(boolean stream) {
+ throw new UnsupportedOperationException("create");
+ }
+ @Override protected void connect(String host, int port) {
+ throw new UnsupportedOperationException("connect");
+ }
+ @Override protected void connect(InetAddress address, int port) {
+ throw new UnsupportedOperationException("connect2");
+ }
+ @Override protected void connect(SocketAddress address, int timeout) {
+ throw new UnsupportedOperationException("connect3");
+ }
+ @Override protected void bind(InetAddress host, int port) {
+ throw new UnsupportedOperationException("bind");
+ }
+ @Override protected void listen(int backlog) {
+ throw new UnsupportedOperationException("listen");
+ }
+ @Override protected void accept(SocketImpl s) {
+ throw new UnsupportedOperationException("accept");
+ }
+ @Override protected int available() {
+ throw new UnsupportedOperationException("available");
+ }
+ @Override protected void sendUrgentData(int i) {
+ throw new UnsupportedOperationException("sendUrgentData");
+ }
+}
diff --git a/src/main/native/BUILD b/src/main/native/BUILD
index 3df6f78f12..8780377ef4 100644
--- a/src/main/native/BUILD
+++ b/src/main/native/BUILD
@@ -36,6 +36,7 @@ filegroup(
cc_binary(
name = "libunix.so",
srcs = [
+ "localsocket.cc",
"macros.h",
"process.cc",
"unix_jni.cc",
diff --git a/src/main/native/localsocket.cc b/src/main/native/localsocket.cc
new file mode 100644
index 0000000000..3731619e52
--- /dev/null
+++ b/src/main/native/localsocket.cc
@@ -0,0 +1,312 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <jni.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <string>
+
+#include "src/main/native/unix_jni.h"
+
+// Returns the field ID for FileDescriptor.fd.
+static jfieldID GetFileDescriptorField(JNIEnv *env) {
+ // See http://java.sun.com/docs/books/jni/html/fldmeth.html#26855
+ static jclass fd_class = NULL;
+ if (fd_class == NULL) { /* note: harmless race condition */
+ jclass local = env->FindClass("java/io/FileDescriptor");
+ CHECK(local != NULL);
+ fd_class = static_cast<jclass>(env->NewGlobalRef(local));
+ }
+ static jfieldID fieldId = NULL;
+ if (fieldId == NULL) { /* note: harmless race condition */
+ fieldId = env->GetFieldID(fd_class, "fd", "I");
+ CHECK(fieldId != NULL);
+ }
+ return fieldId;
+}
+
+// Returns the UNIX filedescriptor from a java.io.FileDescriptor instance.
+static jint GetUnixFileDescriptor(JNIEnv *env, jobject fd_obj) {
+ return env->GetIntField(fd_obj, GetFileDescriptorField(env));
+}
+
+// Sets the UNIX filedescriptor of a java.io.FileDescriptor instance.
+static void SetUnixFileDescriptor(JNIEnv *env, jobject fd_obj, jint fd) {
+ env->SetIntField(fd_obj, GetFileDescriptorField(env), fd);
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: socket
+ * Signature: (Ljava/io/FileDescriptor;)V
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_socket(JNIEnv *env,
+ jclass clazz,
+ jobject fd_sock) {
+ int sock;
+ if ((sock = ::socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ return;
+ }
+ SetUnixFileDescriptor(env, fd_sock, sock);
+}
+
+// Initialize "addr" from "name_chars", reporting error and returning
+// false on failure.
+static bool InitializeSockaddr(JNIEnv *env,
+ struct sockaddr_un *addr,
+ const char* name_chars) {
+ memset(addr, 0, sizeof *addr);
+ addr->sun_family = AF_UNIX;
+ // Note: UNIX_PATH_MAX is quite small!
+ if (strlen(name_chars) >= sizeof(addr->sun_path)) {
+ ::PostFileException(env, ENAMETOOLONG, name_chars);
+ return false;
+ }
+ strcpy((char*) &addr->sun_path, name_chars);
+ return true;
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: bind
+ * Signature: (Ljava/io/FileDescriptor;Ljava/lang/String;)V
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_bind(JNIEnv *env,
+ jclass clazz,
+ jobject fd_svr,
+ jstring name) {
+ int svr_sock = GetUnixFileDescriptor(env, fd_svr);
+ const char* name_chars = env->GetStringUTFChars(name, NULL);
+ struct sockaddr_un addr;
+ if (InitializeSockaddr(env, &addr, name_chars) &&
+ ::bind(svr_sock, (struct sockaddr *) &addr, sizeof addr) < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ }
+ env->ReleaseStringUTFChars(name, name_chars);
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: listen
+ * Signature: (Ljava/io/FileDescriptor;I)V
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_listen(JNIEnv *env,
+ jclass clazz,
+ jobject fd_svr,
+ jint backlog) {
+ int svr_sock = GetUnixFileDescriptor(env, fd_svr);
+ if (::listen(svr_sock, backlog) < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ }
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: select
+ * Signature: (L[java/io/FileDescriptor;[java/io/FileDescriptor;[java/io/FileDescriptor;J)Ljava/io/FileDescriptor
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_poll(JNIEnv *env,
+ jclass clazz,
+ jobject rfds_svr,
+ jlong timeoutMillis) {
+ // TODO(bazel-team): Handle Unix signals, namely SIGTERM.
+
+ // Copy Java FD into pollfd
+ pollfd pollfd;
+ pollfd.fd = GetUnixFileDescriptor(env, rfds_svr);
+ pollfd.events = POLLIN;
+ pollfd.revents = 0;
+
+ int count = poll(&pollfd, 1, timeoutMillis);
+ if (count == 0) {
+ // throws a timeout exception.
+ ::PostException(env, ETIMEDOUT, ::ErrorMessage(ETIMEDOUT));
+ } else if (count < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ }
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: accept
+ * Signature: (Ljava/io/FileDescriptor;Ljava/io/FileDescriptor;)V
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_accept(JNIEnv *env,
+ jclass clazz,
+ jobject fd_svr,
+ jobject fd_cli) {
+ int svr_sock = GetUnixFileDescriptor(env, fd_svr);
+ int cli_sock;
+ if ((cli_sock = ::accept(svr_sock, NULL, NULL)) < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ return;
+ }
+ SetUnixFileDescriptor(env, fd_cli, cli_sock);
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: close
+ * Signature: (Ljava/io/FileDescriptor;)V
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_close(JNIEnv *env,
+ jclass clazz,
+ jobject fd_svr) {
+ int svr_sock = GetUnixFileDescriptor(env, fd_svr);
+ if (::close(svr_sock) < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ }
+ SetUnixFileDescriptor(env, fd_svr, -1);
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: connect
+ * Signature: (Ljava/io/FileDescriptor;Ljava/lang/String;)V
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_connect(JNIEnv *env,
+ jclass clazz,
+ jobject fd_cli,
+ jstring name) {
+ const char* name_chars = env->GetStringUTFChars(name, NULL);
+ jint cli_sock = GetUnixFileDescriptor(env, fd_cli);
+ if (cli_sock == -1) {
+ ::PostFileException(env, ENOTSOCK, name_chars);
+ } else {
+ struct sockaddr_un addr;
+ if (InitializeSockaddr(env, &addr, name_chars)) {
+ if (::connect(cli_sock, (struct sockaddr *) &addr, sizeof addr) < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ }
+ }
+ }
+ env->ReleaseStringUTFChars(name, name_chars);
+}
+
+/*
+ * Class: com.google.devtools.build.lib.unix.LocalSocket
+ * Method: shutdown()
+ * Signature: (Ljava/io/FileDescriptor;I)V
+ * Parameters: code: 0 to shutdown input and 1 to shutdown output.
+ */
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocket_shutdown(JNIEnv *env,
+ jclass clazz,
+ jobject fd,
+ jint code) {
+ int action;
+ if (code == 0) {
+ action = SHUT_RD;
+ } else {
+ CHECK(code == 1);
+ action = SHUT_WR;
+ }
+
+ int sock = GetUnixFileDescriptor(env, fd);
+ if (shutdown(sock, action) < 0) {
+ ::PostException(env, errno, ::ErrorMessage(errno));
+ }
+}
+
+// TODO(bazel-team): These methods were removed in JDK8, so they
+// can be removed when we are no longer using JDK7. See note in
+// LocalSocketImpl.
+static jmethodID increment_use_count_;
+static jmethodID decrement_use_count_;
+
+// >=JDK8
+static jmethodID fd_attach_;
+static jmethodID fd_close_all_;
+
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocketImpl_init(JNIEnv *env, jclass ignored) {
+ jclass cls = env->FindClass("java/io/FileDescriptor");
+ if (cls == NULL) {
+ cls = env->FindClass("java/lang/NoClassDefFoundError");
+ env->ThrowNew(cls, "FileDescriptor class not found");
+ return;
+ }
+
+ // JDK7
+ increment_use_count_ =
+ env->GetMethodID(cls, "incrementAndGetUseCount", "()I");
+ if (increment_use_count_ != NULL) {
+ decrement_use_count_ =
+ env->GetMethodID(cls, "decrementAndGetUseCount", "()I");
+ } else {
+ // JDK8
+ env->ExceptionClear(); // The pending exception from increment_use_count_
+
+ fd_attach_ = env->GetMethodID(cls, "attach", "(Ljava/io/Closeable;)V");
+ fd_close_all_ = env->GetMethodID(cls, "closeAll", "(Ljava/io/Closeable;)V");
+
+ if (fd_attach_ == NULL || fd_close_all_ == NULL) {
+ cls = env->FindClass("java/lang/NoSuchMethodError");
+ env->ThrowNew(cls, "FileDescriptor methods not found");
+ return;
+ }
+ }
+}
+
+extern "C" JNIEXPORT void JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocketImpl_ref(JNIEnv *env, jclass clazz,
+ jobject fd, jobject closer) {
+ if (increment_use_count_ != NULL) {
+ env->CallIntMethod(fd, increment_use_count_);
+ }
+
+ if (fd_attach_ != NULL) {
+ env->CallVoidMethod(fd, fd_attach_, closer);
+ }
+}
+
+extern "C" JNIEXPORT jboolean JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocketImpl_unref(JNIEnv *env, jclass clazz,
+ jobject fd) {
+ if (decrement_use_count_ != NULL) {
+ env->CallIntMethod(fd, decrement_use_count_);
+ return true;
+ }
+ return false;
+}
+
+extern "C" JNIEXPORT jboolean JNICALL
+Java_com_google_devtools_build_lib_unix_LocalSocketImpl_close0(JNIEnv *env, jclass clazz,
+ jobject fd,
+ jobject closeable) {
+ if (fd_close_all_ != NULL) {
+ env->CallVoidMethod(fd, fd_close_all_, closeable);
+ return true;
+ }
+ // This will happen if fd_close_all_ is NULL, which means we are running in
+ // <=JDK7, which means that the caller needs to invoke close() explicitly.
+ return false;
+}
diff --git a/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java
new file mode 100644
index 0000000000..58948e1720
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/server/AfUnixServerTest.java
@@ -0,0 +1,135 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
+import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.ShutdownMethod;
+import com.google.devtools.build.lib.testutil.Suite;
+import com.google.devtools.build.lib.testutil.TestSpec;
+import com.google.devtools.build.lib.util.JavaClock;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.build.lib.util.io.RecordingOutErr;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.util.FsApparatus;
+import java.io.File;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Run a real RPC server on localhost, and talk to it using the testing
+ * client.
+ */
+@TestSpec(size = Suite.MEDIUM_TESTS)
+@RunWith(JUnit4.class)
+public class AfUnixServerTest {
+
+ private static final long MAX_IDLE_MILLIS = 10000;
+ private static final long HEALTH_CHECK_MILLIS = 1000 * 3;
+ private static final String COMMAND_STDOUT = "Heelllloo....";
+ private static final String COMMAND_STDERR = "...world!";
+
+ private AfUnixServer server;
+ private FsApparatus scratch = FsApparatus.newNative();
+ private RecordingOutErr outErr = new RecordingOutErr();
+ private Path serverDir;
+ private Path workspaceDir;
+ private RPCTestingClient client;
+ private Thread serverThread = new Thread(){
+ @Override
+ public void run() {
+ server.serve();
+ }
+ };
+
+ private static final ServerCommand helloWorldCommand = new ServerCommand() {
+ @Override
+ public int exec(List<String> args, OutErr outErr, LockingMode lockingMode,
+ String clientDescription, long firstContactTime) {
+ outErr.printOut(COMMAND_STDOUT);
+ outErr.printErr(COMMAND_STDERR);
+ return 42;
+ }
+
+ @Override
+ public ShutdownMethod shutdown() {
+ return ShutdownMethod.NONE;
+ }
+ };
+
+ @Before
+ public final void startServer() throws Exception {
+ // Do not use `createUnixTempDir()` here since the file name that results is longer
+ // than 108 characters, so cannot be used as local socket address.
+ File file = File.createTempFile("scratch", ".tmp", new File("/tmp"));
+ file.delete();
+ file.mkdir();
+ serverDir = this.scratch.dir(file.getAbsolutePath());
+
+ workspaceDir = this.scratch.createUnixTempDir();
+ workspaceDir.createDirectory();
+ client = new RPCTestingClient(
+ outErr, serverDir.getRelative("server.socket"));
+ RPCService service = new RPCService(helloWorldCommand);
+ server = new AfUnixServer(new JavaClock(), service, MAX_IDLE_MILLIS, HEALTH_CHECK_MILLIS,
+ serverDir, workspaceDir);
+ serverThread.start();
+ }
+
+ @After
+ public final void stopServer() throws Exception {
+ serverThread.interrupt();
+ serverThread.join();
+
+ FileSystemUtils.deleteTree(serverDir);
+ }
+
+ private void runTestRequest(String request, int ret, String out, String err) throws Exception {
+ assertEquals(ret, client.sendRequest(request));
+ assertEquals(out, outErr.outAsLatin1());
+ assertThat(outErr.errAsLatin1()).contains(err);
+ }
+
+ @Test
+ public void testUnknownCommand() throws Exception {
+ runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n");
+ }
+
+ @Test
+ public void testEmptyBlazeCommand() throws Exception {
+ runTestRequest("unknown", 2, "", "SERVER ERROR: Unknown command: unknown\n");
+ }
+
+ @Test
+ public void testWorkspaceDies() throws Exception {
+ assertTrue(serverThread.isAlive());
+ runTestRequest("blaze", 42, COMMAND_STDOUT, COMMAND_STDERR);
+ Thread.sleep(HEALTH_CHECK_MILLIS * 2);
+ assertTrue(serverThread.isAlive());
+
+ assertTrue(workspaceDir.delete());
+ Thread.sleep(HEALTH_CHECK_MILLIS * 2);
+ assertFalse(serverThread.isAlive());
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java
new file mode 100644
index 0000000000..3b1515d2e2
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/server/RPCTestingClient.java
@@ -0,0 +1,88 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.unix.LocalClientSocket;
+import com.google.devtools.build.lib.unix.LocalSocketAddress;
+import com.google.devtools.build.lib.util.io.RecordingOutErr;
+import com.google.devtools.build.lib.util.io.StreamDemultiplexer;
+import com.google.devtools.build.lib.vfs.Path;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+
+/**
+ * A client to test RPCServer.
+ */
+public class RPCTestingClient {
+
+ private final RecordingOutErr outErr;
+ private final Path socketFile;
+
+ /**
+ * Create a client to RPCServer. {@code socketFile} must be a file
+ * on disk; this will not work with the in-memory file system.
+ */
+ public RPCTestingClient(RecordingOutErr outErr, Path socketFile) {
+ this.socketFile = socketFile;
+ this.outErr = outErr;
+ }
+
+ public int sendRequest(String command, String... params)
+ throws Exception {
+ String request = command;
+ for (String param : params) {
+ request += "\0" + param;
+ }
+ return sendRequest(request);
+ }
+
+ public int sendRequest(String request) throws Exception {
+ LocalClientSocket connection = new LocalClientSocket();
+ connection.connect(new LocalSocketAddress(socketFile.getPathFile()));
+ try {
+ OutputStream out = connection.getOutputStream();
+ byte[] requestBytes = request.getBytes(UTF_8);
+ byte[] requestLength = new byte[4];
+ requestLength[0] = (byte) (requestBytes.length << 24);
+ requestLength[1] = (byte) ((requestBytes.length << 16) & 0xff);
+ requestLength[2] = (byte) ((requestBytes.length << 8) & 0xff);
+ requestLength[3] = (byte) (requestBytes.length & 0xff);
+ out.write(requestLength);
+ out.write(requestBytes);
+ out.flush();
+ connection.shutdownOutput();
+
+ OutputStream stdout = outErr.getOutputStream();
+ OutputStream stderr = outErr.getErrorStream();
+ ByteArrayOutputStream control = new ByteArrayOutputStream();
+ StreamDemultiplexer demux = new StreamDemultiplexer((byte) 1,
+ stdout, stderr, control);
+ ByteStreams.copy(connection.getInputStream(), demux);
+ demux.flush();
+
+ byte[] controlBytes = control.toByteArray();
+ return (((int) controlBytes[0]) << 24)
+ + (((int) controlBytes[1]) << 16)
+ + (((int) controlBytes[2]) << 8)
+ + ((int) controlBytes[3]);
+ } finally {
+ connection.close();
+ }
+ }
+
+}