aboutsummaryrefslogtreecommitdiffhomepage
path: root/tools
diff options
context:
space:
mode:
authorGravatar Mike Klein <mtklein@chromium.org>2017-07-26 15:13:47 -0400
committerGravatar Skia Commit-Bot <skia-commit-bot@chromium.org>2017-07-26 21:36:40 +0000
commit154e6dadea0db8c72de1fe1e23718e2b6933a660 (patch)
tree0c8ee67066d5d9f3da32bac92c5dc5809f2b2f0b /tools
parent2890fbfe1400b81e4d6af98d14dfe757fec93650 (diff)
factor Engine out of ok core
This makes Engines (task execution strategies: serial, thread, fork) pluggable just like most of the rest of ok. It removes the thread and process limits, as I find myself rarely caring about what they are exactly. Instead of limiting to num-cores, we just allow any number of concurrent threads, and any number of concurrent child processes subject to OS limitations. Change-Id: Icef49d86818fe9a4b7380efb60e73e40bc2e6b73 Reviewed-on: https://skia-review.googlesource.com/27140 Reviewed-by: Mike Klein <mtklein@chromium.org> Commit-Queue: Mike Klein <mtklein@chromium.org>
Diffstat (limited to 'tools')
-rw-r--r--tools/ok.cpp153
-rw-r--r--tools/ok.h8
-rw-r--r--tools/ok_engines.cpp85
3 files changed, 151 insertions, 95 deletions
diff --git a/tools/ok.cpp b/tools/ok.cpp
index edda9bede0..edb60f14db 100644
--- a/tools/ok.cpp
+++ b/tools/ok.cpp
@@ -13,11 +13,9 @@
#include "SkImage.h"
#include "ok.h"
#include <chrono>
-#include <future>
#include <list>
#include <stdio.h>
#include <stdlib.h>
-#include <thread>
#include <vector>
#if !defined(__has_include)
@@ -97,80 +95,11 @@ static thread_local const char* tls_currently_running = "";
}
#endif
-struct Engine {
- virtual ~Engine() {}
- virtual bool spawn(std::function<Status(void)>) = 0;
- virtual Status wait_one() = 0;
-};
-
-struct SerialEngine : Engine {
- Status last = Status::None;
-
- bool spawn(std::function<Status(void)> fn) override {
- last = fn();
- return true;
- }
-
- Status wait_one() override {
- Status s = last;
- last = Status::None;
- return s;
- }
-};
-
-struct ThreadEngine : Engine {
- std::list<std::future<Status>> live;
- const std::chrono::steady_clock::time_point the_past = std::chrono::steady_clock::now();
-
- bool spawn(std::function<Status(void)> fn) override {
- live.push_back(std::async(std::launch::async, fn));
- return true;
- }
-
- Status wait_one() override {
- if (live.empty()) {
- return Status::None;
- }
-
- for (;;) {
- for (auto it = live.begin(); it != live.end(); it++) {
- if (it->wait_until(the_past) == std::future_status::ready) {
- Status s = it->get();
- live.erase(it);
- return s;
- }
- }
- }
- }
+struct EngineType {
+ const char *name, *help;
+ std::unique_ptr<Engine> (*factory)(Options);
};
-
-#if defined(_MSC_VER)
- using ForkEngine = ThreadEngine;
-#else
- #include <sys/wait.h>
- #include <unistd.h>
-
- struct ForkEngine : Engine {
- bool spawn(std::function<Status(void)> fn) override {
- switch (fork()) {
- case 0: _exit((int)fn());
- case -1: return false;
- default: return true;
- }
- }
-
- Status wait_one() override {
- do {
- int status;
- if (wait(&status) > 0) {
- return WIFEXITED(status) ? (Status)WEXITSTATUS(status)
- : Status::Crashed;
- }
- } while (errno == EINTR);
- return Status::None;
- }
- };
-#endif
+static std::vector<EngineType> engine_types;
struct StreamType {
const char *name, *help;
@@ -206,7 +135,7 @@ int main(int argc, char** argv) {
SkGraphics::Init();
setup_crash_handler();
- int jobs{1};
+ std::unique_ptr<Engine> engine;
std::unique_ptr<Stream> stream;
std::function<std::unique_ptr<Dst>(void)> dst_factory = []{
// A default Dst that's enough for unit tests and not much else.
@@ -218,28 +147,36 @@ int main(int argc, char** argv) {
};
auto help = [&] {
- std::string stream_help = help_for(stream_types),
+ std::string engine_help = help_for(engine_types),
+ stream_help = help_for(stream_types),
dst_help = help_for( dst_types),
via_help = help_for( via_types);
- printf("%s [-j N] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...] \n"
- " -j: Run at most N processes at any time. \n"
- " If <0, use -N threads instead. \n"
- " If 0, use one thread in one process. \n"
- " If 1 (default) or -1, auto-detect N. \n"
+ printf("%s [engine] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...] \n"
+ " engine: how to execute tasks%s \n"
" src: content to draw%s \n"
" dst: how to draw that content%s \n"
" via: wrappers around dst%s \n"
" Most srcs, dsts and vias have options, e.g. skp:dir=skps sw:ct=565 \n",
- argv[0], stream_help.c_str(), dst_help.c_str(), via_help.c_str());
+ argv[0],
+ engine_help.c_str(), stream_help.c_str(), dst_help.c_str(), via_help.c_str());
return 1;
};
for (int i = 1; i < argc; i++) {
- if (0 == strcmp("-j", argv[i])) { jobs = atoi(argv[++i]); }
if (0 == strcmp("-h", argv[i])) { return help(); }
if (0 == strcmp("--help", argv[i])) { return help(); }
+ for (auto e : engine_types) {
+ size_t len = strlen(e.name);
+ if (0 == strncmp(e.name, argv[i], len)) {
+ switch (argv[i][len]) {
+ case ':': len++;
+ case '\0': engine = e.factory(Options{argv[i]+len});
+ }
+ }
+ }
+
for (auto s : stream_types) {
size_t len = strlen(s.name);
if (0 == strncmp(s.name, argv[i], len)) {
@@ -275,12 +212,12 @@ int main(int argc, char** argv) {
}
if (!stream) { return help(); }
- std::unique_ptr<Engine> engine;
- if (jobs == 0) { engine.reset(new SerialEngine); }
- if (jobs > 0) { engine.reset(new ForkEngine); defer_logging(); }
- if (jobs < 0) { engine.reset(new ThreadEngine); jobs = -jobs; }
+ if (!engine) { engine = engine_types.back().factory(Options{}); }
- if (jobs == 1) { jobs = std::thread::hardware_concurrency(); }
+ // If we know engine->spawn() will never crash, we can defer logging until we exit.
+ if (engine->crashproof()) {
+ defer_logging();
+ }
int ok = 0, failed = 0, crashed = 0, skipped = 0;
@@ -306,15 +243,37 @@ int main(int argc, char** argv) {
fflush(stdout);
};
- auto spawn = [&](std::function<Status(void)> fn) {
- if (--jobs < 0) {
- update_stats(engine->wait_one());
+ std::list<std::future<Status>> live;
+ const auto the_past = std::chrono::steady_clock::now();
+
+ auto wait_one = [&] {
+ if (live.empty()) {
+ return Status::None;
}
- while (!engine->spawn(fn)) {
- update_stats(engine->wait_one());
+
+ for (;;) {
+ for (auto it = live.begin(); it != live.end(); it++) {
+ if (it->wait_until(the_past) != std::future_status::timeout) {
+ Status s = it->get();
+ live.erase(it);
+ return s;
+ }
+ }
}
};
+ auto spawn = [&](std::function<Status(void)> fn) {
+ std::future<Status> status;
+ for (;;) {
+ status = engine->spawn(fn);
+ if (status.valid()) {
+ break;
+ }
+ update_stats(wait_one());
+ }
+ live.push_back(std::move(status));
+ };
+
for (std::unique_ptr<Src> owned = stream->next(); owned; owned = stream->next()) {
Src* raw = owned.release(); // Can't move std::unique_ptr into a lambda in C++11. :(
spawn([=] {
@@ -328,7 +287,7 @@ int main(int argc, char** argv) {
}
for (Status s = Status::OK; s != Status::None; ) {
- s = engine->wait_one();
+ s = wait_one();
update_stats(s);
}
printf("\n");
@@ -337,6 +296,10 @@ int main(int argc, char** argv) {
Register::Register(const char* name, const char* help,
+ std::unique_ptr<Engine> (*factory)(Options)) {
+ engine_types.push_back(EngineType{name, help, factory});
+}
+Register::Register(const char* name, const char* help,
std::unique_ptr<Stream> (*factory)(Options)) {
stream_types.push_back(StreamType{name, help, factory});
}
diff --git a/tools/ok.h b/tools/ok.h
index f55842b3d9..502df23b4a 100644
--- a/tools/ok.h
+++ b/tools/ok.h
@@ -10,6 +10,7 @@
#include "SkCanvas.h"
#include <functional>
+#include <future>
#include <map>
#include <memory>
#include <string>
@@ -24,6 +25,12 @@ void ok_log(const char*);
enum class Status { OK, Failed, Crashed, Skipped, None };
+struct Engine {
+ virtual ~Engine() {}
+ virtual bool crashproof() = 0;
+ virtual std::future<Status> spawn(std::function<Status(void)>) = 0;
+};
+
struct Src {
virtual ~Src() {}
virtual std::string name() = 0;
@@ -52,6 +59,7 @@ public:
// Create globals to register your new type of Stream or Dst.
struct Register {
+ Register(const char* name, const char* help, std::unique_ptr<Engine> (*factory)(Options));
Register(const char* name, const char* help, std::unique_ptr<Stream> (*factory)(Options));
Register(const char* name, const char* help, std::unique_ptr<Dst> (*factory)(Options));
Register(const char* name, const char* help,
diff --git a/tools/ok_engines.cpp b/tools/ok_engines.cpp
new file mode 100644
index 0000000000..e2218bfa2a
--- /dev/null
+++ b/tools/ok_engines.cpp
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#include "ok.h"
+
+struct SerialEngine : Engine {
+ static std::unique_ptr<Engine> Factory(Options) {
+ SerialEngine engine;
+ return move_unique(engine);
+ }
+
+ bool crashproof() override { return false; }
+
+ std::future<Status> spawn(std::function<Status(void)> fn) override {
+ return std::async(std::launch::deferred, fn);
+ }
+};
+static Register serial("serial",
+ "Run tasks serially on the main thread of a single process.",
+ SerialEngine::Factory);
+
+struct ThreadEngine : Engine {
+ static std::unique_ptr<Engine> Factory(Options) {
+ ThreadEngine engine;
+ return move_unique(engine);
+ }
+
+ bool crashproof() override { return false; }
+
+ std::future<Status> spawn(std::function<Status(void)> fn) override {
+ return std::async(std::launch::async, fn);
+ }
+};
+static Register thread("thread",
+ "Run each task on its own thread of a single process.",
+ ThreadEngine::Factory);
+
+#if !defined(_MSC_VER)
+ #include <sys/wait.h>
+ #include <unistd.h>
+
+ struct ForkEngine : Engine {
+ static std::unique_ptr<Engine> Factory(Options) {
+ ForkEngine engine;
+ return move_unique(engine);
+ }
+
+ bool crashproof() override { return true; }
+
+ std::future<Status> spawn(std::function<Status(void)> fn) override {
+ switch (fork()) {
+ case 0:
+ // We are the spawned child process.
+ // Run fn() and exit() with its Status as our return code.
+ _exit((int)fn());
+
+ case -1:
+ // The OS won't let us fork() another process right now.
+ // We'll need to wait for at least one live task to finish and try again.
+ return std::future<Status>();
+
+ default:
+ // We succesfully spawned a child process!
+ // This will wait for any spawned process to finish and return its Status.
+ return std::async(std::launch::deferred, [] {
+ do {
+ int status;
+ if (wait(&status) > 0) {
+ return WIFEXITED(status) ? (Status)WEXITSTATUS(status)
+ : Status::Crashed;
+ }
+ } while (errno == EINTR);
+ return Status::None;
+ });
+ }
+ }
+ };
+ static Register _fork("fork",
+ "Run each task in an independent process with fork().",
+ ForkEngine::Factory);
+#endif