aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tensorflow/core/BUILD2
-rw-r--r--tensorflow/core/distributed_runtime/rpc/grpc_testlib.h2
-rw-r--r--tensorflow/core/platform/posix/subprocess.cc464
-rw-r--r--tensorflow/core/platform/posix/subprocess.h131
-rw-r--r--tensorflow/core/platform/posix/test.cc57
-rw-r--r--tensorflow/core/platform/subprocess.h61
-rw-r--r--tensorflow/core/platform/subprocess_test.cc184
-rw-r--r--tensorflow/core/platform/test.h23
8 files changed, 850 insertions, 74 deletions
diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD
index 455774bf03..a1054472e3 100644
--- a/tensorflow/core/BUILD
+++ b/tensorflow/core/BUILD
@@ -214,6 +214,7 @@ cc_library(
"platform/protobuf.h",
"platform/stacktrace.h",
"platform/strong_hash.h",
+ "platform/subprocess.h",
"platform/thread_annotations.h",
"platform/types.h",
],
@@ -1539,6 +1540,7 @@ tf_cc_tests(
"platform/net_test.cc",
"platform/port_test.cc",
"platform/profile_utils/cpu_utils_test.cc",
+ "platform/subprocess_test.cc",
],
deps = [
":lib",
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h b/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h
index cfaf4c0da9..5e81b90189 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h
@@ -60,7 +60,7 @@ class TestCluster {
private:
TestCluster() = default;
- std::vector<std::unique_ptr<testing::SubProcess>> subprocesses_;
+ std::vector<std::unique_ptr<SubProcess>> subprocesses_;
std::vector<string> targets_;
std::vector<DeviceAttributes> devices_;
diff --git a/tensorflow/core/platform/posix/subprocess.cc b/tensorflow/core/platform/posix/subprocess.cc
new file mode 100644
index 0000000000..ef646baf4d
--- /dev/null
+++ b/tensorflow/core/platform/posix/subprocess.cc
@@ -0,0 +1,464 @@
+/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include <fcntl.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/subprocess.h"
+
+// 1) FYI from m3b@ about fork():
+// A danger of calling fork() (as opposed to clone() or vfork()) is that if
+// many people have used pthread_atfork() to acquire locks, fork() can deadlock,
+// because it's unlikely that the locking order will be correct in a large
+// programme where different layers are unaware of one another and using
+// pthread_atfork() independently.
+//
+// The danger of not calling fork() is that if libc managed to use
+// pthread_atfork() correctly (for example, to lock the environment), you'd
+// miss out on that protection. (But as far as I can see most libc's don't get
+// that right; certainly glibc doesn't seem to.)
+//
+// clone() or vfork() are also frustrating because clone() exists only on Linux,
+// and both clone(...CLONE_VM...) and vfork() have interesting issues around
+// signals being delivered after the fork and before the exec. It may be
+// possible to work around the latter by blocking all signals before the fork
+// and unblocking them afterwards.
+//
+// Fortunately, most people haven't heard of pthread_atfork().
+//
+//
+// 2) FYI from m3b@ about execv():
+// The execv() call implicitly uses the libc global variable environ, which was
+// copied by fork(), and that copy could have raced with a setenv() call in
+// another thread, since libc implementations are usually not very careful about
+// this. (glibc isn't careful, for example.)
+//
+// If this were inside libc, we could use locks or memory barriers to avoid the
+// race, but as it is, I see nothing you can do. Even if you tried to copy the
+// environment before the fork(), the copying could race with other threads
+// calling setenv(). The good news is that few people call setenv().
+//
+// Amusingly, the standard says of fork(): "...to avoid errors, the child
+// process may only execute async-signal-safe operations until such time as one
+// of the exec functions is called." Notice that execve() is listed as
+// async-signal-safe, but execv() is not, and the difference is just the
+// handling of the environment.
+
+namespace tensorflow {
+
+// Builds an idle SubProcess: nothing is running, no program is configured,
+// every channel defaults to ACTION_CLOSE and no pipe descriptor is open.
+//
+// 'nfds' is accepted but currently ignored; the internal constant 'kNFds'
+// fixes the channel set to the 3 standard ones (stdin, stdout, stderr).
+SubProcess::SubProcess(int nfds)
+    : running_(false), pid_(-1), exec_path_(nullptr), exec_argv_(nullptr) {
+  for (int chan = 0; chan < kNFds; ++chan) {
+    child_pipe_[chan] = -1;
+    parent_pipe_[chan] = -1;
+    action_[chan] = ACTION_CLOSE;
+  }
+}
+
+// Tears the object down under both mutexes: closes any pipe descriptors that
+// are still open, frees the copied program path/arguments and resets the
+// process bookkeeping. The child itself is neither signalled nor waited for.
+SubProcess::~SubProcess() {
+  mutex_lock proc_guard(proc_mu_);
+  mutex_lock data_guard(data_mu_);
+  ClosePipes();
+  FreeArgs();
+  running_ = false;
+  pid_ = -1;
+}
+
+// Frees the strdup()'ed program path and the NULL-terminated argument vector
+// (each string, then the array itself) and resets both members to nullptr.
+// Safe to call when nothing has been set.
+void SubProcess::FreeArgs() {
+  free(exec_path_);
+  exec_path_ = nullptr;
+
+  if (exec_argv_ == nullptr) {
+    return;
+  }
+  for (int idx = 0; exec_argv_[idx] != nullptr; ++idx) {
+    free(exec_argv_[idx]);
+  }
+  delete[] exec_argv_;
+  exec_argv_ = nullptr;
+}
+
+// Closes every pipe descriptor that is still open (parent side first, then
+// child side, channel by channel) and marks each slot as unused (-1).
+void SubProcess::ClosePipes() {
+  for (int chan = 0; chan < kNFds; ++chan) {
+    int* const pipe_ends[] = {&parent_pipe_[chan], &child_pipe_[chan]};
+    for (int* fd : pipe_ends) {
+      if (*fd < 0) {
+        continue;
+      }
+      close(*fd);
+      *fd = -1;
+    }
+  }
+}
+
+// Records the program to execute and its argument vector.
+//
+// file: path handed to execv(); copied with strdup().
+// argv: full argument list (argv[0] included); every entry is copied.
+//
+// Any previously configured program is released first. Calling this while
+// the process is running, or failing to allocate a copy, is reported with
+// LOG(FATAL).
+void SubProcess::SetProgram(const string& file,
+                            const std::vector<string>& argv) {
+  mutex_lock procLock(proc_mu_);
+  mutex_lock dataLock(data_mu_);
+  if (running_) {
+    LOG(FATAL) << "SetProgram called after the process was started.";
+    return;
+  }
+
+  FreeArgs();
+  exec_path_ = strdup(file.c_str());
+  if (exec_path_ == nullptr) {
+    LOG(FATAL) << "SetProgram failed to allocate file string.";
+    return;
+  }
+
+  // Value-initialize the array so it is NULL-terminated at every point while
+  // it is being filled. FreeArgs() walks the array up to the first nullptr,
+  // so a partially filled array must never contain indeterminate pointers.
+  const size_t argc = argv.size();
+  exec_argv_ = new char*[argc + 1]();
+  for (size_t i = 0; i < argc; i++) {
+    exec_argv_[i] = strdup(argv[i].c_str());
+    if (exec_argv_[i] == nullptr) {
+      LOG(FATAL) << "SetProgram failed to allocate command argument.";
+      return;
+    }
+  }
+  // execv() requires the terminating null pointer; keep it explicit.
+  exec_argv_[argc] = nullptr;
+}
+
+// Selects how channel 'chan' is wired up when the process is started.
+// Reports LOG(FATAL) if the process is already running, if 'chan' is not a
+// valid channel index, or if 'action' is not one of the known actions.
+void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
+  mutex_lock proc_guard(proc_mu_);
+  mutex_lock data_guard(data_mu_);
+
+  if (running_) {
+    LOG(FATAL) << "SetChannelAction called after the process was started.";
+    return;
+  }
+  if (!chan_valid(chan)) {
+    LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
+    return;
+  }
+
+  const bool known_action = (action == ACTION_CLOSE) ||
+                            (action == ACTION_PIPE) ||
+                            (action == ACTION_DUPPARENT);
+  if (!known_action) {
+    LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
+    return;
+  }
+
+  action_[chan] = action;
+}
+
+// Launches the program configured with SetProgram().
+//
+// A pipe is created for every channel set to ACTION_PIPE; the parent-side end
+// is made non-blocking and close-on-exec. The process is then forked:
+//   * the parent closes the child-side pipe ends and returns true;
+//   * the child wires channels 0..kNFds-1 according to their action and
+//     calls execv(); it never returns from this function and calls _exit(1)
+//     if the channels cannot be set up or the exec fails.
+//
+// Returns false, after logging an error, if a process is already running, no
+// program has been set, or pipe()/fcntl()/fork() fails.
+bool SubProcess::Start() {
+  mutex_lock procLock(proc_mu_);
+  mutex_lock dataLock(data_mu_);
+  if (running_) {
+    LOG(ERROR) << "Start called after the process was started.";
+    return false;
+  }
+  if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
+    LOG(ERROR) << "Start called without setting a program.";
+    return false;
+  }
+
+  // Create parent/child pipes for the specified channels and make the
+  // parent-side of the pipes non-blocking.
+  for (int i = 0; i < kNFds; i++) {
+    if (action_[i] == ACTION_PIPE) {
+      int pipe_fds[2];
+      if (pipe(pipe_fds) < 0) {
+        LOG(ERROR) << "Start cannot create pipe: " << strerror(errno);
+        ClosePipes();
+        return false;
+      }
+      // Handle the direction of the pipe (stdin vs stdout/err).
+      if (i == 0) {
+        parent_pipe_[i] = pipe_fds[1];
+        child_pipe_[i] = pipe_fds[0];
+      } else {
+        parent_pipe_[i] = pipe_fds[0];
+        child_pipe_[i] = pipe_fds[1];
+      }
+
+      if (fcntl(parent_pipe_[i], F_SETFL, O_NONBLOCK) < 0) {
+        LOG(ERROR) << "Start cannot make pipe non-blocking: "
+                   << strerror(errno);
+        ClosePipes();
+        return false;
+      }
+      if (fcntl(parent_pipe_[i], F_SETFD, FD_CLOEXEC) < 0) {
+        LOG(ERROR) << "Start cannot make pipe close-on-exec: "
+                   << strerror(errno);
+        ClosePipes();
+        return false;
+      }
+    }
+  }
+
+  // Start the child process and setup the file descriptors of both processes.
+  // See comment (1) at the top of this file about issues with the use of
+  // fork().
+  pid_ = fork();
+  if (pid_ < 0) {
+    LOG(ERROR) << "Start cannot fork() child process: " << strerror(errno);
+    ClosePipes();
+    return false;
+  }
+
+  if (pid_ > 0) {
+    // Parent process: close the child-side pipes and return.
+    running_ = true;
+    for (int i = 0; i < kNFds; i++) {
+      if (child_pipe_[i] >= 0) {
+        close(child_pipe_[i]);
+        child_pipe_[i] = -1;
+      }
+    }
+    return true;
+  }
+
+  // Child process: drop every parent-side pipe end before touching the
+  // standard channels, so a parent-side descriptor that happens to be
+  // numbered 0..2 cannot be closed after that channel has been set up.
+  for (int i = 0; i < kNFds; i++) {
+    if (parent_pipe_[i] >= 0) {
+      close(parent_pipe_[i]);
+      parent_pipe_[i] = -1;
+    }
+  }
+
+  // If the parent was running with some of fds 0..2 closed, pipe() may have
+  // returned a child-side end inside that range. Move such ends above the
+  // standard channels; otherwise dup2(fd, fd) followed by close(fd) would
+  // close the channel just set up, and a dup2() for an earlier channel could
+  // overwrite the pipe end of a later one.
+  for (int i = 0; i < kNFds; i++) {
+    if ((child_pipe_[i] >= 0) && (child_pipe_[i] < kNFds)) {
+      int moved_fd;
+      while ((moved_fd = fcntl(child_pipe_[i], F_DUPFD, kNFds)) < 0) {
+        if (!retry(errno)) {
+          _exit(1);
+        }
+      }
+      close(child_pipe_[i]);
+      child_pipe_[i] = moved_fd;
+    }
+  }
+
+  // Wire up each channel. For pipe channels, replace their file descriptors
+  // with the pipes.
+  int devnull_fd = -1;
+  for (int i = 0; i < kNFds; i++) {
+    switch (action_[i]) {
+      case ACTION_DUPPARENT:
+        // Nothing to do, fork() took care of it.
+        break;
+
+      case ACTION_PIPE:
+        while (dup2(child_pipe_[i], i) < 0) {
+          if (!retry(errno)) {
+            _exit(1);
+          }
+        }
+        close(child_pipe_[i]);
+        child_pipe_[i] = -1;
+        break;
+
+      case ACTION_CLOSE:
+      default:
+        // Do not close stdin/out/err, instead redirect them to /dev/null so
+        // their file descriptors remain unavailable for reuse by open(), etc.
+        if (i <= CHAN_STDERR) {
+          if (devnull_fd < 0) {
+            while ((devnull_fd = open("/dev/null", O_RDWR, 0)) < 0) {
+              if (!retry(errno)) {
+                _exit(1);
+              }
+            }
+            // open() returns the lowest free descriptor, which can be one of
+            // the standard channels. Keep the shared /dev/null descriptor
+            // above them so the final close() below cannot close a channel.
+            if (devnull_fd < kNFds) {
+              const int low_fd = devnull_fd;
+              while ((devnull_fd = fcntl(low_fd, F_DUPFD, kNFds)) < 0) {
+                if (!retry(errno)) {
+                  _exit(1);
+                }
+              }
+              close(low_fd);
+            }
+          }
+          while (dup2(devnull_fd, i) < 0) {
+            if (!retry(errno)) {
+              _exit(1);
+            }
+          }
+        } else {
+          close(i);
+        }
+        break;
+    }
+  }
+
+  if (devnull_fd >= 0) {
+    close(devnull_fd);
+  }
+
+  // Execute the child program.
+  // See comment (2) at the top of this file about issues with the use of
+  // execv().
+  execv(exec_path_, exec_argv_);
+  _exit(1);
+}
+
+// Blocks until the child terminates. The wait status collected by
+// WaitInternal() is discarded; only its success flag is returned.
+bool SubProcess::Wait() {
+  int ignored_status;
+  const bool reaped = WaitInternal(&ignored_status);
+  return reaped;
+}
+
+// Waits for the child with waitpid() and, on success, stores the raw wait
+// status in '*status'.
+//
+// Returns true if the child was reaped after exiting or being killed by a
+// signal; false if no process was running or waitpid() failed with an error
+// that is not retried. '*status' is left untouched when false is returned.
+bool SubProcess::WaitInternal(int* status) {
+  // Snapshot the process state and drop proc_mu_ again: the lock must not be
+  // held across the blocking waitpid() or Kill() could not run meanwhile.
+  bool was_running;
+  pid_t child;
+  {
+    mutex_lock guard(proc_mu_);
+    was_running = running_;
+    child = pid_;
+  }
+
+  bool reaped = false;
+  if (was_running && (child > 1)) {
+    for (;;) {
+      int wstat;
+      const pid_t got = waitpid(child, &wstat, 0);
+      if (got < 0) {
+        if (!retry(errno)) {
+          break;
+        }
+        continue;
+      }
+      if ((got == child) && (WIFEXITED(wstat) || WIFSIGNALED(wstat))) {
+        *status = wstat;
+        reaped = true;
+        break;
+      }
+    }
+  }
+
+  // Clear the bookkeeping only if nobody changed it while we were waiting.
+  {
+    mutex_lock guard(proc_mu_);
+    if ((running_ == was_running) && (pid_ == child)) {
+      running_ = false;
+      pid_ = -1;
+    }
+  }
+  return reaped;
+}
+
+// Sends 'signal' to the child. Returns true only if a process is recorded as
+// running, its pid is greater than 1, and kill() succeeds.
+bool SubProcess::Kill(int signal) {
+  bool alive;
+  pid_t target;
+  {
+    mutex_lock guard(proc_mu_);
+    alive = running_;
+    target = pid_;
+  }
+
+  if (!alive || (target <= 1)) {
+    return false;
+  }
+  return kill(target, signal) == 0;
+}
+
+// Exchanges data with the running child over the channels configured as
+// ACTION_PIPE, then waits for the child to terminate.
+//
+// stdin_input: bytes to write to the child's stdin pipe; if nullptr the
+// stdin pipe is closed right away.
+// stdout_output / stderr_output: strings that receive what the child writes;
+// if nullptr the data is read and discarded.
+//
+// Returns 1 if no process is running or the SIGPIPE disposition cannot be
+// queried/changed; otherwise the raw wait status stored by WaitInternal(),
+// or -1 if WaitInternal() fails.
+int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
+ string* stderr_output) {
+ // Parallel arrays, one entry per polled pipe: the poll() descriptor, the
+ // number of bytes transferred so far, and the string read from/written to.
+ struct pollfd fds[kNFds];
+ size_t nbytes[kNFds];
+ string* iobufs[kNFds];
+ int fd_count = 0;
+
+ proc_mu_.lock();
+ bool running = running_;
+ proc_mu_.unlock();
+ if (!running) {
+ LOG(ERROR) << "Communicate called without a running process.";
+ return 1;
+ }
+
+ // If SIGPIPE handling is the default action, change it to ignore SIGPIPE and
+ // keep it ignored, don't change it back. This is needed while communicating
+ // with the child process so the parent process can survive the death of the
+ // child process while it is writing to its stdin. If the application has
+ // registered a SIGPIPE handler, then let it deal with any signals generated
+ // by the premature death of the child process, don't overwrite its handler.
+ struct sigaction act;
+ if (sigaction(SIGPIPE, nullptr, &act) < 0) {
+ LOG(ERROR) << "Communicate cannot get SIGPIPE handler: " << strerror(errno);
+ return 1;
+ }
+ if (act.sa_handler == SIG_DFL) {
+ memset(&act, 0, sizeof(act));
+ act.sa_handler = SIG_IGN;
+ sigemptyset(&act.sa_mask);
+ if (sigaction(SIGPIPE, &act, nullptr) < 0) {
+ LOG(ERROR) << "Communicate cannot ignore SIGPIPE: " << strerror(errno);
+ return 1;
+ }
+ }
+
+ // Lock data_mu_ but not proc_mu_ while communicating with the child process
+ // in order for Kill() to be able to terminate the child from another thread.
+ data_mu_.lock();
+
+ // Initialize the poll() structures and buffer tracking.
+ for (int i = 0; i < kNFds; i++) {
+ if (action_[i] == ACTION_PIPE) {
+ switch (i) {
+ case CHAN_STDIN:
+ // Special case: if no data is given to send to the child process,
+ // close the pipe to unblock the child, and skip the file descriptor.
+ if (stdin_input == nullptr) {
+ close(parent_pipe_[i]);
+ parent_pipe_[i] = -1;
+ continue;
+ }
+ // The input string is only read from (see the POLLOUT branch below);
+ // the const_cast just lets it share the iobufs array.
+ iobufs[fd_count] = const_cast<string*>(stdin_input);
+ break;
+ case CHAN_STDOUT:
+ iobufs[fd_count] = stdout_output;
+ break;
+ case CHAN_STDERR:
+ iobufs[fd_count] = stderr_output;
+ break;
+ default:
+ iobufs[fd_count] = nullptr;
+ break;
+ }
+ nbytes[fd_count] = 0;
+ fds[fd_count].fd = parent_pipe_[i];
+ // Channel 0 (stdin) is written by the parent; the others are read.
+ fds[fd_count].events = (i > 0) ? POLLIN : POLLOUT;
+ fds[fd_count].revents = 0;
+ fd_count++;
+ }
+ }
+
+ // Loop communicating with the child process.
+ // fd_remain counts the pipes that are still active; a finished pipe has its
+ // pollfd descriptor set to -1 so that poll() no longer reports it.
+ int fd_remain = fd_count;
+ char buf[4096];
+ while (fd_remain > 0) {
+ int n = poll(fds, fd_count, -1);
+ if ((n < 0) && !retry(errno)) {
+ LOG(ERROR) << "Communicate cannot poll(): " << strerror(errno);
+ fd_remain = 0;
+ } else if (n == 0) {
+ LOG(ERROR) << "Communicate cannot poll(): timeout not possible";
+ fd_remain = 0;
+ } else if (n > 0) {
+ // Handle the pipes ready for I/O.
+ for (int i = 0; i < fd_count; i++) {
+ if ((fds[i].revents & (POLLIN | POLLHUP)) != 0) {
+ // Read from one of the child's outputs.
+ ssize_t n = read(fds[i].fd, buf, sizeof(buf));
+ if (n > 0) {
+ // Data is dropped when the caller passed no output string.
+ if (iobufs[i] != nullptr) {
+ iobufs[i]->append(buf, n);
+ nbytes[i] += n;
+ }
+ } else if ((n == 0) || !retry(errno)) {
+ // End of file or a non-retryable error: stop polling this pipe.
+ fds[i].fd = -1;
+ fd_remain--;
+ }
+ } else if ((fds[i].revents & POLLOUT) != 0) {
+ // Write to the child's stdin.
+ // n is the number of input bytes not yet written.
+ ssize_t n = iobufs[i]->size() - nbytes[i];
+ if (n > 0) {
+ n = write(fds[i].fd, iobufs[i]->c_str() + nbytes[i], n);
+ }
+ if (n >= 0) {
+ nbytes[i] += n;
+ if (nbytes[i] >= iobufs[i]->size()) {
+ fds[i].fd = -1;
+ fd_remain--;
+ // Close the child's stdin pipe to unblock the process.
+ close(parent_pipe_[CHAN_STDIN]);
+ parent_pipe_[CHAN_STDIN] = -1;
+ }
+ } else if (!retry(errno)) {
+ // Non-retryable write error: give up on stdin. The pipe itself
+ // is left open here.
+ fds[i].fd = -1;
+ fd_remain--;
+ }
+ } else if ((fds[i].revents & POLLERR) != 0) {
+ fds[i].fd = -1;
+ fd_remain--;
+ }
+ }
+ }
+ }
+
+ data_mu_.unlock();
+
+ // Wait for the child process to exit and return its status.
+ int status;
+ return WaitInternal(&status) ? status : -1;
+}
+
+} // namespace tensorflow
diff --git a/tensorflow/core/platform/posix/subprocess.h b/tensorflow/core/platform/posix/subprocess.h
new file mode 100644
index 0000000000..53f95f3c14
--- /dev/null
+++ b/tensorflow/core/platform/posix/subprocess.h
@@ -0,0 +1,131 @@
+/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_
+#define TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_
+
+#include <errno.h>
+#include <unistd.h>
+
+#include <string>
+#include <vector>
+
+#include "tensorflow/core/platform/macros.h"
+#include "tensorflow/core/platform/mutex.h"
+#include "tensorflow/core/platform/types.h"
+
+namespace tensorflow {
+
+// POSIX implementation of SubProcess: configures, starts, signals and waits
+// for a single child process, optionally exchanging data with it over pipes
+// attached to its stdin, stdout and stderr.
+class SubProcess {
+ public:
+ // SubProcess()
+ // nfds: The number of file descriptors to use. Currently ignored by the
+ // implementation, which always handles the 3 standard channels.
+ explicit SubProcess(int nfds = 3);
+
+ // Virtual for backwards compatibility; do not create new subclasses.
+ // It is illegal to delete the SubProcess within its exit callback.
+ virtual ~SubProcess();
+
+ // SetChannelAction()
+ // Set how to handle a channel. The default action is ACTION_CLOSE.
+ // The action is set for all subsequent processes, until
+ // SetChannelAction() is called again.
+ //
+ // SetChannelAction() may not be called while the process is running.
+ //
+ // chan: Which channel this applies to.
+ // action: What to do with the channel.
+ // Virtual for backwards compatibility; do not create new subclasses.
+ virtual void SetChannelAction(Channel chan, ChannelAction action);
+
+ // SetProgram()
+ // Set up a program and argument list for execution, with the full
+ // "raw" argument list passed as a vector of strings. argv[0]
+ // should be the program name, just as in execv().
+ //
+ // SetProgram() may not be called while the process is running.
+ //
+ // file: The file containing the program. This must be an absolute path
+ // name - $PATH is not searched.
+ // argv: The argument list.
+ virtual void SetProgram(const string& file, const std::vector<string>& argv);
+
+ // Start()
+ // Run the command that was previously set up with SetProgram().
+ // The following are programming errors; they are logged and make
+ // Start() return false:
+ // * Attempting to start when a process is already running.
+ // * Attempting to start without first setting the command.
+ // Note, however, that Start() does not try to validate that the binary
+ // does anything reasonable (e.g. exists or can execute); as such, you can
+ // specify a non-existent binary and Start() will still return true. You
+ // will get a failure from the process, but only after Start() returns.
+ //
+ // Return true normally, or false if the program couldn't be started
+ // because of some error.
+ // Virtual for backwards compatibility; do not create new subclasses.
+ virtual bool Start();
+
+ // Kill()
+ // Send the given signal to the process.
+ // Return true normally, or false if we couldn't send the signal - likely
+ // because the process doesn't exist.
+ virtual bool Kill(int signal);
+
+ // Wait()
+ // Block until the process exits.
+ // Return true normally, or false if the process wasn't running or
+ // waiting for it failed.
+ virtual bool Wait();
+
+ // Communicate()
+ // Read from stdout and stderr and writes to stdin until all pipes have
+ // closed, then waits for the process to exit.
+ // Note: Do NOT call Wait() after calling Communicate as it will always
+ // fail, since Communicate calls Wait() internally.
+ // 'stdin_input', 'stdout_output', and 'stderr_output' may be NULL.
+ // If this process is not configured to send stdout or stderr to pipes,
+ // the output strings will not be modified.
+ // If this process is not configured to take stdin from a pipe, stdin_input
+ // will be ignored.
+ // Returns the command's wait status as reported by waitpid(); decode it
+ // with WIFEXITED()/WEXITSTATUS() and friends. Returns 1 if no process is
+ // running or SIGPIPE handling could not be set up, and -1 if waiting for
+ // the process failed.
+ virtual int Communicate(const string* stdin_input, string* stdout_output,
+ string* stderr_output);
+
+ private:
+ // Number of channels handled: stdin, stdout and stderr.
+ static const int kNFds = 3;
+ // True if 'chan' is a valid index into the per-channel arrays below.
+ static bool chan_valid(int chan) { return ((chan >= 0) && (chan < kNFds)); }
+ // True if errno value 'e' denotes a transient condition after which the
+ // failed system call should simply be attempted again.
+ static bool retry(int e) {
+ return ((e == EINTR) || (e == EAGAIN) || (e == EWOULDBLOCK));
+ }
+ // Releases exec_path_ and exec_argv_.
+ void FreeArgs() EXCLUSIVE_LOCKS_REQUIRED(data_mu_);
+ // Closes every pipe descriptor that is still open.
+ void ClosePipes() EXCLUSIVE_LOCKS_REQUIRED(data_mu_);
+ // Waits for the process; on success stores the raw wait status in *status.
+ bool WaitInternal(int* status);
+
+ // The separation between proc_mu_ and data_mu_ mutexes allows Kill() to be
+ // called by a thread while another thread is inside Wait() or Communicate().
+ mutable mutex proc_mu_;
+ bool running_ GUARDED_BY(proc_mu_);
+ pid_t pid_ GUARDED_BY(proc_mu_);
+
+ mutable mutex data_mu_ ACQUIRED_AFTER(proc_mu_);
+ // Copies of the program path and NULL-terminated argument vector.
+ char* exec_path_ GUARDED_BY(data_mu_);
+ char** exec_argv_ GUARDED_BY(data_mu_);
+ // Per-channel configuration and pipe descriptors (-1 when not open).
+ ChannelAction action_[kNFds] GUARDED_BY(data_mu_);
+ int parent_pipe_[kNFds] GUARDED_BY(data_mu_);
+ int child_pipe_[kNFds] GUARDED_BY(data_mu_);
+
+ TF_DISALLOW_COPY_AND_ASSIGN(SubProcess);
+};
+
+} // namespace tensorflow
+
+#endif // TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_
diff --git a/tensorflow/core/platform/posix/test.cc b/tensorflow/core/platform/posix/test.cc
index f83fccaa22..a69127b3e8 100644
--- a/tensorflow/core/platform/posix/test.cc
+++ b/tensorflow/core/platform/posix/test.cc
@@ -20,62 +20,17 @@ limitations under the License.
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/subprocess.h"
namespace tensorflow {
namespace testing {
-namespace {
-class PosixSubProcess : public SubProcess {
- public:
- PosixSubProcess(const std::vector<string>& argv) : argv_(argv), pid_(0) {}
-
- ~PosixSubProcess() override {}
-
- bool Start() override {
- if (pid_ != 0) {
- LOG(ERROR) << "Tried to start process multiple times.";
- return false;
- }
- pid_ = fork();
- if (pid_ == 0) {
- // We are in the child process.
- const char* path = argv_[0].c_str();
- const char** argv = new const char*[argv_.size() + 1];
- int i = 0;
- for (const string& arg : argv_) {
- argv[i++] = arg.c_str();
- }
- argv[argv_.size()] = nullptr;
- execv(path, (char* const*)argv);
- // Never executes.
- return true;
- } else if (pid_ < 0) {
- LOG(ERROR) << "Failed to fork process.";
- return false;
- } else {
- // We are in the parent process and fork() was successful.
- // TODO(mrry): Consider collecting stderr from the child.
- return true;
- }
- }
-
- bool Kill(int signal) override {
- if (pid_ == 0) {
- LOG(ERROR) << "Tried to kill process before starting it.";
- return false;
- }
- return kill(pid_, signal) == 0;
- }
-
- private:
- const std::vector<string> argv_;
- pid_t pid_;
- TF_DISALLOW_COPY_AND_ASSIGN(PosixSubProcess);
-};
-} // namespace
-
std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
- return std::unique_ptr<SubProcess>(new PosixSubProcess(argv));
+ std::unique_ptr<SubProcess> proc(new SubProcess());
+ proc->SetProgram(argv[0], argv);
+ proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
+ proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
+ return proc;
}
int PickUnusedPortOrDie() { return internal::PickUnusedPortOrDie(); }
diff --git a/tensorflow/core/platform/subprocess.h b/tensorflow/core/platform/subprocess.h
new file mode 100644
index 0000000000..7dfd38688d
--- /dev/null
+++ b/tensorflow/core/platform/subprocess.h
@@ -0,0 +1,61 @@
+/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_PLATFORM_SUBPROCESS_H_
+#define TENSORFLOW_PLATFORM_SUBPROCESS_H_
+
+namespace tensorflow {
+
+// Channel identifiers.
+// The values match the conventional file descriptor numbers of the three
+// standard streams and are used as indices into per-channel arrays.
+enum Channel {
+ CHAN_STDIN = 0,
+ CHAN_STDOUT = 1,
+ CHAN_STDERR = 2,
+};
+
+// Specify how a channel is handled.
+enum ChannelAction {
+ // Close the file descriptor when the process starts.
+ // This is the default behavior.
+ // Note: the POSIX implementation redirects stdin/stdout/stderr to
+ // /dev/null instead of closing them, so that their descriptor numbers are
+ // not reused by later open() calls in the child.
+ ACTION_CLOSE,
+
+ // Make a pipe to the channel. It is used in the Communicate() method to
+ // transfer data between the parent and child processes.
+ ACTION_PIPE,
+
+ // Duplicate the parent's file descriptor. Useful if stdout/stderr should
+ // go to the same place that the parent writes it.
+ ACTION_DUPPARENT,
+};
+
+// Supports spawning and killing child processes.
+class SubProcess;
+
+} // namespace tensorflow
+
+#include "tensorflow/core/platform/platform.h"
+
+#if defined(PLATFORM_GOOGLE)
+#include "tensorflow/core/platform/google/subprocess.h"
+#elif defined(PLATFORM_POSIX) || defined(PLATFORM_POSIX_ANDROID) || \
+ defined(PLATFORM_GOOGLE_ANDROID)
+#include "tensorflow/core/platform/posix/subprocess.h"
+#elif defined(PLATFORM_WINDOWS)
+#error SubProcess not yet implemented for Windows
+#else
+#error Define the appropriate PLATFORM_<foo> macro for this platform
+#endif
+
+#endif // TENSORFLOW_PLATFORM_SUBPROCESS_H_
diff --git a/tensorflow/core/platform/subprocess_test.cc b/tensorflow/core/platform/subprocess_test.cc
new file mode 100644
index 0000000000..3d58b011cb
--- /dev/null
+++ b/tensorflow/core/platform/subprocess_test.cc
@@ -0,0 +1,184 @@
+/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include <sys/wait.h>
+
+#include "tensorflow/core/lib/core/status_test_util.h"
+#include "tensorflow/core/platform/subprocess.h"
+#include "tensorflow/core/platform/test.h"
+
+namespace tensorflow {
+
+// Empty fixture; it only groups the SubProcess tests under one name.
+class SubProcessTest : public ::testing::Test {};
+
+// Starts a process with the default channel actions and waits for it
+// without calling Communicate().
+TEST_F(SubProcessTest, NoOutputNoComm) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/cat", {"cat", "/dev/null"});
+ EXPECT_TRUE(proc.Start());
+ EXPECT_TRUE(proc.Wait());
+}
+
+// A child that prints nothing: both piped outputs must come back empty and
+// the wait status must report a normal exit with code 0.
+TEST_F(SubProcessTest, NoOutput) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/cat", {"cat", "/dev/null"});
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ string out, err;
+ int status = proc.Communicate(nullptr, &out, &err);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+ EXPECT_EQ("", out);
+ EXPECT_EQ("", err);
+}
+
+// Data written by the child to stdout is captured in 'out'; stderr stays
+// empty.
+TEST_F(SubProcessTest, Stdout) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/echo", {"echo", "-n", "hello world"});
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ string out, err;
+ int status = proc.Communicate(nullptr, &out, &err);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+ EXPECT_EQ("hello world", out);
+ EXPECT_EQ("", err);
+}
+
+// Same child as the Stdout test, but the caller passes no output strings:
+// Communicate() must still complete and report a clean exit.
+TEST_F(SubProcessTest, StdoutIgnored) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/echo", {"echo", "-n", "hello world"});
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ int status = proc.Communicate(nullptr, nullptr, nullptr);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+}
+
+// A failing child: expects exit code 1, nothing on stdout, and an error
+// message on stderr that mentions the missing file.
+TEST_F(SubProcessTest, Stderr) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/cat", {"cat", "/file_does_not_exist"});
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ string out, err;
+ int status = proc.Communicate(nullptr, &out, &err);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(1, WEXITSTATUS(status));
+ EXPECT_EQ("", out);
+ EXPECT_NE(string::npos, err.find("/file_does_not_exist"));
+}
+
+// Same failing child as the Stderr test with no output strings supplied;
+// only the exit code is checked.
+TEST_F(SubProcessTest, StderrIgnored) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/cat", {"cat", "/file_does_not_exist"});
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ int status = proc.Communicate(nullptr, nullptr, nullptr);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(1, WEXITSTATUS(status));
+}
+
+// Feeds a short string to the child's stdin through a pipe; stdout is left
+// at its default action, so only the exit status is checked.
+TEST_F(SubProcessTest, Stdin) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/usr/bin/wc", {"wc", "-l"});
+ proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ string in = "foobar\nbarfoo\nhaha\n";
+ int status = proc.Communicate(&in, nullptr, nullptr);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+}
+
+// Round trip through the child: three input lines go to "wc -l" and the
+// line count is parsed back from its stdout.
+TEST_F(SubProcessTest, StdinStdout) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/usr/bin/wc", {"wc", "-l"});
+ proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ string in = "foobar\nbarfoo\nhaha\n";
+ string out;
+ int status = proc.Communicate(&in, &out, nullptr);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+ int count = stoi(out);
+ EXPECT_EQ(3, count);
+}
+
+// The child ("sleep 0") never reads its stdin and exits at once while the
+// parent still has about 1 MB to write.
+TEST_F(SubProcessTest, StdinChildExit) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/sleep", {"sleep", "0"});
+ proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ // Verify that the parent handles the child exiting immediately as the
+ // parent is trying to write a large string to the child's stdin.
+ string in;
+ in.reserve(1000000);
+ for (int i = 0; i < 100000; i++) {
+ in += "hello xyz\n";
+ }
+
+ int status = proc.Communicate(&in, nullptr, nullptr);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+}
+
+// Pipes about 1 MB through "cat" and expects the identical bytes back,
+// which requires writing stdin and reading stdout to be interleaved.
+TEST_F(SubProcessTest, StdinStdoutOverlap) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/cat", {"cat"});
+ proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ // Verify that the parent handles multiplexed reading/writing to the child
+ // process. The string is large enough to exceed the buffering of the pipes.
+ string in;
+ in.reserve(1000000);
+ for (int i = 0; i < 100000; i++) {
+ in += "hello xyz\n";
+ }
+
+ string out;
+ int status = proc.Communicate(&in, &out, nullptr);
+ EXPECT_TRUE(WIFEXITED(status));
+ EXPECT_EQ(0, WEXITSTATUS(status));
+ EXPECT_EQ(in, out);
+}
+
+// Kills a child that would otherwise block reading stdin, reaps it with
+// Wait(), and checks that a second Kill() is rejected.
+TEST_F(SubProcessTest, KillProc) {
+ tensorflow::SubProcess proc;
+ proc.SetProgram("/bin/cat", {"cat"});
+ proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
+ proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
+ EXPECT_TRUE(proc.Start());
+
+ EXPECT_TRUE(proc.Kill(SIGKILL));
+ EXPECT_TRUE(proc.Wait());
+
+ // After a successful Wait() the process is no longer recorded as running.
+ EXPECT_FALSE(proc.Kill(SIGKILL));
+}
+
+} // namespace tensorflow
diff --git a/tensorflow/core/platform/test.h b/tensorflow/core/platform/test.h
index 0046b6c3f1..295957c3d8 100644
--- a/tensorflow/core/platform/test.h
+++ b/tensorflow/core/platform/test.h
@@ -21,6 +21,7 @@ limitations under the License.
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/platform.h"
+#include "tensorflow/core/platform/subprocess.h"
#include "tensorflow/core/platform/types.h"
// As of September 2016, we continue to attempt to avoid the use of gmock aka
@@ -48,28 +49,6 @@ string TensorFlowSrcRoot();
// Returns the same value for the lifetime of the process.
int RandomSeed();
-// Supports spawning and killing child processes, for use in
-// multi-process testing.
-class SubProcess {
- public:
- virtual ~SubProcess() {}
-
- // Starts the subprocess. Returns true on success, otherwise false.
- // NOTE: This method is not thread-safe.
- virtual bool Start() = 0;
-
- // Kills the subprocess with the given signal number. Returns true
- // on success, otherwise false.
- // NOTE: This method is not thread-safe.
- virtual bool Kill(int signal) = 0;
-
- protected:
- SubProcess() {}
-
- private:
- TF_DISALLOW_COPY_AND_ASSIGN(SubProcess);
-};
-
// Returns an object that represents a child process that will be
// launched with the given command-line arguments `argv`. The process
// must be explicitly started by calling the Start() method on the