diff options
author | Mike Klein <mtklein@chromium.org> | 2017-07-26 15:13:47 -0400 |
---|---|---|
committer | Skia Commit-Bot <skia-commit-bot@chromium.org> | 2017-07-26 21:36:40 +0000 |
commit | 154e6dadea0db8c72de1fe1e23718e2b6933a660 (patch) | |
tree | 0c8ee67066d5d9f3da32bac92c5dc5809f2b2f0b | |
parent | 2890fbfe1400b81e4d6af98d14dfe757fec93650 (diff) |
factor Engine out of ok core
This makes Engines (task execution strategies: serial, thread, fork)
pluggable just like most of the rest of ok. It removes the thread and
process limits, as I find myself rarely caring about what they are
exactly. Instead of limiting to num-cores, we just allow any number of
concurrent threads, and any number of concurrent child processes subject
to OS limitations.
Change-Id: Icef49d86818fe9a4b7380efb60e73e40bc2e6b73
Reviewed-on: https://skia-review.googlesource.com/27140
Reviewed-by: Mike Klein <mtklein@chromium.org>
Commit-Queue: Mike Klein <mtklein@chromium.org>
-rw-r--r-- | BUILD.gn | 5 | ||||
-rw-r--r-- | tools/ok.cpp | 153 | ||||
-rw-r--r-- | tools/ok.h | 8 | ||||
-rw-r--r-- | tools/ok_engines.cpp | 85 |
4 files changed, 154 insertions, 97 deletions
@@ -891,14 +891,14 @@ if (skia_enable_tools) { if (defined(invoker.is_shared_library) && invoker.is_shared_library) { shared_library("lib" + target_name) { forward_variables_from(invoker, "*", [ "is_shared_library" ]) - configs += [ ":skia_private", ] + configs += [ ":skia_private" ] testonly = true } } else { _executable = target_name executable(_executable) { forward_variables_from(invoker, "*", [ "is_shared_library" ]) - configs += [ ":skia_private", ] + configs += [ ":skia_private" ] testonly = true } } @@ -1268,6 +1268,7 @@ if (skia_enable_tools) { sources = [ "tools/ok.cpp", "tools/ok_dsts.cpp", + "tools/ok_engines.cpp", "tools/ok_srcs.cpp", "tools/ok_test.cpp", "tools/ok_vias.cpp", diff --git a/tools/ok.cpp b/tools/ok.cpp index edda9bede0..edb60f14db 100644 --- a/tools/ok.cpp +++ b/tools/ok.cpp @@ -13,11 +13,9 @@ #include "SkImage.h" #include "ok.h" #include <chrono> -#include <future> #include <list> #include <stdio.h> #include <stdlib.h> -#include <thread> #include <vector> #if !defined(__has_include) @@ -97,80 +95,11 @@ static thread_local const char* tls_currently_running = ""; } #endif -struct Engine { - virtual ~Engine() {} - virtual bool spawn(std::function<Status(void)>) = 0; - virtual Status wait_one() = 0; -}; - -struct SerialEngine : Engine { - Status last = Status::None; - - bool spawn(std::function<Status(void)> fn) override { - last = fn(); - return true; - } - - Status wait_one() override { - Status s = last; - last = Status::None; - return s; - } -}; - -struct ThreadEngine : Engine { - std::list<std::future<Status>> live; - const std::chrono::steady_clock::time_point the_past = std::chrono::steady_clock::now(); - - bool spawn(std::function<Status(void)> fn) override { - live.push_back(std::async(std::launch::async, fn)); - return true; - } - - Status wait_one() override { - if (live.empty()) { - return Status::None; - } - - for (;;) { - for (auto it = live.begin(); it != live.end(); it++) { - if (it->wait_until(the_past) == std::future_status::ready) { - Status s = it->get(); - live.erase(it); - return s; - } - } - } - } +struct EngineType { + const char *name, *help; + std::unique_ptr<Engine> (*factory)(Options); }; - -#if defined(_MSC_VER) - using ForkEngine = ThreadEngine; -#else - #include <sys/wait.h> - #include <unistd.h> - - struct ForkEngine : Engine { - bool spawn(std::function<Status(void)> fn) override { - switch (fork()) { - case 0: _exit((int)fn()); - case -1: return false; - default: return true; - } - } - - Status wait_one() override { - do { - int status; - if (wait(&status) > 0) { - return WIFEXITED(status) ? (Status)WEXITSTATUS(status) - : Status::Crashed; - } - } while (errno == EINTR); - return Status::None; - } - }; -#endif +static std::vector<EngineType> engine_types; struct StreamType { const char *name, *help; @@ -206,7 +135,7 @@ int main(int argc, char** argv) { SkGraphics::Init(); setup_crash_handler(); - int jobs{1}; + std::unique_ptr<Engine> engine; std::unique_ptr<Stream> stream; std::function<std::unique_ptr<Dst>(void)> dst_factory = []{ // A default Dst that's enough for unit tests and not much else. @@ -218,28 +147,36 @@ int main(int argc, char** argv) { }; auto help = [&] { - std::string stream_help = help_for(stream_types), + std::string engine_help = help_for(engine_types), + stream_help = help_for(stream_types), dst_help = help_for( dst_types), via_help = help_for( via_types); - printf("%s [-j N] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...] \n" - " -j: Run at most N processes at any time. \n" - " If <0, use -N threads instead. \n" - " If 0, use one thread in one process. \n" - " If 1 (default) or -1, auto-detect N. \n" + printf("%s [engine] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...] \n" + " engine: how to execute tasks%s \n" " src: content to draw%s \n" " dst: how to draw that content%s \n" " via: wrappers around dst%s \n" " Most srcs, dsts and vias have options, e.g. skp:dir=skps sw:ct=565 \n", - argv[0], stream_help.c_str(), dst_help.c_str(), via_help.c_str()); + argv[0], + engine_help.c_str(), stream_help.c_str(), dst_help.c_str(), via_help.c_str()); return 1; }; for (int i = 1; i < argc; i++) { - if (0 == strcmp("-j", argv[i])) { jobs = atoi(argv[++i]); } if (0 == strcmp("-h", argv[i])) { return help(); } if (0 == strcmp("--help", argv[i])) { return help(); } + for (auto e : engine_types) { + size_t len = strlen(e.name); + if (0 == strncmp(e.name, argv[i], len)) { + switch (argv[i][len]) { + case ':': len++; + case '\0': engine = e.factory(Options{argv[i]+len}); + } + } + } + for (auto s : stream_types) { size_t len = strlen(s.name); if (0 == strncmp(s.name, argv[i], len)) { @@ -275,12 +212,12 @@ int main(int argc, char** argv) { } if (!stream) { return help(); } - std::unique_ptr<Engine> engine; - if (jobs == 0) { engine.reset(new SerialEngine); } - if (jobs > 0) { engine.reset(new ForkEngine); defer_logging(); } - if (jobs < 0) { engine.reset(new ThreadEngine); jobs = -jobs; } + if (!engine) { engine = engine_types.back().factory(Options{}); } - if (jobs == 1) { jobs = std::thread::hardware_concurrency(); } + // If we know engine->spawn() will never crash, we can defer logging until we exit. + if (engine->crashproof()) { + defer_logging(); + } int ok = 0, failed = 0, crashed = 0, skipped = 0; @@ -306,15 +243,37 @@ int main(int argc, char** argv) { fflush(stdout); }; - auto spawn = [&](std::function<Status(void)> fn) { - if (--jobs < 0) { - update_stats(engine->wait_one()); + std::list<std::future<Status>> live; + const auto the_past = std::chrono::steady_clock::now(); + + auto wait_one = [&] { + if (live.empty()) { + return Status::None; } - while (!engine->spawn(fn)) { - update_stats(engine->wait_one()); + + for (;;) { + for (auto it = live.begin(); it != live.end(); it++) { + if (it->wait_until(the_past) != std::future_status::timeout) { + Status s = it->get(); + live.erase(it); + return s; + } + } } }; + auto spawn = [&](std::function<Status(void)> fn) { + std::future<Status> status; + for (;;) { + status = engine->spawn(fn); + if (status.valid()) { + break; + } + update_stats(wait_one()); + } + live.push_back(std::move(status)); + }; + for (std::unique_ptr<Src> owned = stream->next(); owned; owned = stream->next()) { Src* raw = owned.release(); // Can't move std::unique_ptr into a lambda in C++11. :( spawn([=] { @@ -328,7 +287,7 @@ int main(int argc, char** argv) { } for (Status s = Status::OK; s != Status::None; ) { - s = engine->wait_one(); + s = wait_one(); update_stats(s); } printf("\n"); @@ -337,6 +296,10 @@ int main(int argc, char** argv) { Register::Register(const char* name, const char* help, + std::unique_ptr<Engine> (*factory)(Options)) { + engine_types.push_back(EngineType{name, help, factory}); +} +Register::Register(const char* name, const char* help, std::unique_ptr<Stream> (*factory)(Options)) { stream_types.push_back(StreamType{name, help, factory}); } diff --git a/tools/ok.h b/tools/ok.h index f55842b3d9..502df23b4a 100644 --- a/tools/ok.h +++ b/tools/ok.h @@ -10,6 +10,7 @@ #include "SkCanvas.h" #include <functional> +#include <future> #include <map> #include <memory> #include <string> @@ -24,6 +25,12 @@ void ok_log(const char*); enum class Status { OK, Failed, Crashed, Skipped, None }; +struct Engine { + virtual ~Engine() {} + virtual bool crashproof() = 0; + virtual std::future<Status> spawn(std::function<Status(void)>) = 0; +}; + struct Src { virtual ~Src() {} virtual std::string name() = 0; @@ -52,6 +59,7 @@ public: // Create globals to register your new type of Stream or Dst. struct Register { + Register(const char* name, const char* help, std::unique_ptr<Engine> (*factory)(Options)); Register(const char* name, const char* help, std::unique_ptr<Stream> (*factory)(Options)); Register(const char* name, const char* help, std::unique_ptr<Dst> (*factory)(Options)); Register(const char* name, const char* help, diff --git a/tools/ok_engines.cpp b/tools/ok_engines.cpp new file mode 100644 index 0000000000..e2218bfa2a --- /dev/null +++ b/tools/ok_engines.cpp @@ -0,0 +1,85 @@ +/* + * Copyright 2017 Google Inc. + * + * Use of this source code is governed by a BSD-style license that can be + * found in the LICENSE file. + */ + +#include "ok.h" + +struct SerialEngine : Engine { + static std::unique_ptr<Engine> Factory(Options) { + SerialEngine engine; + return move_unique(engine); + } + + bool crashproof() override { return false; } + + std::future<Status> spawn(std::function<Status(void)> fn) override { + return std::async(std::launch::deferred, fn); + } +}; +static Register serial("serial", + "Run tasks serially on the main thread of a single process.", + SerialEngine::Factory); + +struct ThreadEngine : Engine { + static std::unique_ptr<Engine> Factory(Options) { + ThreadEngine engine; + return move_unique(engine); + } + + bool crashproof() override { return false; } + + std::future<Status> spawn(std::function<Status(void)> fn) override { + return std::async(std::launch::async, fn); + } +}; +static Register thread("thread", + "Run each task on its own thread of a single process.", + ThreadEngine::Factory); + +#if !defined(_MSC_VER) + #include <sys/wait.h> + #include <unistd.h> + + struct ForkEngine : Engine { + static std::unique_ptr<Engine> Factory(Options) { + ForkEngine engine; + return move_unique(engine); + } + + bool crashproof() override { return true; } + + std::future<Status> spawn(std::function<Status(void)> fn) override { + switch (fork()) { + case 0: + // We are the spawned child process. + // Run fn() and exit() with its Status as our return code. + _exit((int)fn()); + + case -1: + // The OS won't let us fork() another process right now. + // We'll need to wait for at least one live task to finish and try again. + return std::future<Status>(); + + default: + // We succesfully spawned a child process! + // This will wait for any spawned process to finish and return its Status. + return std::async(std::launch::deferred, [] { + do { + int status; + if (wait(&status) > 0) { + return WIFEXITED(status) ? (Status)WEXITSTATUS(status) + : Status::Crashed; + } + } while (errno == EINTR); + return Status::None; + }); + } + } + }; + static Register _fork("fork", + "Run each task in an independent process with fork().", + ForkEngine::Factory); +#endif |