diff options
author | Sree Kuchibhotla <sreecha@users.noreply.github.com> | 2016-10-30 21:41:34 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-10-30 21:41:34 -0700 |
commit | 1c55919ece40ec0cf7baa280c55273ab7fbea83f (patch) | |
tree | 64230e0782288453c18f7b2fc1b7f5d67b269187 | |
parent | de76cdbcb87c6b4e9be099e50703485af09ba785 (diff) | |
parent | 1e088b4219ac2c4e6ef60457bbf0e6db1f09cbd2 (diff) |
Merge pull request #8269 from sreecha/rpc_mgr
RPC Manager: C++ Sync server redesign
22 files changed, 1203 insertions, 198 deletions
@@ -1342,6 +1342,7 @@ cc_library( "src/cpp/common/channel_filter.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", + "src/cpp/thread_manager/thread_manager.h", "src/cpp/client/insecure_credentials.cc", "src/cpp/client/secure_credentials.cc", "src/cpp/common/auth_property_iterator.cc", @@ -1371,6 +1372,7 @@ cc_library( "src/cpp/server/server_context.cc", "src/cpp/server/server_credentials.cc", "src/cpp/server/server_posix.cc", + "src/cpp/thread_manager/thread_manager.cc", "src/cpp/util/byte_buffer_cc.cc", "src/cpp/util/slice_cc.cc", "src/cpp/util/status.cc", @@ -1496,6 +1498,7 @@ cc_library( "src/cpp/common/channel_filter.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", + "src/cpp/thread_manager/thread_manager.h", "src/cpp/client/cronet_credentials.cc", "src/cpp/client/insecure_credentials.cc", "src/cpp/common/insecure_create_auth_context.cc", @@ -1521,6 +1524,7 @@ cc_library( "src/cpp/server/server_context.cc", "src/cpp/server/server_credentials.cc", "src/cpp/server/server_posix.cc", + "src/cpp/thread_manager/thread_manager.cc", "src/cpp/util/byte_buffer_cc.cc", "src/cpp/util/slice_cc.cc", "src/cpp/util/status.cc", @@ -1721,6 +1725,7 @@ cc_library( "src/cpp/common/channel_filter.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", + "src/cpp/thread_manager/thread_manager.h", "src/cpp/client/insecure_credentials.cc", "src/cpp/common/insecure_create_auth_context.cc", "src/cpp/server/insecure_server_credentials.cc", @@ -1745,6 +1750,7 @@ cc_library( "src/cpp/server/server_context.cc", "src/cpp/server/server_credentials.cc", "src/cpp/server/server_posix.cc", + "src/cpp/thread_manager/thread_manager.cc", "src/cpp/util/byte_buffer_cc.cc", "src/cpp/util/slice_cc.cc", "src/cpp/util/status.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 0550a3b2ec..e2e75ed60b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1074,6 +1074,7 @@ add_library(grpc++ src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/server_posix.cc + src/cpp/thread_manager/thread_manager.cc src/cpp/util/byte_buffer_cc.cc src/cpp/util/slice_cc.cc src/cpp/util/status.cc @@ -1239,6 +1240,7 @@ add_library(grpc++_cronet src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/server_posix.cc + src/cpp/thread_manager/thread_manager.cc src/cpp/util/byte_buffer_cc.cc src/cpp/util/slice_cc.cc src/cpp/util/status.cc @@ -1495,6 +1497,7 @@ add_library(grpc++_unsecure src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/server_posix.cc + src/cpp/thread_manager/thread_manager.cc src/cpp/util/byte_buffer_cc.cc src/cpp/util/slice_cc.cc src/cpp/util/status.cc @@ -1086,6 +1086,7 @@ shutdown_test: $(BINDIR)/$(CONFIG)/shutdown_test status_test: $(BINDIR)/$(CONFIG)/status_test streaming_throughput_test: $(BINDIR)/$(CONFIG)/streaming_throughput_test stress_test: $(BINDIR)/$(CONFIG)/stress_test +thread_manager_test: $(BINDIR)/$(CONFIG)/thread_manager_test thread_stress_test: $(BINDIR)/$(CONFIG)/thread_stress_test public_headers_must_be_c89: $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 boringssl_aes_test: $(BINDIR)/$(CONFIG)/boringssl_aes_test @@ -1463,6 +1464,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/status_test \ $(BINDIR)/$(CONFIG)/streaming_throughput_test \ $(BINDIR)/$(CONFIG)/stress_test \ + $(BINDIR)/$(CONFIG)/thread_manager_test \ $(BINDIR)/$(CONFIG)/thread_stress_test \ $(BINDIR)/$(CONFIG)/boringssl_aes_test \ $(BINDIR)/$(CONFIG)/boringssl_asn1_test \ @@ -1551,6 +1553,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/status_test \ $(BINDIR)/$(CONFIG)/streaming_throughput_test \ $(BINDIR)/$(CONFIG)/stress_test \ + $(BINDIR)/$(CONFIG)/thread_manager_test \ $(BINDIR)/$(CONFIG)/thread_stress_test \ endif @@ -1865,6 +1868,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 ) $(E) "[RUN] Testing streaming_throughput_test" $(Q) $(BINDIR)/$(CONFIG)/streaming_throughput_test || ( echo test streaming_throughput_test failed ; exit 1 ) + $(E) "[RUN] Testing thread_manager_test" + $(Q) $(BINDIR)/$(CONFIG)/thread_manager_test || ( echo test thread_manager_test failed ; exit 1 ) $(E) "[RUN] Testing thread_stress_test" $(Q) $(BINDIR)/$(CONFIG)/thread_stress_test || ( echo test thread_stress_test failed ; exit 1 ) @@ -3711,6 +3716,7 @@ LIBGRPC++_SRC = \ src/cpp/server/server_context.cc \ src/cpp/server/server_credentials.cc \ src/cpp/server/server_posix.cc \ + src/cpp/thread_manager/thread_manager.cc \ src/cpp/util/byte_buffer_cc.cc \ src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ @@ -3905,6 +3911,7 @@ LIBGRPC++_CRONET_SRC = \ src/cpp/server/server_context.cc \ src/cpp/server/server_credentials.cc \ src/cpp/server/server_posix.cc \ + src/cpp/thread_manager/thread_manager.cc \ src/cpp/util/byte_buffer_cc.cc \ src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ @@ -4486,6 +4493,7 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/server/server_context.cc \ src/cpp/server/server_credentials.cc \ src/cpp/server/server_posix.cc \ + src/cpp/thread_manager/thread_manager.cc \ src/cpp/util/byte_buffer_cc.cc \ src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ @@ -13522,6 +13530,49 @@ $(OBJDIR)/$(CONFIG)/test/cpp/interop/stress_test.o: $(GENDIR)/src/proto/grpc/tes $(OBJDIR)/$(CONFIG)/test/cpp/util/metrics_server.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/metrics.pb.cc $(GENDIR)/src/proto/grpc/testing/metrics.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc +THREAD_MANAGER_TEST_SRC = \ + test/cpp/thread_manager/thread_manager_test.cc \ + +THREAD_MANAGER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(THREAD_MANAGER_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/thread_manager_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/thread_manager_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/thread_manager_test: $(PROTOBUF_DEP) $(THREAD_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(THREAD_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/thread_manager_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/thread_manager/thread_manager_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a + +deps_thread_manager_test: $(THREAD_MANAGER_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(THREAD_MANAGER_TEST_OBJS:.o=.dep) +endif +endif + + THREAD_STRESS_TEST_SRC = \ test/cpp/end2end/thread_stress_test.cc \ diff --git a/build.yaml b/build.yaml index b5856d477b..2824391ef2 100644 --- a/build.yaml +++ b/build.yaml @@ -742,6 +742,7 @@ filegroups: - src/cpp/common/channel_filter.h - src/cpp/server/dynamic_thread_pool.h - src/cpp/server/thread_pool_interface.h + - src/cpp/thread_manager/thread_manager.h src: - src/cpp/client/channel_cc.cc - src/cpp/client/client_context.cc @@ -764,6 +765,7 @@ filegroups: - src/cpp/server/server_context.cc - src/cpp/server/server_credentials.cc - src/cpp/server/server_posix.cc + - src/cpp/thread_manager/thread_manager.cc - src/cpp/util/byte_buffer_cc.cc - src/cpp/util/slice_cc.cc - src/cpp/util/status.cc @@ -3490,6 +3492,16 @@ targets: - gpr_test_util - gpr - grpc++_test_config +- name: thread_manager_test + build: test + language: c++ + src: + - test/cpp/thread_manager/thread_manager_test.cc + deps: + - grpc++ + - grpc + - gpr + - grpc++_test_config - name: thread_stress_test gtest: true cpu_cost: 100 diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 4a00d7a3a1..5c41ca51b4 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -126,12 +126,6 @@ class ServerInterface : public CallHook { /// \return true on a successful shutdown. virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; - /// Process one or more incoming calls. - virtual void RunRpc() = 0; - - /// Schedule \a RunRpc to run in the threadpool. - virtual void ScheduleCallback() = 0; - virtual void ShutdownInternal(gpr_timespec deadline) = 0; virtual int max_receive_message_size() const = 0; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index f51a6c658f..a6d70c7577 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -105,18 +105,41 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { class AsyncRequest; class ShutdownRequest; + /// SyncRequestThreadManager is an implementation of ThreadManager. This class + /// is responsible for polling for incoming RPCs and calling the RPC handlers. + /// This is only used in case of a Sync server (i.e a server exposing a sync + /// interface) + class SyncRequestThreadManager; + class UnimplementedAsyncRequestContext; class UnimplementedAsyncRequest; class UnimplementedAsyncResponse; /// Server constructors. To be used by \a ServerBuilder only. /// - /// \param thread_pool The threadpool instance to use for call processing. - /// \param thread_pool_owned Does the server own the \a thread_pool instance? - /// \param max_receive_message_size Maximum message length that the channel - /// can receive. - Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_receive_message_size, ChannelArguments* args); + /// \param max_message_size Maximum message length that the channel can + /// receive. + /// + /// \param args The channel args + /// + /// \param sync_server_cqs The completion queues to use if the server is a + /// synchronous server (or a hybrid server). The server polls for new RPCs on + /// these queues + /// + /// \param min_pollers The minimum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param max_pollers The maximum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on + /// server completion queues passed via sync_server_cqs param. + Server(int max_message_size, ChannelArguments* args, + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs, + int min_pollers, int max_pollers, int sync_cq_timeout_msec); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -151,12 +174,6 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { /// \return true on a successful shutdown. bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE; - /// Process one or more incoming calls. - void RunRpc() GRPC_OVERRIDE; - - /// Schedule \a RunRpc to run in the threadpool. - void ScheduleCallback() GRPC_OVERRIDE; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE; @@ -171,34 +188,31 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { const int max_receive_message_size_; - // Completion queue. - CompletionQueue cq_; + /// The following completion queues are ONLY used in case of Sync API i.e if + /// the server has any services with sync methods. The server uses these + /// completion queues to poll for new RPCs + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs_; + + /// List of ThreadManager instances (one for each cq in the sync_server_cqs) + std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; // Sever status grpc::mutex mu_; bool started_; bool shutdown_; - bool shutdown_notified_; - // The number of threads which are running callbacks. - int num_running_cb_; - grpc::condition_variable callback_cv_; + bool shutdown_notified_; // Was notify called on the shutdown_cv_ grpc::condition_variable shutdown_cv_; std::shared_ptr<GlobalCallbacks> global_callbacks_; - std::list<SyncRequest>* sync_methods_; std::vector<grpc::string> services_; - std::unique_ptr<RpcServiceMethod> unknown_method_; bool has_generic_service_; - // Pointer to the c grpc server. + // Pointer to the wrapped grpc_server. grpc_server* server_; - ThreadPoolInterface* thread_pool_; - // Whether the thread pool is created and owned by the server. - bool thread_pool_owned_; - std::unique_ptr<ServerInitializer> server_initializer_; }; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 15333df60e..9252c6a63a 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -34,6 +34,7 @@ #ifndef GRPCXX_SERVER_BUILDER_H #define GRPCXX_SERVER_BUILDER_H +#include <climits> #include <map> #include <memory> #include <vector> @@ -42,6 +43,8 @@ #include <grpc++/impl/server_builder_plugin.h> #include <grpc++/support/config.h> #include <grpc/compression.h> +#include <grpc/support/cpu.h> +#include <grpc/support/useful.h> struct grpc_resource_quota; @@ -66,6 +69,8 @@ class ServerBuilder { ServerBuilder(); ~ServerBuilder(); + enum SyncServerOption { NUM_CQS, MIN_POLLERS, MAX_POLLERS, CQ_TIMEOUT_MSEC }; + /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the \a Server instance returned /// by \a BuildAndStart(). @@ -122,6 +127,9 @@ class ServerBuilder { ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option); + /// Only useful if this is a Synchronous server. + ServerBuilder& SetSyncServerOption(SyncServerOption option, int value); + /// Tries to bind \a server to the given \a addr. /// /// It can be invoked multiple times. @@ -177,6 +185,28 @@ class ServerBuilder { int* selected_port; }; + struct SyncServerSettings { + SyncServerSettings() + : num_cqs(GPR_MAX(gpr_cpu_num_cores(), 4)), + min_pollers(1), + max_pollers(INT_MAX), + cq_timeout_msec(1000) {} + + // Number of server completion queues to create to listen to incoming RPCs. + int num_cqs; + + // Minimum number of threads per completion queue that should be listening + // to incoming RPCs. + int min_pollers; + + // Maximum number of threads per completion queue that can be listening to + // incoming RPCs. + int max_pollers; + + // The timeout for server completion queue's AsyncNext call. + int cq_timeout_msec; + }; + typedef std::unique_ptr<grpc::string> HostString; struct NamedService { explicit NamedService(Service* s) : service(s) {} @@ -191,7 +221,12 @@ class ServerBuilder { std::vector<std::unique_ptr<ServerBuilderOption>> options_; std::vector<std::unique_ptr<NamedService>> services_; std::vector<Port> ports_; + + SyncServerSettings sync_server_settings_; + + // List of completion queues added via AddCompletionQueue() method std::vector<ServerCompletionQueue*> cqs_; + std::shared_ptr<ServerCredentials> creds_; std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_; grpc_resource_quota* resource_quota_; diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 953a4337ec..00a90bb184 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -36,6 +36,7 @@ #include <grpc++/impl/service_type.h> #include <grpc++/resource_quota.h> #include <grpc++/server.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -55,6 +56,7 @@ static void do_plugin_list_init(void) { ServerBuilder::ServerBuilder() : max_receive_message_size_(-1), max_send_message_size_(-1), + sync_server_settings_(SyncServerSettings()), resource_quota_(nullptr), generic_service_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); @@ -63,6 +65,7 @@ ServerBuilder::ServerBuilder() auto& factory = *it; plugins_.emplace_back(factory()); } + // all compression algorithms enabled by default. enabled_compression_algorithms_bitset_ = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; @@ -102,7 +105,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", - service); + (void*)service); } else { generic_service_ = service; } @@ -115,6 +118,25 @@ ServerBuilder& ServerBuilder::SetOption( return *this; } +ServerBuilder& ServerBuilder::SetSyncServerOption( + ServerBuilder::SyncServerOption option, int val) { + switch (option) { + case NUM_CQS: + sync_server_settings_.num_cqs = val; + break; + case MIN_POLLERS: + sync_server_settings_.min_pollers = val; + break; + case MAX_POLLERS: + sync_server_settings_.max_pollers = val; + break; + case CQ_TIMEOUT_MSEC: + sync_server_settings_.cq_timeout_msec = val; + break; + } + return *this; +} + ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus( grpc_compression_algorithm algorithm, bool enabled) { if (enabled) { @@ -157,35 +179,24 @@ ServerBuilder& ServerBuilder::AddListeningPort( } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { - std::unique_ptr<ThreadPoolInterface> thread_pool; - bool has_sync_methods = false; - for (auto it = services_.begin(); it != services_.end(); ++it) { - if ((*it)->service->has_synchronous_methods()) { - if (!thread_pool) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - break; - } - } - } ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { (*option)->UpdateArguments(&args); (*option)->UpdatePlugins(&plugins_); } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { - if (!thread_pool && (*plugin)->has_sync_methods()) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - } (*plugin)->UpdateChannelArguments(&args); } + if (max_receive_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } + if (max_send_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_); } + args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, enabled_compression_algorithms_bitset_); if (maybe_default_compression_level_.is_set) { @@ -196,31 +207,89 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, maybe_default_compression_algorithm_.algorithm); } + if (resource_quota_ != nullptr) { args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_, grpc_resource_quota_arg_vtable()); } - std::unique_ptr<Server> server(new Server(thread_pool.release(), true, - max_receive_message_size_, &args)); + + // == Determine if the server has any syncrhonous methods == + bool has_sync_methods = false; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_synchronous_methods()) { + has_sync_methods = true; + break; + } + } + + if (!has_sync_methods) { + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + if ((*plugin)->has_sync_methods()) { + has_sync_methods = true; + break; + } + } + } + + // If this is a Sync server, i.e a server expositing sync API, then the server + // needs to create some completion queues to listen for incoming requests. + // 'sync_server_cqs' are those internal completion queues. + // + // This is different from the completion queues added to the server via + // ServerBuilder's AddCompletionQueue() method (those completion queues + // are in 'cqs_' member variable of ServerBuilder object) + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs(std::make_shared< + std::vector<std::unique_ptr<ServerCompletionQueue>>>()); + + if (has_sync_methods) { + // This is a Sync server + gpr_log(GPR_INFO, + "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: " + "%d, CQ timeout (msec): %d", + sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, + sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec); + + // Create completion queues to listen to incoming rpc requests + for (int i = 0; i < sync_server_settings_.num_cqs; i++) { + sync_server_cqs->emplace_back(new ServerCompletionQueue()); + } + } + + std::unique_ptr<Server> server(new Server( + max_receive_message_size_, &args, sync_server_cqs, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec)); + ServerInitializer* initializer = server->initializer(); - // If the server has atleast one sync methods, we know that this is a Sync - // server or a Hybrid server and the completion queue (server->cq_) would be - // frequently polled. - int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; - - for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - // A completion queue that is not polled frequently (by calling Next() or - // AsyncNext()) is not safe to use for listening to incoming channels. - // Register all such completion queues as non-listening completion queues - // with the GRPC core library. - if ((*cq)->IsFrequentlyPolled()) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + // Register all the completion queues with the server. i.e + // 1. sync_server_cqs: internal completion queues created IF this is a sync + // server + // 2. cqs_: Completion queues added via AddCompletionQueue() call + + // All sync cqs (if any) are frequently polled by ThreadManager + int num_frequently_polled_cqs = sync_server_cqs->size(); + + for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); + } + + // cqs_ contains the completion queue added by calling the ServerBuilder's + // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by + // calling Next() or AsyncNext()) and hence are not safe to be used for + // listening to incoming channels. Such completion queues must be registered + // as non-listening queues + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + if ((*it)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), nullptr); num_frequently_polled_cqs++; } else { grpc_server_register_non_listening_completion_queue(server->server_, - (*cq)->cq(), nullptr); + (*it)->cq(), nullptr); } } @@ -236,9 +305,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->InitServer(initializer); } + if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); } else { @@ -251,6 +322,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } } } + for (auto port = ports_.begin(); port != ports_.end(); port++) { int r = server->AddListeningPort(port->addr, port->creds.get()); if (!r) return nullptr; @@ -258,13 +330,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { *port->selected_port = r; } } + auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; if (!server->Start(cqs_data, cqs_.size())) { return nullptr; } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->Finish(initializer); } + return server; } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 3f89275370..d46942d257 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -1,5 +1,4 @@ /* - * * Copyright 2015, Google Inc. * All rights reserved. * @@ -52,7 +51,7 @@ #include <grpc/support/log.h> #include "src/core/lib/profiling/timers.h" -#include "src/cpp/server/thread_pool_interface.h" +#include "src/cpp/thread_manager/thread_manager.h" namespace grpc { @@ -118,12 +117,9 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL UnimplementedAsyncRequest* const request_; }; -class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { +class ShutdownTag : public CompletionQueueTag { public: - bool FinalizeResult(void** tag, bool* status) { - delete this; - return false; - } + bool FinalizeResult(void** tag, bool* status) { return false; } }; class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { @@ -147,36 +143,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } - static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { - void* tag = nullptr; - *ok = false; - if (!cq->Next(&tag, ok)) { - return nullptr; - } - auto* mrd = static_cast<SyncRequest*>(tag); - GPR_ASSERT(mrd->in_flight_); - return mrd; - } - - static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, - gpr_timespec deadline) { - void* tag = nullptr; - *ok = false; - switch (cq->AsyncNext(&tag, ok, deadline)) { - case CompletionQueue::TIMEOUT: - *req = nullptr; - return true; - case CompletionQueue::SHUTDOWN: - *req = nullptr; - return false; - case CompletionQueue::GOT_EVENT: - *req = static_cast<SyncRequest*>(tag); - GPR_ASSERT((*req)->in_flight_); - return true; - } - GPR_UNREACHABLE_CODE(return false); - } - void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -266,7 +232,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; bool in_flight_; const bool has_request_payload_; - uint32_t incoming_flags_; grpc_call* call_; grpc_call_details* call_details_; gpr_timespec deadline_; @@ -275,33 +240,141 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; +// Implementation of ThreadManager. Each instance of SyncRequestThreadManager +// manages a pool of threads that poll for incoming Sync RPCs and call the +// appropriate RPC handlers +class Server::SyncRequestThreadManager : public ThreadManager { + public: + SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, + std::shared_ptr<GlobalCallbacks> global_callbacks, + int min_pollers, int max_pollers, + int cq_timeout_msec) + : ThreadManager(min_pollers, max_pollers), + server_(server), + server_cq_(server_cq), + cq_timeout_msec_(cq_timeout_msec), + global_callbacks_(global_callbacks) {} + + WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE { + *tag = nullptr; + gpr_timespec deadline = + gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN); + + switch (server_cq_->AsyncNext(tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + return TIMEOUT; + case CompletionQueue::SHUTDOWN: + return SHUTDOWN; + case CompletionQueue::GOT_EVENT: + return WORK_FOUND; + } + + GPR_UNREACHABLE_CODE(return TIMEOUT); + } + + void DoWork(void* tag, bool ok) GRPC_OVERRIDE { + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + + if (!sync_req) { + // No tag. Nothing to work on. This is an unlikley scenario and possibly a + // bug in RPC Manager implementation. + gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag"); + return; + } + + if (ok) { + // Calldata takes ownership of the completion queue inside sync_req + SyncRequest::CallData cd(server_, sync_req); + { + // Prepare for the next request + if (!IsShutdown()) { + sync_req->SetupRequest(); // Create new completion queue for sync_req + sync_req->Request(server_->c_server(), server_cq_->cq()); + } + } + + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } + // TODO (sreek) If ok is false here (which it isn't in case of + // grpc_request_registered_call), we should still re-queue the request + // object + } + + void AddSyncMethod(RpcServiceMethod* method, void* tag) { + sync_requests_.emplace_back(new SyncRequest(method, tag)); + } + + void AddUnknownSyncMethod() { + if (!sync_requests_.empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + sync_requests_.emplace_back( + new SyncRequest(unknown_method_.get(), nullptr)); + } + } + + void ShutdownAndDrainCompletionQueue() { + server_cq_->Shutdown(); + + // Drain any pending items from the queue + void* tag; + bool ok; + while (server_cq_->Next(&tag, &ok)) { + // Nothing to be done here + } + } + + void Start() { + if (!sync_requests_.empty()) { + for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) { + (*m)->SetupRequest(); + (*m)->Request(server_->c_server(), server_cq_->cq()); + } + + Initialize(); // ThreadManager's Initialize() + } + } + + private: + Server* server_; + CompletionQueue* server_cq_; + int cq_timeout_msec_; + std::vector<std::unique_ptr<SyncRequest>> sync_requests_; + std::unique_ptr<RpcServiceMethod> unknown_method_; + std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; +}; + static internal::GrpcLibraryInitializer g_gli_initializer; -Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_receive_message_size, ChannelArguments* args) +Server::Server( + int max_receive_message_size, ChannelArguments* args, + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs, + int min_pollers, int max_pollers, int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), + sync_server_cqs_(sync_server_cqs), started_(false), shutdown_(false), shutdown_notified_(false), - num_running_cb_(0), - sync_methods_(new std::list<SyncRequest>), has_generic_service_(false), server_(nullptr), - thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned), server_initializer_(new ServerInitializer(this)) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; global_callbacks_->UpdateArguments(args); + + for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); + it++) { + sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( + this, (*it).get(), global_callbacks_, min_pollers, max_pollers, + sync_cq_timeout_msec)); + } + grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); + server_ = grpc_server_create(&channel_args, nullptr); - if (thread_pool_ == nullptr) { - grpc_server_register_non_listening_completion_queue(server_, cq_.cq(), - nullptr); - } else { - grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); - } } Server::~Server() { @@ -311,17 +384,14 @@ Server::~Server() { lock.unlock(); Shutdown(); } else if (!started_) { - cq_.Shutdown(); + // Shutdown the completion queues + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->ShutdownAndDrainCompletionQueue(); + } } } - void* got_tag; - bool ok; - GPR_ASSERT(!cq_.Next(&got_tag, &ok)); + grpc_server_destroy(server_); - if (thread_pool_owned_) { - delete thread_pool_; - } - delete sync_methods_; } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { @@ -352,12 +422,14 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { "Can only register an asynchronous service against one server."); service->server_ = this; } + const char* method_name = nullptr; for (auto it = service->methods_.begin(); it != service->methods_.end(); ++it) { if (it->get() == nullptr) { // Handled by generic service if any. continue; } + RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, @@ -367,11 +439,15 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { method->name()); return false; } - if (method->handler() == nullptr) { + + if (method->handler() == nullptr) { // Async method method->set_server_tag(tag); } else { - sync_methods_->emplace_back(method, tag); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddSyncMethod(method, tag); + } } + method_name = method->name(); } @@ -406,28 +482,19 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); if (!has_generic_service_) { - if (!sync_methods_->empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - // Use of emplace_back with just constructor arguments is not accepted - // here by gcc-4.4 because it can't match the anonymous nullptr with a - // proper constructor implicitly. Construct the object and use push_back. - sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddUnknownSyncMethod(); } + for (size_t i = 0; i < num_cqs; i++) { if (cqs[i]->IsFrequentlyPolled()) { new UnimplementedAsyncRequest(this, cqs[i]); } } } - // Start processing rpcs. - if (!sync_methods_->empty()) { - for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { - m->SetupRequest(); - m->Request(server_, cq_.cq()); - } - ScheduleCallback(); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Start(); } return true; @@ -437,29 +504,43 @@ void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; - grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); - cq_.Shutdown(); - lock.unlock(); - // Spin, eating requests until the completion queue is completely shutdown. - // If the deadline expires then cancel anything that's pending and keep - // spinning forever until the work is actually drained. - // Since nothing else needs to touch state guarded by mu_, holding it - // through this loop is fine. - SyncRequest* request; + + /// The completion queue to use for server shutdown completion notification + CompletionQueue shutdown_cq; + ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); + + // Shutdown all ThreadManagers. This will try to gracefully stop all the + // threads in the ThreadManagers (once they process any inflight requests) + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Shutdown(); // ThreadManager's Shutdown() + } + + shutdown_cq.Shutdown(); + + void* tag; bool ok; - while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { - if (request == NULL) { // deadline expired - grpc_server_cancel_all_calls(server_); - deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - } else if (ok) { - SyncRequest::CallData call_data(this, request); - } + CompletionQueue::NextStatus status = + shutdown_cq.AsyncNext(&tag, &ok, deadline); + + // If this timed out, it means we are done with the grace period for a clean + // shutdown. We should force a shutdown now by cancelling all inflight calls + if (status == CompletionQueue::NextStatus::TIMEOUT) { + grpc_server_cancel_all_calls(server_); } - lock.lock(); + // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has + // successfully shutdown - // Wait for running callbacks to finish. - while (num_running_cb_ != 0) { - callback_cv_.wait(lock); + // Wait for threads in all ThreadManagers to terminate + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Wait(); + (*it)->ShutdownAndDrainCompletionQueue(); + } + + // Drain the shutdown queue (if the previous call to AsyncNext() timed out + // and we didn't remove the tag from the queue yet) + while (shutdown_cq.Next(&tag, &ok)) { + // Nothing to be done here. Just ignore ok and tag values } shutdown_notified_ = true; @@ -585,47 +666,6 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } -void Server::ScheduleCallback() { - { - grpc::unique_lock<grpc::mutex> lock(mu_); - num_running_cb_++; - } - thread_pool_->Add(std::bind(&Server::RunRpc, this)); -} - -void Server::RunRpc() { - // Wait for one more incoming rpc. - bool ok; - GPR_TIMER_SCOPE("Server::RunRpc", 0); - auto* mrd = SyncRequest::Wait(&cq_, &ok); - if (mrd) { - ScheduleCallback(); - if (ok) { - SyncRequest::CallData cd(this, mrd); - { - mrd->SetupRequest(); - grpc::unique_lock<grpc::mutex> lock(mu_); - if (!shutdown_) { - mrd->Request(server_, cq_.cq()); - } else { - // destroy the structure that was created - mrd->TeardownRequest(); - } - } - GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); - } - } - - { - grpc::unique_lock<grpc::mutex> lock(mu_); - num_running_cb_--; - if (shutdown_) { - callback_cv_.notify_all(); - } - } -} - ServerInitializer* Server::initializer() { return server_initializer_.get(); } } // namespace grpc diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc new file mode 100644 index 0000000000..caae4c457d --- /dev/null +++ b/src/cpp/thread_manager/thread_manager.cc @@ -0,0 +1,181 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc/support/log.h> +#include <climits> + +#include "src/cpp/thread_manager/thread_manager.h" + +namespace grpc { + +ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr) + : thd_mgr_(thd_mgr), thd_(&ThreadManager::WorkerThread::Run, this) {} + +void ThreadManager::WorkerThread::Run() { + thd_mgr_->MainWorkLoop(); + thd_mgr_->MarkAsCompleted(this); +} + +ThreadManager::WorkerThread::~WorkerThread() { thd_.join(); } + +ThreadManager::ThreadManager(int min_pollers, int max_pollers) + : shutdown_(false), + num_pollers_(0), + min_pollers_(min_pollers), + max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), + num_threads_(0) {} + +ThreadManager::~ThreadManager() { + { + std::unique_lock<grpc::mutex> lock(mu_); + GPR_ASSERT(num_threads_ == 0); + } + + CleanupCompletedThreads(); +} + +void ThreadManager::Wait() { + std::unique_lock<grpc::mutex> lock(mu_); + while (num_threads_ != 0) { + shutdown_cv_.wait(lock); + } +} + +void ThreadManager::Shutdown() { + std::unique_lock<grpc::mutex> lock(mu_); + shutdown_ = true; +} + +bool ThreadManager::IsShutdown() { + std::unique_lock<grpc::mutex> lock(mu_); + return shutdown_; +} + +void ThreadManager::MarkAsCompleted(WorkerThread* thd) { + { + std::unique_lock<grpc::mutex> list_lock(list_mu_); + completed_threads_.push_back(thd); + } + + grpc::unique_lock<grpc::mutex> lock(mu_); + num_threads_--; + if (num_threads_ == 0) { + shutdown_cv_.notify_one(); + } +} + +void ThreadManager::CleanupCompletedThreads() { + std::unique_lock<grpc::mutex> lock(list_mu_); + for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); + thd = completed_threads_.erase(thd)) { + delete *thd; + } +} + +void ThreadManager::Initialize() { + for (int i = 0; i < min_pollers_; i++) { + MaybeCreatePoller(); + } +} + +// If the number of pollers (i.e threads currently blocked in PollForWork()) is +// less than max threshold (i.e max_pollers_) and the total number of threads is +// below the maximum threshold, we can let the current thread continue as poller +bool ThreadManager::MaybeContinueAsPoller() { + std::unique_lock<grpc::mutex> lock(mu_); + if (shutdown_ || num_pollers_ > max_pollers_) { + return false; + } + + num_pollers_++; + return true; +} + +// Create a new poller if the current number of pollers i.e num_pollers_ (i.e +// threads currently blocked in PollForWork()) is below the threshold (i.e +// min_pollers_) and the total number of threads is below the maximum threshold +void ThreadManager::MaybeCreatePoller() { + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_ && num_pollers_ < min_pollers_) { + num_pollers_++; + num_threads_++; + + // Create a new thread (which ends up calling the MainWorkLoop() function + new WorkerThread(this); + } +} + +void ThreadManager::MainWorkLoop() { + void* tag; + bool ok; + + /* + 1. Poll for work (i.e PollForWork()) + 2. After returning from PollForWork, reduce the number of pollers by 1. If + PollForWork() returned a TIMEOUT, then it may indicate that we have more + polling threads than needed. Check if the number of pollers is greater + than min_pollers and if so, terminate the thread. + 3. Since we are short of one poller now, see if a new poller has to be + created (i.e see MaybeCreatePoller() for more details) + 4. Do the actual work (DoWork()) + 5. After doing the work, see it this thread can resume polling work (i.e + see MaybeContinueAsPoller() for more details) */ + do { + WorkStatus work_status = PollForWork(&tag, &ok); + + { + grpc::unique_lock<grpc::mutex> lock(mu_); + num_pollers_--; + + if (work_status == TIMEOUT && num_pollers_ > min_pollers_) { + break; + } + } + + // Note that MaybeCreatePoller does check for shutdown and creates a new + // thread only if ThreadManager is not shutdown + if (work_status == WORK_FOUND) { + MaybeCreatePoller(); + DoWork(tag, ok); + } + } while (MaybeContinueAsPoller()); + + CleanupCompletedThreads(); + + // If we are here, either ThreadManager is shutting down or it already has + // enough threads. +} + +} // namespace grpc diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h new file mode 100644 index 0000000000..9cfdb8af25 --- /dev/null +++ b/src/cpp/thread_manager/thread_manager.h @@ -0,0 +1,159 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H +#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H + +#include <list> +#include <memory> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc++/support/config.h> + +namespace grpc { + +class ThreadManager { + public: + explicit ThreadManager(int min_pollers, int max_pollers); + virtual ~ThreadManager(); + + // Initializes and Starts the Rpc Manager threads + void Initialize(); + + // The return type of PollForWork() function + enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT }; + + // "Polls" for new work. + // If the return value is WORK_FOUND: + // - The implementaion of PollForWork() MAY set some opaque identifier to + // (identify the work item found) via the '*tag' parameter + // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A + // value of 'false' indicates some implemenation specific error (that is + // neither SHUTDOWN nor TIMEOUT) + // - ThreadManager does not interpret the values of 'tag' and 'ok' + // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to + // DoWork() + // + // If the return value is SHUTDOWN:, + // - ThreadManager WILL NOT call DoWork() and terminates the thead + // + // If the return value is TIMEOUT:, + // - ThreadManager WILL NOT call DoWork() + // - ThreadManager MAY terminate the thread depending on the current number + // of active poller threads and mix_pollers/max_pollers settings + // - Also, the value of timeout is specific to the derived class + // implementation + virtual WorkStatus PollForWork(void** tag, bool* ok) = 0; + + // The implementation of DoWork() is supposed to perform the work found by + // PollForWork(). The tag and ok parameters are the same as returned by + // PollForWork() + // + // The implementation of DoWork() should also do any setup needed to ensure + // that the next call to PollForWork() (not necessarily by the current thread) + // actually finds some work + virtual void DoWork(void* tag, bool ok) = 0; + + // Mark the ThreadManager as shutdown and begin draining the work. This is a + // non-blocking call and the caller should call Wait(), a blocking call which + // returns only once the shutdown is complete + void Shutdown(); + + // Has Shutdown() been called + bool IsShutdown(); + + // A blocking call that returns only after the ThreadManager has shutdown and + // all the threads have drained all the outstanding work + void Wait(); + + private: + // Helper wrapper class around std::thread. This takes a ThreadManager object + // and starts a new std::thread to calls the Run() function. + // + // The Run() function calls ThreadManager::MainWorkLoop() function and once + // that completes, it marks the WorkerThread completed by calling + // ThreadManager::MarkAsCompleted() + class WorkerThread { + public: + WorkerThread(ThreadManager* thd_mgr); + ~WorkerThread(); + + private: + // Calls thd_mgr_->MainWorkLoop() and once that completes, calls + // thd_mgr_>MarkAsCompleted(this) to mark the thread as completed + void Run(); + + ThreadManager* thd_mgr_; + grpc::thread thd_; + }; + + // The main funtion in ThreadManager + void MainWorkLoop(); + + // Create a new poller if the number of current pollers is less than the + // minimum number of pollers needed (i.e min_pollers). + void MaybeCreatePoller(); + + // Returns true if the current thread can resume as a poller. i.e if the + // current number of pollers is less than the max_pollers. + bool MaybeContinueAsPoller(); + + void MarkAsCompleted(WorkerThread* thd); + void CleanupCompletedThreads(); + + // Protects shutdown_, num_pollers_ and num_threads_ + // TODO: sreek - Change num_pollers and num_threads_ to atomics + grpc::mutex mu_; + + bool shutdown_; + grpc::condition_variable shutdown_cv_; + + // Number of threads doing polling + int num_pollers_; + + // The minimum and maximum number of threads that should be doing polling + int min_pollers_; + int max_pollers_; + + // The total number of threads (includes threads includes the threads that are + // currently polling i.e num_pollers_) + int num_threads_; + + grpc::mutex list_mu_; + std::list<WorkerThread*> completed_threads_; +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_THREAD_MANAGER_H diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index feadabd865..43b7d44255 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -250,6 +250,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { builder.RegisterService(&service_); builder.RegisterService("foo.test.youtube.com", &special_service_); builder.RegisterService(&dup_pkg_service_); + + builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); + builder.SetSyncServerOption( + ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); + server_ = builder.BuildAndStart(); is_server_started_ = true; } @@ -284,6 +289,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { ServerBuilder builder; builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials()); builder.RegisterService(proxy_service_.get()); + + builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); + builder.SetSyncServerOption( + ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); + proxy_server_ = builder.BuildAndStart(); channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials()); diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc new file mode 100644 index 0000000000..5c70103947 --- /dev/null +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -0,0 +1,136 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + *is % allowed in string + */ + +#include <memory> +#include <string> + +#include <gflags/gflags.h> +#include <grpc++/grpc++.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +#include "src/cpp/thread_manager/thread_manager.h" +#include "test/cpp/util/test_config.h" + +namespace grpc { +class ThreadManagerTest GRPC_FINAL : public grpc::ThreadManager { + public: + ThreadManagerTest() + : ThreadManager(kMinPollers, kMaxPollers), + num_do_work_(0), + num_poll_for_work_(0), + num_work_found_(0) {} + + grpc::ThreadManager::WorkStatus PollForWork(void **tag, + bool *ok) GRPC_OVERRIDE; + void DoWork(void *tag, bool ok) GRPC_OVERRIDE; + void PerformTest(); + + private: + void SleepForMs(int sleep_time_ms); + + static const int kMinPollers = 2; + static const int kMaxPollers = 10; + + static const int kPollingTimeoutMsec = 10; + static const int kDoWorkDurationMsec = 1; + + // PollForWork will return SHUTDOWN after these many number of invocations + static const int kMaxNumPollForWork = 50; + + gpr_atm num_do_work_; // Number of calls to DoWork + gpr_atm num_poll_for_work_; // Number of calls to PollForWork + gpr_atm num_work_found_; // Number of times WORK_FOUND was returned +}; + +void ThreadManagerTest::SleepForMs(int duration_ms) { + gpr_timespec sleep_time = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(duration_ms, GPR_TIMESPAN)); + gpr_sleep_until(sleep_time); +} + +grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void **tag, + bool *ok) { + int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1); + + if (call_num >= kMaxNumPollForWork) { + Shutdown(); + return SHUTDOWN; + } + + // Simulate "polling for work" by sleeping for sometime + SleepForMs(kPollingTimeoutMsec); + + *tag = nullptr; + *ok = true; + + // Return timeout roughly 1 out of every 3 calls + if (call_num % 3 == 0) { + return TIMEOUT; + } else { + gpr_atm_no_barrier_fetch_add(&num_work_found_, 1); + return WORK_FOUND; + } +} + +void ThreadManagerTest::DoWork(void *tag, bool ok) { + gpr_atm_no_barrier_fetch_add(&num_do_work_, 1); + SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping +} + +void ThreadManagerTest::PerformTest() { + // Initialize() starts the ThreadManager + Initialize(); + + // Wait for all the threads to gracefully terminate + Wait(); + + // The number of times DoWork() was called is equal to the number of times + // WORK_FOUND was returned + gpr_log(GPR_DEBUG, "DoWork() called %ld times", + gpr_atm_no_barrier_load(&num_do_work_)); + GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) == + gpr_atm_no_barrier_load(&num_work_found_)); +} +} // namespace grpc + +int main(int argc, char **argv) { + std::srand(std::time(NULL)); + + grpc::testing::InitTest(&argc, &argv, true); + grpc::ThreadManagerTest test_rpc_manager; + test_rpc_manager.PerformTest(); + + return 0; +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 252bdb7ed1..6c2b475ed0 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -864,6 +864,7 @@ src/cpp/client/create_channel_internal.h \ src/cpp/common/channel_filter.h \ src/cpp/server/dynamic_thread_pool.h \ src/cpp/server/thread_pool_interface.h \ +src/cpp/thread_manager/thread_manager.h \ src/cpp/client/insecure_credentials.cc \ src/cpp/client/secure_credentials.cc \ src/cpp/common/auth_property_iterator.cc \ @@ -893,6 +894,7 @@ src/cpp/server/server_cc.cc \ src/cpp/server/server_context.cc \ src/cpp/server/server_credentials.cc \ src/cpp/server/server_posix.cc \ +src/cpp/thread_manager/thread_manager.cc \ src/cpp/util/byte_buffer_cc.cc \ src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index a137848c97..87ac8add3b 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -3203,6 +3203,23 @@ { "deps": [ "gpr", + "grpc", + "grpc++", + "grpc++_test_config" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "thread_manager_test", + "src": [ + "test/cpp/thread_manager/thread_manager_test.cc" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "gpr", "gpr_test_util", "grpc", "grpc++", @@ -7451,7 +7468,8 @@ "src/cpp/client/create_channel_internal.h", "src/cpp/common/channel_filter.h", "src/cpp/server/dynamic_thread_pool.h", - "src/cpp/server/thread_pool_interface.h" + "src/cpp/server/thread_pool_interface.h", + "src/cpp/thread_manager/thread_manager.h" ], "is_filegroup": true, "language": "c++", @@ -7530,6 +7548,8 @@ "src/cpp/server/server_credentials.cc", "src/cpp/server/server_posix.cc", "src/cpp/server/thread_pool_interface.h", + "src/cpp/thread_manager/thread_manager.cc", + "src/cpp/thread_manager/thread_manager.h", "src/cpp/util/byte_buffer_cc.cc", "src/cpp/util/slice_cc.cc", "src/cpp/util/status.cc", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 09cf89d5e7..733bfe3b8f 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -3001,6 +3001,27 @@ "posix", "windows" ], + "cpu_cost": 1.0, + "exclude_configs": [], + "flaky": false, + "gtest": false, + "language": "c++", + "name": "thread_manager_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], "cpu_cost": 100, "exclude_configs": [], "exclude_iomgrs": [], diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj index 43c5281a02..bf9c3a5c9d 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj @@ -364,6 +364,7 @@ <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" /> + <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc"> @@ -424,6 +425,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\util\slice_cc.cc"> diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters index 6ad212a125..b88a78ad92 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters @@ -88,6 +88,9 @@ <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc"> <Filter>src\cpp\server</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc"> + <Filter>src\cpp\thread_manager</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc"> <Filter>src\cpp\util</Filter> </ClCompile> @@ -422,6 +425,9 @@ <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h"> <Filter>src\cpp\server</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h"> + <Filter>src\cpp\thread_manager</Filter> + </ClInclude> </ItemGroup> <ItemGroup> @@ -476,6 +482,9 @@ <Filter Include="src\cpp\server"> <UniqueIdentifier>{321b0980-74ad-e8ca-f23b-deffa5d6bb8f}</UniqueIdentifier> </Filter> + <Filter Include="src\cpp\thread_manager"> + <UniqueIdentifier>{23f9df56-8604-52a0-e6a2-f01b8e68d0e7}</UniqueIdentifier> + </Filter> <Filter Include="src\cpp\util"> <UniqueIdentifier>{f842537a-2bf1-1ec3-b495-7d62c64a1c06}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj index 9e6f2c0d0f..5d0759790c 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -360,6 +360,7 @@ <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" /> + <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc"> @@ -410,6 +411,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\util\slice_cc.cc"> diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index c73be4e63f..bdb7134081 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -73,6 +73,9 @@ <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc"> <Filter>src\cpp\server</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc"> + <Filter>src\cpp\thread_manager</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc"> <Filter>src\cpp\util</Filter> </ClCompile> @@ -395,6 +398,9 @@ <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h"> <Filter>src\cpp\server</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h"> + <Filter>src\cpp\thread_manager</Filter> + </ClInclude> </ItemGroup> <ItemGroup> @@ -449,6 +455,9 @@ <Filter Include="src\cpp\server"> <UniqueIdentifier>{8a54a279-d14b-4237-0df3-1ffe1ef5a7af}</UniqueIdentifier> </Filter> + <Filter Include="src\cpp\thread_manager"> + <UniqueIdentifier>{e5b55f25-d99f-b8e5-9981-7da7fa7ba628}</UniqueIdentifier> + </Filter> <Filter Include="src\cpp\util"> <UniqueIdentifier>{fb5d9a64-20ca-5119-ed38-04a3cf94923d}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj new file mode 100644 index 0000000000..2c35a03a02 --- /dev/null +++ b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj @@ -0,0 +1,201 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" /> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug|Win32"> + <Configuration>Debug</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Debug|x64"> + <Configuration>Debug</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|Win32"> + <Configuration>Release</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|x64"> + <Configuration>Release</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <ProjectGuid>{08C611E4-7F87-73BE-76CE-C158A4CC05A3}</ProjectGuid> + <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected> + <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration"> + <PlatformToolset>v100</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration"> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration"> + <PlatformToolset>v120</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration"> + <PlatformToolset>v140</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>true</UseDebugLibraries> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>false</UseDebugLibraries> + <WholeProgramOptimization>true</WholeProgramOptimization> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Label="PropertySheets"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + <Import Project="$(SolutionDir)\..\vsprojects\cpptest.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\global.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\protobuf.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup Condition="'$(Configuration)'=='Debug'"> + <TargetName>thread_manager_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'"> + <TargetName>thread_manager_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\cpp\thread_manager\thread_manager_test.cc"> + </ClCompile> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++\grpc++.vcxproj"> + <Project>{C187A093-A0FE-489D-A40A-6E33DE0F9FEB}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj"> + <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj"> + <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++_test_config\grpc++_test_config.vcxproj"> + <Project>{3F7D093D-11F9-C4BC-BEB7-18EB28E3F290}</Project> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + </ImportGroup> + <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild"> + <PropertyGroup> + <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText> + </PropertyGroup> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" /> + </Target> +</Project> + diff --git a/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj.filters b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj.filters new file mode 100644 index 0000000000..e1741f8316 --- /dev/null +++ b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj.filters @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\cpp\thread_manager\thread_manager_test.cc"> + <Filter>test\cpp\thread_manager</Filter> + </ClCompile> + </ItemGroup> + + <ItemGroup> + <Filter Include="test"> + <UniqueIdentifier>{e9e471cd-7f7e-9abc-af13-ec58851849ac}</UniqueIdentifier> + </Filter> + <Filter Include="test\cpp"> + <UniqueIdentifier>{b350f72c-af76-7272-4342-1b0fc7a458ee}</UniqueIdentifier> + </Filter> + <Filter Include="test\cpp\thread_manager"> + <UniqueIdentifier>{6b09ea8d-fbc6-e6fe-f884-b3d3dfcbfc12}</UniqueIdentifier> + </Filter> + </ItemGroup> +</Project> + |