diff options
23 files changed, 1688 insertions, 284 deletions
@@ -1270,6 +1270,7 @@ cc_library( "src/cpp/server/secure_server_credentials.h", "src/cpp/client/create_channel_internal.h", "src/cpp/common/channel_filter.h", + "src/cpp/rpcmanager/grpc_rpc_manager.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", "src/cpp/client/insecure_credentials.cc", @@ -1292,6 +1293,7 @@ cc_library( "src/cpp/common/completion_queue_cc.cc", "src/cpp/common/core_codegen.cc", "src/cpp/common/rpc_method.cc", + "src/cpp/rpcmanager/grpc_rpc_manager.cc", "src/cpp/server/async_generic_service.cc", "src/cpp/server/create_default_thread_pool.cc", "src/cpp/server/dynamic_thread_pool.cc", @@ -1497,6 +1499,7 @@ cc_library( srcs = [ "src/cpp/client/create_channel_internal.h", "src/cpp/common/channel_filter.h", + "src/cpp/rpcmanager/grpc_rpc_manager.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", "src/cpp/client/insecure_credentials.cc", @@ -1514,6 +1517,7 @@ cc_library( "src/cpp/common/completion_queue_cc.cc", "src/cpp/common/core_codegen.cc", "src/cpp/common/rpc_method.cc", + "src/cpp/rpcmanager/grpc_rpc_manager.cc", "src/cpp/server/async_generic_service.cc", "src/cpp/server/create_default_thread_pool.cc", "src/cpp/server/dynamic_thread_pool.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 97cedbe449..a6e89cf619 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1025,6 +1025,7 @@ add_library(grpc++ src/cpp/common/completion_queue_cc.cc src/cpp/common/core_codegen.cc src/cpp/common/rpc_method.cc + src/cpp/rpcmanager/grpc_rpc_manager.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/dynamic_thread_pool.cc @@ -1279,6 +1280,7 @@ add_library(grpc++_unsecure src/cpp/common/completion_queue_cc.cc src/cpp/common/core_codegen.cc src/cpp/common/rpc_method.cc + src/cpp/rpcmanager/grpc_rpc_manager.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/dynamic_thread_pool.cc @@ -1048,6 +1048,7 @@ grpc_csharp_plugin: $(BINDIR)/$(CONFIG)/grpc_csharp_plugin grpc_node_plugin: $(BINDIR)/$(CONFIG)/grpc_node_plugin grpc_objective_c_plugin: $(BINDIR)/$(CONFIG)/grpc_objective_c_plugin grpc_python_plugin: $(BINDIR)/$(CONFIG)/grpc_python_plugin +grpc_rpc_manager_test: $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test grpc_ruby_plugin: $(BINDIR)/$(CONFIG)/grpc_ruby_plugin grpc_tool_test: $(BINDIR)/$(CONFIG)/grpc_tool_test grpclb_api_test: $(BINDIR)/$(CONFIG)/grpclb_api_test @@ -1416,6 +1417,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/generic_end2end_test \ $(BINDIR)/$(CONFIG)/golden_file_test \ $(BINDIR)/$(CONFIG)/grpc_cli \ + $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test \ $(BINDIR)/$(CONFIG)/grpc_tool_test \ $(BINDIR)/$(CONFIG)/grpclb_api_test \ $(BINDIR)/$(CONFIG)/grpclb_test \ @@ -1503,6 +1505,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/generic_end2end_test \ $(BINDIR)/$(CONFIG)/golden_file_test \ $(BINDIR)/$(CONFIG)/grpc_cli \ + $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test \ $(BINDIR)/$(CONFIG)/grpc_tool_test \ $(BINDIR)/$(CONFIG)/grpclb_api_test \ $(BINDIR)/$(CONFIG)/grpclb_test \ @@ -1801,6 +1804,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/generic_end2end_test || ( echo test generic_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing golden_file_test" $(Q) $(BINDIR)/$(CONFIG)/golden_file_test || ( echo test golden_file_test failed ; exit 1 ) + $(E) "[RUN] Testing grpc_rpc_manager_test" + $(Q) $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test || ( echo test grpc_rpc_manager_test failed ; exit 1 ) $(E) "[RUN] Testing grpc_tool_test" $(Q) $(BINDIR)/$(CONFIG)/grpc_tool_test || ( echo test grpc_tool_test failed ; exit 1 ) $(E) "[RUN] Testing grpclb_api_test" @@ -3591,6 +3596,7 @@ LIBGRPC++_SRC = \ src/cpp/common/completion_queue_cc.cc \ src/cpp/common/core_codegen.cc \ src/cpp/common/rpc_method.cc \ + src/cpp/rpcmanager/grpc_rpc_manager.cc \ src/cpp/server/async_generic_service.cc \ src/cpp/server/create_default_thread_pool.cc \ src/cpp/server/dynamic_thread_pool.cc \ @@ -4121,6 +4127,7 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/common/completion_queue_cc.cc \ src/cpp/common/core_codegen.cc \ src/cpp/common/rpc_method.cc \ + src/cpp/rpcmanager/grpc_rpc_manager.cc \ src/cpp/server/async_generic_service.cc \ src/cpp/server/create_default_thread_pool.cc \ src/cpp/server/dynamic_thread_pool.cc \ @@ -11727,6 +11734,49 @@ ifneq ($(NO_DEPS),true) endif +GRPC_RPC_MANAGER_TEST_SRC = \ + test/cpp/rpcmanager/grpc_rpc_manager_test.cc \ + +GRPC_RPC_MANAGER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_RPC_MANAGER_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: $(PROTOBUF_DEP) $(GRPC_RPC_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(GRPC_RPC_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/rpcmanager/grpc_rpc_manager_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a + +deps_grpc_rpc_manager_test: $(GRPC_RPC_MANAGER_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(GRPC_RPC_MANAGER_TEST_OBJS:.o=.dep) +endif +endif + + GRPC_RUBY_PLUGIN_SRC = \ src/compiler/ruby_plugin.cc \ diff --git a/build.yaml b/build.yaml index 4746cc1a48..87225684fa 100644 --- a/build.yaml +++ b/build.yaml @@ -711,6 +711,7 @@ filegroups: headers: - src/cpp/client/create_channel_internal.h - src/cpp/common/channel_filter.h + - src/cpp/rpcmanager/grpc_rpc_manager.h - src/cpp/server/dynamic_thread_pool.h - src/cpp/server/thread_pool_interface.h src: @@ -726,6 +727,7 @@ filegroups: - src/cpp/common/completion_queue_cc.cc - src/cpp/common/core_codegen.cc - src/cpp/common/rpc_method.cc + - src/cpp/rpcmanager/grpc_rpc_manager.cc - src/cpp/server/async_generic_service.cc - src/cpp/server/create_default_thread_pool.cc - src/cpp/server/dynamic_thread_pool.cc @@ -2869,6 +2871,18 @@ targets: secure: false vs_config_type: Application vs_project_guid: '{DF52D501-A6CF-4E6F-BA38-6EBE2E8DAFB2}' +- name: grpc_rpc_manager_test + build: test + language: c++ + headers: + - test/cpp/rpcmanager/grpc_rpc_manager_test.h + src: + - test/cpp/rpcmanager/grpc_rpc_manager_test.cc + deps: + - grpc++ + - grpc + - gpr + - grpc++_test_config - name: grpc_ruby_plugin build: protoc language: c++ diff --git a/include/grpc++/server.h b/include/grpc++/server.h index f51a6c658f..bae83eee3f 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -50,6 +50,8 @@ #include <grpc++/support/status.h> #include <grpc/compression.h> +#include "src/cpp/rpcmanager/grpc_rpc_manager.h" + struct grpc_server; namespace grpc { @@ -105,18 +107,41 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { class AsyncRequest; class ShutdownRequest; + /// SyncRequestManager is an implementation of GrpcRpcManager. This class is + /// responsible for polling for incoming RPCs and calling the RPC handlers. + /// This is only used in case of a Sync server (i.e a server exposing a sync + /// interface) + class SyncRequestManager; + class UnimplementedAsyncRequestContext; class UnimplementedAsyncRequest; class UnimplementedAsyncResponse; /// Server constructors. To be used by \a ServerBuilder only. /// - /// \param thread_pool The threadpool instance to use for call processing. - /// \param thread_pool_owned Does the server own the \a thread_pool instance? - /// \param max_receive_message_size Maximum message length that the channel - /// can receive. - Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_receive_message_size, ChannelArguments* args); + /// \param sync_server_cqs The completion queues to use if the server is a + /// synchronous server (or a hybrid server). The server polls for new RPCs on + /// these queues + /// + /// \param max_message_size Maximum message length that the channel can + /// receive. + /// + /// \param args The channel args + /// + /// \param min_pollers The minimum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param max_pollers The maximum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on + /// server completion queues passed via sync_server_cqs param. + Server(std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs, + int max_message_size, ChannelArguments* args, int min_pollers, + int max_pollers, int sync_cq_timeout_msec); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -171,34 +196,36 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { const int max_receive_message_size_; - // Completion queue. - CompletionQueue cq_; + /// The following completion queues are ONLY used in case of Sync API i.e if + /// the server has any services with sync methods. The server uses these + /// completion queues to poll for new RPCs + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs_; + + /// List of GrpcRpcManager instances (one for each cq in the sync_server_cqs) + std::vector<std::unique_ptr<SyncRequestManager>> sync_req_mgrs_; // Sever status grpc::mutex mu_; bool started_; bool shutdown_; bool shutdown_notified_; + + // TODO (sreek) : Remove num_running_cb_ and callback_cv_; // The number of threads which are running callbacks. - int num_running_cb_; - grpc::condition_variable callback_cv_; + // int num_running_cb_; + // grpc::condition_variable callback_cv_; grpc::condition_variable shutdown_cv_; std::shared_ptr<GlobalCallbacks> global_callbacks_; - std::list<SyncRequest>* sync_methods_; std::vector<grpc::string> services_; - std::unique_ptr<RpcServiceMethod> unknown_method_; bool has_generic_service_; - // Pointer to the c grpc server. + // Pointer to the c core's grpc server. grpc_server* server_; - ThreadPoolInterface* thread_pool_; - // Whether the thread pool is created and owned by the server. - bool thread_pool_owned_; - std::unique_ptr<ServerInitializer> server_initializer_; }; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 37f1f8cb80..8fac168ff7 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -62,6 +62,22 @@ class ServerBuilder { public: ServerBuilder(); + struct SyncServerSettings { + // Number of server completion queues to create to listen to incoming RPCs. + int num_cqs; + + // Minimum number of threads per completion queue that should be listening + // to incoming RPCs. + int min_pollers; + + // Maximum number of threads per completion queue that can be listening to + // incoming RPCs. + int max_pollers; + + // The timeout for server completion queue's AsyncNext call. + int cq_timeout_msec; + }; + /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the \a Server instance returned /// by \a BuildAndStart(). @@ -115,6 +131,9 @@ class ServerBuilder { ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option); + /// Note: Only useful if this is a Synchronous server. + void SetSyncServerSettings(SyncServerSettings settings); + /// Tries to bind \a server to the given \a addr. /// /// It can be invoked multiple times. @@ -170,6 +189,14 @@ class ServerBuilder { int* selected_port; }; + // Sync server settings. If this is not set via SetSyncServerSettings(), the + // following default values are used: + // sync_server_settings_.num_cqs = Number of CPUs + // sync_server_settings_.min_pollers = 1 + // sync_server_settings_.max_pollers = INT_MAX + // sync_server_settings_.cq_timeout_msec = 1000 + struct SyncServerSettings sync_server_settings_; + typedef std::unique_ptr<grpc::string> HostString; struct NamedService { explicit NamedService(Service* s) : service(s) {} @@ -184,7 +211,10 @@ class ServerBuilder { std::vector<std::unique_ptr<ServerBuilderOption>> options_; std::vector<std::unique_ptr<NamedService>> services_; std::vector<Port> ports_; + + /* List of completion queues added via AddCompletionQueue() method */ std::vector<ServerCompletionQueue*> cqs_; + std::shared_ptr<ServerCredentials> creds_; std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_; AsyncGenericService* generic_service_; diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc new file mode 100644 index 0000000000..2299dbdcd3 --- /dev/null +++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc @@ -0,0 +1,188 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc/support/log.h> +#include <climits> + +#include "src/cpp/rpcmanager/grpc_rpc_manager.h" + +namespace grpc { + +GrpcRpcManager::GrpcRpcManagerThread::GrpcRpcManagerThread( + GrpcRpcManager* rpc_mgr) + : rpc_mgr_(rpc_mgr), + thd_(new std::thread(&GrpcRpcManager::GrpcRpcManagerThread::Run, this)) {} + +void GrpcRpcManager::GrpcRpcManagerThread::Run() { + rpc_mgr_->MainWorkLoop(); + rpc_mgr_->MarkAsCompleted(this); +} + +GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() { + thd_->join(); + thd_.reset(); +} + +GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers) + : shutdown_(false), + num_pollers_(0), + min_pollers_(min_pollers), + max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), + num_threads_(0) {} + +GrpcRpcManager::~GrpcRpcManager() { + { + std::unique_lock<grpc::mutex> lock(mu_); + GPR_ASSERT(num_threads_ == 0); + } + + CleanupCompletedThreads(); +} + +void GrpcRpcManager::Wait() { + std::unique_lock<grpc::mutex> lock(mu_); + while (num_threads_ != 0) { + shutdown_cv_.wait(lock); + } +} + +void GrpcRpcManager::ShutdownRpcManager() { + std::unique_lock<grpc::mutex> lock(mu_); + shutdown_ = true; +} + +bool GrpcRpcManager::IsShutdown() { + std::unique_lock<grpc::mutex> lock(mu_); + return shutdown_; +} + +void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) { + { + std::unique_lock<grpc::mutex> list_lock(list_mu_); + completed_threads_.push_back(thd); + } + + grpc::unique_lock<grpc::mutex> lock(mu_); + num_threads_--; + if (num_threads_ == 0) { + shutdown_cv_.notify_one(); + } +} + +void GrpcRpcManager::CleanupCompletedThreads() { + std::unique_lock<grpc::mutex> lock(list_mu_); + for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); + thd = completed_threads_.erase(thd)) { + delete *thd; + } +} + +void GrpcRpcManager::Initialize() { + for (int i = 0; i < min_pollers_; i++) { + MaybeCreatePoller(); + } +} + +// If the number of pollers (i.e threads currently blocked in PollForWork()) is +// less than max threshold (i.e max_pollers_) and the total number of threads is +// below the maximum threshold, we can let the current thread continue as poller +bool GrpcRpcManager::MaybeContinueAsPoller() { + std::unique_lock<grpc::mutex> lock(mu_); + + if (shutdown_ || num_pollers_ > max_pollers_) { + return false; + } + + num_pollers_++; + return true; +} + +// Create a new poller if the current number of pollers i.e num_pollers_ (i.e +// threads currently blocked in PollForWork()) is below the threshold (i.e +// min_pollers_) and the total number of threads is below the maximum threshold +void GrpcRpcManager::MaybeCreatePoller() { + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_ && num_pollers_ < min_pollers_) { + num_pollers_++; + num_threads_++; + + // Create a new thread (which ends up calling the MainWorkLoop() function + new GrpcRpcManagerThread(this); + } +} + +void GrpcRpcManager::MainWorkLoop() { + void* tag; + bool ok; + + /* + 1. Poll for work (i.e PollForWork()) + 2. After returning from PollForWork, reduce the number of pollers by 1. If + PollForWork() returned a TIMEOUT, then it may indicate that we have more + polling threads than needed. Check if the number of pollers is greater + than min_pollers and if so, terminate the thread. + 3. Since we are short of one poller now, see if a new poller has to be + created (i.e see MaybeCreatePoller() for more details) + 4. Do the actual work (DoWork()) + 5. After doing the work, see it this thread can resume polling work (i.e + see MaybeContinueAsPoller() for more details) */ + do { + WorkStatus work_status = PollForWork(&tag, &ok); + + { + grpc::unique_lock<grpc::mutex> lock(mu_); + num_pollers_--; + + if (work_status == TIMEOUT && num_pollers_ > min_pollers_) { + break; + } + } + + // TODO (sreek) See if we need to check for shutdown here and quit + // Note that MaybeCreatePoller does check for shutdown and creates a new + // thread only if GrpcRpcManager is not shutdown + if (work_status == WORK_FOUND) { + MaybeCreatePoller(); + DoWork(tag, ok); + } + } while (MaybeContinueAsPoller()); + + CleanupCompletedThreads(); + + // If we are here, either GrpcRpcManager is shutting down or it already has + // enough threads. +} + +} // namespace grpc diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h new file mode 100644 index 0000000000..d00771b9a1 --- /dev/null +++ b/src/cpp/rpcmanager/grpc_rpc_manager.h @@ -0,0 +1,157 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H +#define GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H + +#include <list> +#include <memory> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> + +namespace grpc { + +class GrpcRpcManager { + public: + explicit GrpcRpcManager(int min_pollers, int max_pollers); + virtual ~GrpcRpcManager(); + + // This function MUST be called before using the object + void Initialize(); + + enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT }; + + // "Polls" for new work. + // If the return value is WORK_FOUND: + // - The implementaion of PollForWork() MAY set some opaque identifier to + // (identify the work item found) via the '*tag' parameter + // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A + // value of 'false' indicates some implemenation specific error (that is + // neither SHUTDOWN nor TIMEOUT) + // - GrpcRpcManager does not interpret the values of 'tag' and 'ok' + // - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to + // DoWork() + // + // If the return value is SHUTDOWN:, + // - GrpcManager WILL NOT call DoWork() and terminates the thead + // + // If the return value is TIMEOUT:, + // - GrpcManager WILL NOT call DoWork() + // - GrpcManager MAY terminate the thread depending on the current number of + // active poller threads and mix_pollers/max_pollers settings + // - Also, the value of timeout is specific to the derived class + // implementation + virtual WorkStatus PollForWork(void** tag, bool* ok) = 0; + + // The implementation of DoWork() is supposed to perform the work found by + // PollForWork(). The tag and ok parameters are the same as returned by + // PollForWork() + // + // The implementation of DoWork() should also do any setup needed to ensure + // that the next call to PollForWork() (not necessarily by the current thread) + // actually finds some work + virtual void DoWork(void* tag, bool ok) = 0; + + // Mark the GrpcRpcManager as shutdown and begin draining the work. + // This is a non-blocking call and the caller should call Wait(), a blocking + // call which returns only once the shutdown is complete + void ShutdownRpcManager(); + + // Has ShutdownRpcManager() been called + bool IsShutdown(); + + // A blocking call that returns only after the GrpcRpcManager has shutdown and + // all the threads have drained all the outstanding work + void Wait(); + + private: + // Helper wrapper class around std::thread. This takes a GrpcRpcManager object + // and starts a new std::thread to calls the Run() function. + // + // The Run() function calls GrpcManager::MainWorkLoop() function and once that + // completes, it marks the GrpcRpcManagerThread completed by calling + // GrpcRpcManager::MarkAsCompleted() + class GrpcRpcManagerThread { + public: + GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr); + ~GrpcRpcManagerThread(); + + private: + // Calls rpc_mgr_->MainWorkLoop() and once that completes, calls + // rpc_mgr_>MarkAsCompleted(this) to mark the thread as completed + void Run(); + + GrpcRpcManager* rpc_mgr_; + std::unique_ptr<grpc::thread> thd_; + }; + + // The main funtion in GrpcRpcManager + void MainWorkLoop(); + + // Create a new poller if the number of current pollers is less than the + // minimum number of pollers needed (i.e min_pollers). + void MaybeCreatePoller(); + + // Returns true if the current thread can resume as a poller. i.e if the + // current number of pollers is less than the max_pollers. + bool MaybeContinueAsPoller(); + + void MarkAsCompleted(GrpcRpcManagerThread* thd); + void CleanupCompletedThreads(); + + // Protects shutdown_, num_pollers_ and num_threads_ + // TODO: sreek - Change num_pollers and num_threads_ to atomics + grpc::mutex mu_; + + bool shutdown_; + grpc::condition_variable shutdown_cv_; + + // Number of threads doing polling + int num_pollers_; + + // The minimum and maximum number of threads that should be doing polling + int min_pollers_; + int max_pollers_; + + // The total number of threads (includes threads includes the threads that are + // currently polling i.e num_pollers_) + int num_threads_; + + grpc::mutex list_mu_; + std::list<GrpcRpcManagerThread*> completed_threads_; +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 2980b16c56..59c40dedaf 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -35,6 +35,7 @@ #include <grpc++/impl/service_type.h> #include <grpc++/server.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -61,6 +62,7 @@ ServerBuilder::ServerBuilder() auto& factory = *it; plugins_.emplace_back(factory()); } + // all compression algorithms enabled by default. enabled_compression_algorithms_bitset_ = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; @@ -68,6 +70,16 @@ ServerBuilder::ServerBuilder() sizeof(maybe_default_compression_level_)); memset(&maybe_default_compression_algorithm_, 0, sizeof(maybe_default_compression_algorithm_)); + + // Sync server setting defaults + sync_server_settings_.min_pollers = 1; + sync_server_settings_.max_pollers = INT_MAX; + + int num_cpus = gpr_cpu_num_cores(); + num_cpus = GPR_MAX(num_cpus, 4); + sync_server_settings_.num_cqs = num_cpus; + + sync_server_settings_.cq_timeout_msec = 1000; } std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( @@ -94,7 +106,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", - service); + (void*)service); } else { generic_service_ = service; } @@ -130,6 +142,10 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm( return *this; } +void ServerBuilder::SetSyncServerSettings(SyncServerSettings settings) { + sync_server_settings_ = settings; // copy the settings +} + ServerBuilder& ServerBuilder::AddListeningPort( const grpc::string& addr, std::shared_ptr<ServerCredentials> creds, int* selected_port) { @@ -139,35 +155,24 @@ ServerBuilder& ServerBuilder::AddListeningPort( } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { - std::unique_ptr<ThreadPoolInterface> thread_pool; - bool has_sync_methods = false; - for (auto it = services_.begin(); it != services_.end(); ++it) { - if ((*it)->service->has_synchronous_methods()) { - if (!thread_pool) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - break; - } - } - } ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { (*option)->UpdateArguments(&args); (*option)->UpdatePlugins(&plugins_); } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { - if (!thread_pool && (*plugin)->has_sync_methods()) { - thread_pool.reset(CreateDefaultThreadPool()); - has_sync_methods = true; - } (*plugin)->UpdateChannelArguments(&args); } + if (max_receive_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_); } + if (max_send_message_size_ >= 0) { args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_); } + args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, enabled_compression_algorithms_bitset_); if (maybe_default_compression_level_.is_set) { @@ -178,27 +183,84 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, maybe_default_compression_algorithm_.algorithm); } - std::unique_ptr<Server> server(new Server(thread_pool.release(), true, - max_receive_message_size_, &args)); + + // == Determine if the server has any syncrhonous methods == + bool has_sync_methods = false; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_synchronous_methods()) { + has_sync_methods = true; + break; + } + } + + if (!has_sync_methods) { + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + if ((*plugin)->has_sync_methods()) { + has_sync_methods = true; + break; + } + } + } + + // If this is a Sync server, i.e a server expositing sync API, then the server + // needs to create some completion queues to listen for incoming requests. + // 'sync_server_cqs' are those internal completion queues. + // + // This is different from the completion queues added to the server via + // ServerBuilder's AddCompletionQueue() method (those completion queues + // are in 'cqs_' member variable of ServerBuilder object) + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs( + new std::vector<std::unique_ptr<ServerCompletionQueue>>()); + + if (has_sync_methods) { + // This is a Sync server + gpr_log(GPR_INFO, + "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: " + "%d, CQ timeout (msec): %d", + sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, + sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec); + + // Create completion queues to listen to incoming rpc requests + for (int i = 0; i < sync_server_settings_.num_cqs; i++) { + sync_server_cqs->emplace_back(new ServerCompletionQueue()); + } + } + + std::unique_ptr<Server> server(new Server( + sync_server_cqs, max_receive_message_size_, &args, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec)); + ServerInitializer* initializer = server->initializer(); - // If the server has atleast one sync methods, we know that this is a Sync - // server or a Hybrid server and the completion queue (server->cq_) would be - // frequently polled. - int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; - - for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - // A completion queue that is not polled frequently (by calling Next() or - // AsyncNext()) is not safe to use for listening to incoming channels. - // Register all such completion queues as non-listening completion queues - // with the GRPC core library. - if ((*cq)->IsFrequentlyPolled()) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + // Register all the completion queues with the server. i.e + // 1. sync_server_cqs: internal completion queues created IF this is a sync + // server + // 2. cqs_: Completion queues added via AddCompletionQueue() call + + // All sync cqs (if any) are frequently polled by the GrpcRpcManager + int num_frequently_polled_cqs = sync_server_cqs->size(); + + for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); + } + + // cqs_ contains the completion queue added by calling the ServerBuilder's + // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by + // calling Next() or AsyncNext()) and hence are not safe to be used for + // listening to incoming channels. Such completion queues must be registered + // as non-listening queues + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + if ((*it)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), nullptr); num_frequently_polled_cqs++; } else { grpc_server_register_non_listening_completion_queue(server->server_, - (*cq)->cq(), nullptr); + (*it)->cq(), nullptr); } } @@ -214,9 +276,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->InitServer(initializer); } + if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); } else { @@ -229,6 +293,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } } } + for (auto port = ports_.begin(); port != ports_.end(); port++) { int r = server->AddListeningPort(port->addr, port->creds.get()); if (!r) return nullptr; @@ -236,13 +301,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { *port->selected_port = r; } } + auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; if (!server->Start(cqs_data, cqs_.size())) { return nullptr; } + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { (*plugin)->Finish(initializer); } + return server; } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 3f89275370..36bc61fdf1 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL UnimplementedAsyncRequest* const request_; }; +// TODO (sreek) - This might no longer be needed class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -126,6 +127,11 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { } }; +class ShutdownTag : public CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { return false; } +}; + class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { public: SyncRequest(RpcServiceMethod* method, void* tag) @@ -147,6 +153,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } + // TODO (Sreek) This function is probably no longer needed static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; *ok = false; @@ -158,6 +165,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } + // TODO (sreek) - This function is probably no longer needed static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, gpr_timespec deadline) { void* tag = nullptr; @@ -177,6 +185,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { GPR_UNREACHABLE_CODE(return false); } + // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest + // functions void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -184,6 +194,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { cq_ = nullptr; } + void ResetRequest() { in_flight_ = false; } + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; @@ -275,53 +287,168 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; +class Server::SyncRequestManager : public GrpcRpcManager { + public: + SyncRequestManager(Server* server, CompletionQueue* server_cq, + std::shared_ptr<GlobalCallbacks> global_callbacks, + int min_pollers, int max_pollers, int cq_timeout_msec) + : GrpcRpcManager(min_pollers, max_pollers), + server_(server), + server_cq_(server_cq), + cq_timeout_msec_(cq_timeout_msec), + global_callbacks_(global_callbacks) {} + + WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE { + *tag = nullptr; + gpr_timespec deadline = + gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN); + + switch (server_cq_->AsyncNext(tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + return TIMEOUT; + case CompletionQueue::SHUTDOWN: + return SHUTDOWN; + case CompletionQueue::GOT_EVENT: + return WORK_FOUND; + } + + GPR_UNREACHABLE_CODE(return TIMEOUT); + } + + void DoWork(void* tag, bool ok) GRPC_OVERRIDE { + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + + if (!sync_req) { + // No tag. Nothing to work on + // TODO (sreek) - Log a warning here since this is an unlikely case + return; + } + + if (ok) { + SyncRequest::CallData cd(server_, sync_req); + { + sync_req->SetupRequest(); + if (!IsShutdown()) { + sync_req->Request(server_->c_server(), server_cq_->cq()); + } else { + sync_req->TeardownRequest(); + } + } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } else { + sync_req->ResetRequest(); + // ok is false. For some reason, the tag was returned but event was not + // successful. In this case, request again unless we are shutting down + if (!IsShutdown()) { + // TODO (sreek) Remove this + // sync_req->Request(server_->c_server(), server_cq_->cq()); + } + } + } + + void AddSyncMethod(RpcServiceMethod* method, void* tag) { + sync_methods_.emplace_back(method, tag); + } + + void AddUnknownSyncMethod() { + // TODO (sreek) - Check if !sync_methods_.empty() is really needed here + if (!sync_methods_.empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted + // here by gcc-4.4 because it can't match the anonymous nullptr with a + // proper constructor implicitly. Construct the object and use push_back. + sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr)); + } + } + + void ShutdownAndDrainCompletionQueue() { + server_cq_->Shutdown(); + + // Drain any pending items from the queue + void* tag; + bool ok; + while (server_cq_->Next(&tag, &ok)) { + // Nothing to be done here + } + } + + void Start() { + if (!sync_methods_.empty()) { + for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) { + m->SetupRequest(); + m->Request(server_->c_server(), server_cq_->cq()); + } + + GrpcRpcManager::Initialize(); + } + } + + private: + Server* server_; + CompletionQueue* server_cq_; + int cq_timeout_msec_; + std::vector<SyncRequest> sync_methods_; + std::unique_ptr<RpcServiceMethod> unknown_method_; + std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; +}; + static internal::GrpcLibraryInitializer g_gli_initializer; -Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_receive_message_size, ChannelArguments* args) +Server::Server( + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs, + int max_receive_message_size, ChannelArguments* args, int min_pollers, + int max_pollers, int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), + sync_server_cqs_(sync_server_cqs), started_(false), shutdown_(false), shutdown_notified_(false), - num_running_cb_(0), - sync_methods_(new std::list<SyncRequest>), has_generic_service_(false), server_(nullptr), - thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned), server_initializer_(new ServerInitializer(this)) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; global_callbacks_->UpdateArguments(args); + + for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); + it++) { + sync_req_mgrs_.emplace_back( + new SyncRequestManager(this, (*it).get(), global_callbacks_, + min_pollers, max_pollers, sync_cq_timeout_msec)); + } + grpc_channel_args channel_args; args->SetChannelArgs(&channel_args); + server_ = grpc_server_create(&channel_args, nullptr); - if (thread_pool_ == nullptr) { - grpc_server_register_non_listening_completion_queue(server_, cq_.cq(), - nullptr); - } else { - grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); - } } Server::~Server() { { + // TODO (sreek) Check if we can just call Shutdown() even in case where + // started_ == false. This will make things much simpler grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { lock.unlock(); Shutdown(); } else if (!started_) { - cq_.Shutdown(); + // Shutdown the completion queues + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->ShutdownAndDrainCompletionQueue(); + } } } + + // TODO(sreek) Do thisfor all cqs ? + /* void* got_tag; bool ok; GPR_ASSERT(!cq_.Next(&got_tag, &ok)); + */ grpc_server_destroy(server_); - if (thread_pool_owned_) { - delete thread_pool_; - } - delete sync_methods_; } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { @@ -352,6 +479,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { "Can only register an asynchronous service against one server."); service->server_ = this; } + const char* method_name = nullptr; for (auto it = service->methods_.begin(); it != service->methods_.end(); ++it) { @@ -370,7 +498,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { if (method->handler() == nullptr) { method->set_server_tag(tag); } else { - sync_methods_->emplace_back(method, tag); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddSyncMethod(method, tag); + } } method_name = method->name(); } @@ -406,20 +536,23 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); if (!has_generic_service_) { - if (!sync_methods_->empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - // Use of emplace_back with just constructor arguments is not accepted - // here by gcc-4.4 because it can't match the anonymous nullptr with a - // proper constructor implicitly. Construct the object and use push_back. - sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->AddUnknownSyncMethod(); } + for (size_t i = 0; i < num_cqs; i++) { if (cqs[i]->IsFrequentlyPolled()) { new UnimplementedAsyncRequest(this, cqs[i]); } } } + + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Start(); + } + + /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */ + /* // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { @@ -427,24 +560,76 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { m->Request(server_, cq_.cq()); } - ScheduleCallback(); + GrpcRpcManager::Initialize(); } + */ return true; } +/* TODO (sreek) check if started_ and shutdown_ are needed anymore */ void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; + + /// The completion queue to use for server shutdown completion notification + CompletionQueue shutdown_cq; + ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); + + // Shutdown all RpcManagers. This will try to gracefully stop all the + // threads in the RpcManagers (once they process any inflight requests) + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->ShutdownRpcManager(); + } + + shutdown_cq.Shutdown(); + + void* tag; + bool ok; + CompletionQueue::NextStatus status = + shutdown_cq.AsyncNext(&tag, &ok, deadline); + + // If this timed out, it means we are done with the grace period for a clean + // shutdown. We should force a shutdown now by cancelling all inflight calls + if (status == CompletionQueue::NextStatus::TIMEOUT) { + grpc_server_cancel_all_calls(server_); + } + // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has + // successfully shutdown + + // Wait for threads in all RpcManagers to terminate + for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { + (*it)->Wait(); + (*it)->ShutdownAndDrainCompletionQueue(); + } + + // Drain the shutdown queue (if the previous call to AsyncNext() timed out + // and we didn't remove the tag from the queue yet) + while (shutdown_cq.Next(&tag, &ok)) { + // Nothing to be done here + } + + /* grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); lock.unlock(); + */ + + // TODO (sreek) Delete this + /* + GrpcRpcManager::ShutdownRpcManager(); + GrpcRpcManager::Wait(); + */ + // Spin, eating requests until the completion queue is completely shutdown. // If the deadline expires then cancel anything that's pending and keep // spinning forever until the work is actually drained. // Since nothing else needs to touch state guarded by mu_, holding it // through this loop is fine. + // + /* SyncRequest* request; bool ok; while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { @@ -456,11 +641,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } } lock.lock(); + */ + /* TODO (sreek) - Remove this block */ // Wait for running callbacks to finish. - while (num_running_cb_ != 0) { - callback_cv_.wait(lock); - } + /* + while (num_running_cb_ != 0) { + callback_cv_.wait(lock); + } + */ shutdown_notified_ = true; shutdown_cv_.notify_all(); @@ -585,46 +774,86 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } +// TODO: sreek - Remove this function void Server::ScheduleCallback() { + GPR_ASSERT(false); + /* { grpc::unique_lock<grpc::mutex> lock(mu_); num_running_cb_++; } thread_pool_->Add(std::bind(&Server::RunRpc, this)); + */ } +// TODO: sreek - Remove this function void Server::RunRpc() { - // Wait for one more incoming rpc. - bool ok; - GPR_TIMER_SCOPE("Server::RunRpc", 0); - auto* mrd = SyncRequest::Wait(&cq_, &ok); - if (mrd) { - ScheduleCallback(); - if (ok) { - SyncRequest::CallData cd(this, mrd); - { - mrd->SetupRequest(); - grpc::unique_lock<grpc::mutex> lock(mu_); - if (!shutdown_) { - mrd->Request(server_, cq_.cq()); - } else { - // destroy the structure that was created - mrd->TeardownRequest(); + GPR_ASSERT(false); + /* + // Wait for one more incoming rpc. + bool ok; + GPR_TIMER_SCOPE("Server::RunRpc", 0); + auto* mrd = SyncRequest::Wait(&cq_, &ok); + if (mrd) { + ScheduleCallback(); + if (ok) { + SyncRequest::CallData cd(this, mrd); + { + mrd->SetupRequest(); + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_) { + mrd->Request(server_, cq_.cq()); + } else { + // destroy the structure that was created + mrd->TeardownRequest(); + } } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } + } + + { + grpc::unique_lock<grpc::mutex> lock(mu_); + num_running_cb_--; + if (shutdown_) { + callback_cv_.notify_all(); } - GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); } + */ +} + +/* TODO (sreek) Move this to SyncRequestManager */ +/* +void Server::PollForWork(bool& is_work_found, void** tag) { + is_work_found = true; + *tag = nullptr; + auto* mrd = SyncRequest::Wait(&cq_, &is_work_found); + if (is_work_found) { + *tag = mrd; } +} - { - grpc::unique_lock<grpc::mutex> lock(mu_); - num_running_cb_--; - if (shutdown_) { - callback_cv_.notify_all(); + +void Server::DoWork(void* tag) { + auto* mrd = static_cast<SyncRequest*>(tag); + if (mrd) { + SyncRequest::CallData cd(this, mrd); + { + mrd->SetupRequest(); + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_) { + mrd->Request(server_, cq_.cq()); + } else { + // destroy the structure that was created + mrd->TeardownRequest(); + } } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); } } +*/ ServerInitializer* Server::initializer() { return server_initializer_.get(); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index b1d3ce92f6..a46f9f268b 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -226,6 +226,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { kMaxMessageSize_(8192), special_service_("special") { GetParam().Log(); + + sync_server_settings_.max_pollers = INT_MAX; + sync_server_settings_.min_pollers = 1; + sync_server_settings_.cq_timeout_msec = 10; + sync_server_settings_.num_cqs = 4; } void TearDown() GRPC_OVERRIDE { @@ -250,6 +255,9 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { builder.SetMaxMessageSize( kMaxMessageSize_); // For testing max message size. builder.RegisterService(&dup_pkg_service_); + + builder.SetSyncServerSettings(sync_server_settings_); + server_ = builder.BuildAndStart(); is_server_started_ = true; } @@ -279,6 +287,8 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { ServerBuilder builder; builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials()); builder.RegisterService(proxy_service_.get()); + builder.SetSyncServerSettings(sync_server_settings_); + proxy_server_ = builder.BuildAndStart(); channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials()); @@ -299,6 +309,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { TestServiceImpl special_service_; TestServiceImplDupPkg dup_pkg_service_; grpc::string user_agent_prefix_; + ServerBuilder::SyncServerSettings sync_server_settings_; }; static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.cc b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc new file mode 100644 index 0000000000..ce43b27856 --- /dev/null +++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc @@ -0,0 +1,89 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + *is % allowed in string + */ + +#include <chrono> +#include <memory> +#include <string> + +#include <gflags/gflags.h> +#include <grpc++/grpc++.h> + +#include "test/cpp/rpcmanager/grpc_rpc_manager_test.h" +#include "test/cpp/util/test_config.h" + +using grpc::testing::GrpcRpcManagerTest; + +// TODO: sreek - Rewrite this test. Find a better test case + +grpc::GrpcRpcManager::WorkStatus GrpcRpcManagerTest::PollForWork(void **tag, + bool *ok) { + { + std::unique_lock<grpc::mutex> lock(mu_); + std::cout << "Poll: " << std::this_thread::get_id() << std::endl; + } + + WorkStatus work_status = WORK_FOUND; + *tag = nullptr; + *ok = true; + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + { + std::unique_lock<grpc::mutex> lock(mu_); + num_calls_++; + if (num_calls_ > 50) { + std::cout << "poll: False" << std::endl; + work_status = SHUTDOWN; + ShutdownRpcManager(); + } + } + + return work_status; +} + +void GrpcRpcManagerTest::DoWork(void *tag, bool ok) { + { + std::unique_lock<grpc::mutex> lock(mu_); + std::cout << "Work: " << std::this_thread::get_id() << std::endl; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); +} + +int main(int argc, char **argv) { + grpc::testing::InitTest(&argc, &argv, true); + GrpcRpcManagerTest test_rpc_manager(3, 15); + test_rpc_manager.Initialize(); + test_rpc_manager.Wait(); + + return 0; +} diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.h b/test/cpp/rpcmanager/grpc_rpc_manager_test.h new file mode 100644 index 0000000000..0f1d3b3ed2 --- /dev/null +++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.h @@ -0,0 +1,58 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + *is % allowed in string + */ +#ifndef GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H +#define GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H + +#include "src/cpp/rpcmanager/grpc_rpc_manager.h" + +namespace grpc { +namespace testing { + +class GrpcRpcManagerTest GRPC_FINAL : public GrpcRpcManager { + public: + GrpcRpcManagerTest(int min_pollers, int max_pollers) + : GrpcRpcManager(min_pollers, max_pollers), num_calls_(0){}; + + grpc::GrpcRpcManager::WorkStatus PollForWork(void **tag, + bool *ok) GRPC_OVERRIDE; + void DoWork(void *tag, bool ok) GRPC_OVERRIDE; + + private: + grpc::mutex mu_; + int num_calls_; +}; + +} // namespace testing +} // namespace grpc + +#endif // GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index e266e15481..dac227d077 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -861,6 +861,7 @@ src/cpp/common/secure_auth_context.h \ src/cpp/server/secure_server_credentials.h \ src/cpp/client/create_channel_internal.h \ src/cpp/common/channel_filter.h \ +src/cpp/rpcmanager/grpc_rpc_manager.h \ src/cpp/server/dynamic_thread_pool.h \ src/cpp/server/thread_pool_interface.h \ src/cpp/client/insecure_credentials.cc \ @@ -883,6 +884,7 @@ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ src/cpp/common/core_codegen.cc \ src/cpp/common/rpc_method.cc \ +src/cpp/rpcmanager/grpc_rpc_manager.cc \ src/cpp/server/async_generic_service.cc \ src/cpp/server/create_default_thread_pool.cc \ src/cpp/server/dynamic_thread_pool.cc \ diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index fa401fdaaf..8166fbd419 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -31,52 +31,49 @@ import math -WARMUP_SECONDS=5 -JAVA_WARMUP_SECONDS=15 # Java needs more warmup time for JIT to kick in. -BENCHMARK_SECONDS=30 +WARMUP_SECONDS = 5 +JAVA_WARMUP_SECONDS = 15 # Java needs more warmup time for JIT to kick in. +BENCHMARK_SECONDS = 30 -SMOKETEST='smoketest' -SCALABLE='scalable' -SWEEP='sweep' -DEFAULT_CATEGORIES=[SCALABLE, SMOKETEST] +SMOKETEST = 'smoketest' +SCALABLE = 'scalable' +SWEEP = 'sweep' +DEFAULT_CATEGORIES = [SCALABLE, SMOKETEST] SECURE_SECARGS = {'use_test_ca': True, 'server_host_override': 'foo.test.google.fr'} HISTOGRAM_PARAMS = { - 'resolution': 0.01, - 'max_possible': 60e9, + 'resolution': 0.01, + 'max_possible': 60e9, } EMPTY_GENERIC_PAYLOAD = { - 'bytebuf_params': { - 'req_size': 0, - 'resp_size': 0, - } + 'bytebuf_params': { + 'req_size': 0, + 'resp_size': 0, + } } EMPTY_PROTO_PAYLOAD = { - 'simple_params': { - 'req_size': 0, - 'resp_size': 0, - } + 'simple_params': { + 'req_size': 0, + 'resp_size': 0, + } } BIG_GENERIC_PAYLOAD = { - 'bytebuf_params': { - 'req_size': 65536, - 'resp_size': 65536, - } + 'bytebuf_params': { + 'req_size': 65536, + 'resp_size': 65536, + } } # target number of RPCs outstanding on across all client channels in # non-ping-pong tests (since we can only specify per-channel numbers, the # actual target will be slightly higher) -OUTSTANDING_REQUESTS={ - 'async': 6400, - 'sync': 1000 -} +OUTSTANDING_REQUESTS = {'async': 6400, 'sync': 1000} # wide is the number of client channels in multi-channel tests (1 otherwise) -WIDE=64 +WIDE = 64 def _get_secargs(is_secure): @@ -102,8 +99,10 @@ def geometric_progression(start, stop, step): n *= step -def _ping_pong_scenario(name, rpc_type, - client_type, server_type, +def _ping_pong_scenario(name, + rpc_type, + client_type, + server_type, secure=True, use_generic_payload=False, unconstrained_client=None, @@ -117,29 +116,29 @@ def _ping_pong_scenario(name, rpc_type, outstanding=None): """Creates a basic ping pong scenario.""" scenario = { - 'name': name, - 'num_servers': 1, - 'num_clients': 1, - 'client_config': { - 'client_type': client_type, - 'security_params': _get_secargs(secure), - 'outstanding_rpcs_per_channel': 1, - 'client_channels': 1, - 'async_client_threads': 1, - 'rpc_type': rpc_type, - 'load_params': { - 'closed_loop': {} + 'name': name, + 'num_servers': 1, + 'num_clients': 1, + 'client_config': { + 'client_type': client_type, + 'security_params': _get_secargs(secure), + 'outstanding_rpcs_per_channel': 1, + 'client_channels': 1, + 'async_client_threads': 1, + 'rpc_type': rpc_type, + 'load_params': { + 'closed_loop': {} + }, + 'histogram_params': HISTOGRAM_PARAMS, }, - 'histogram_params': HISTOGRAM_PARAMS, - }, - 'server_config': { - 'server_type': server_type, - 'security_params': _get_secargs(secure), - 'core_limit': server_core_limit, - 'async_server_threads': async_server_threads, - }, - 'warmup_seconds': warmup_seconds, - 'benchmark_seconds': BENCHMARK_SECONDS + 'server_config': { + 'server_type': server_type, + 'security_params': _get_secargs(secure), + 'core_limit': server_core_limit, + 'async_server_threads': async_server_threads, + }, + 'warmup_seconds': warmup_seconds, + 'benchmark_seconds': BENCHMARK_SECONDS } if use_generic_payload: if server_type != 'ASYNC_GENERIC_SERVER': @@ -151,7 +150,8 @@ def _ping_pong_scenario(name, rpc_type, scenario['client_config']['payload_config'] = EMPTY_PROTO_PAYLOAD if unconstrained_client: - outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[unconstrained_client] + outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[ + unconstrained_client] wide = channels if channels is not None else WIDE deep = int(math.ceil(1.0 * outstanding_calls / wide)) @@ -197,7 +197,9 @@ class CXXLanguage: rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_generic_payload=True, server_core_limit=1, async_server_threads=1, + use_generic_payload=True, + server_core_limit=1, + async_server_threads=1, secure=secure, categories=smoketest_categories) @@ -206,49 +208,71 @@ class CXXLanguage: rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - unconstrained_client='async', use_generic_payload=True, + unconstrained_client='async', + use_generic_payload=True, secure=secure, - categories=smoketest_categories+[SCALABLE]) + categories=smoketest_categories + [SCALABLE]) yield _ping_pong_scenario( 'cpp_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - unconstrained_client='async', use_generic_payload=True, - server_core_limit=1, async_server_threads=1, + unconstrained_client='async', + use_generic_payload=True, + server_core_limit=1, + async_server_threads=1, secure=secure) + yield _ping_pong_scenario( + 'cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_%s' % + (secstr), + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='SYNC_SERVER', + unconstrained_client='async', + secure=secure, + categories=smoketest_categories + [SCALABLE]) + for rpc_type in ['unary', 'streaming']: for synchronicity in ['sync', 'async']: yield _ping_pong_scenario( - 'cpp_protobuf_%s_%s_ping_pong_%s' % (synchronicity, rpc_type, secstr), + 'cpp_protobuf_%s_%s_ping_pong_%s' % + (synchronicity, rpc_type, secstr), rpc_type=rpc_type.upper(), client_type='%s_CLIENT' % synchronicity.upper(), server_type='%s_SERVER' % synchronicity.upper(), - server_core_limit=1, async_server_threads=1, + server_core_limit=1, + async_server_threads=1, secure=secure) yield _ping_pong_scenario( - 'cpp_protobuf_%s_%s_qps_unconstrained_%s' % (synchronicity, rpc_type, secstr), + 'cpp_protobuf_%s_%s_qps_unconstrained_%s' % + (synchronicity, rpc_type, secstr), rpc_type=rpc_type.upper(), client_type='%s_CLIENT' % synchronicity.upper(), server_type='%s_SERVER' % synchronicity.upper(), unconstrained_client=synchronicity, secure=secure, - categories=smoketest_categories+[SCALABLE]) + categories=smoketest_categories + [SCALABLE]) for channels in geometric_progression(1, 20000, math.sqrt(10)): for outstanding in geometric_progression(1, 200000, math.sqrt(10)): - if synchronicity == 'sync' and outstanding > 1200: continue - if outstanding < channels: continue - yield _ping_pong_scenario( - 'cpp_protobuf_%s_%s_qps_unconstrained_%s_%d_channels_%d_outstanding' % (synchronicity, rpc_type, secstr, channels, outstanding), - rpc_type=rpc_type.upper(), - client_type='%s_CLIENT' % synchronicity.upper(), - server_type='%s_SERVER' % synchronicity.upper(), - unconstrained_client=synchronicity, secure=secure, - categories=[SWEEP], channels=channels, outstanding=outstanding) + if synchronicity == 'sync' and outstanding > 1200: + continue + if outstanding < channels: + continue + yield _ping_pong_scenario( + 'cpp_protobuf_%s_%s_qps_unconstrained_%s_%d_channels_%d_outstanding' + % (synchronicity, rpc_type, secstr, channels, outstanding), + rpc_type=rpc_type.upper(), + client_type='%s_CLIENT' % synchronicity.upper(), + server_type='%s_SERVER' % synchronicity.upper(), + unconstrained_client=synchronicity, + secure=secure, + categories=[SWEEP], + channels=channels, + outstanding=outstanding) def __str__(self): return 'c++' @@ -267,66 +291,94 @@ class CSharpLanguage: def scenarios(self): yield _ping_pong_scenario( - 'csharp_generic_async_streaming_ping_pong', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', + 'csharp_generic_async_streaming_ping_pong', + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_GENERIC_SERVER', use_generic_payload=True, categories=[SMOKETEST]) yield _ping_pong_scenario( - 'csharp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') + 'csharp_protobuf_async_streaming_ping_pong', + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER') yield _ping_pong_scenario( - 'csharp_protobuf_async_unary_ping_pong', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'csharp_protobuf_async_unary_ping_pong', + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', categories=[SMOKETEST]) yield _ping_pong_scenario( - 'csharp_protobuf_sync_to_async_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER') + 'csharp_protobuf_sync_to_async_unary_ping_pong', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER') yield _ping_pong_scenario( - 'csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'csharp_protobuf_async_unary_qps_unconstrained', + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', unconstrained_client='async', - categories=[SMOKETEST,SCALABLE]) + categories=[SMOKETEST, SCALABLE]) yield _ping_pong_scenario( - 'csharp_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'csharp_protobuf_async_streaming_qps_unconstrained', + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', unconstrained_client='async', categories=[SCALABLE]) yield _ping_pong_scenario( - 'csharp_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - server_language='c++', server_core_limit=1, async_server_threads=1, + 'csharp_to_cpp_protobuf_sync_unary_ping_pong', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', + server_language='c++', + server_core_limit=1, + async_server_threads=1, categories=[SMOKETEST]) yield _ping_pong_scenario( - 'csharp_to_cpp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - server_language='c++', server_core_limit=1, async_server_threads=1) + 'csharp_to_cpp_protobuf_async_streaming_ping_pong', + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', + server_language='c++', + server_core_limit=1, + async_server_threads=1) yield _ping_pong_scenario( - 'csharp_to_cpp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - unconstrained_client='async', server_language='c++', + 'csharp_to_cpp_protobuf_async_unary_qps_unconstrained', + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', + unconstrained_client='async', + server_language='c++', categories=[SCALABLE]) yield _ping_pong_scenario( - 'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', - unconstrained_client='sync', server_language='c++', + 'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER', + unconstrained_client='sync', + server_language='c++', categories=[SCALABLE]) yield _ping_pong_scenario( - 'cpp_to_csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - unconstrained_client='async', client_language='c++', + 'cpp_to_csharp_protobuf_async_unary_qps_unconstrained', + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', + unconstrained_client='async', + client_language='c++', categories=[SCALABLE]) - def __str__(self): return 'csharp' @@ -356,13 +408,17 @@ class NodeLanguage: # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') yield _ping_pong_scenario( - 'node_protobuf_unary_ping_pong', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'node_protobuf_unary_ping_pong', + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', categories=[SMOKETEST]) yield _ping_pong_scenario( - 'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'node_protobuf_async_unary_qps_unconstrained', + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', unconstrained_client='async', categories=[SMOKETEST]) @@ -387,6 +443,7 @@ class NodeLanguage: def __str__(self): return 'node' + class PythonLanguage: def __init__(self): @@ -400,48 +457,69 @@ class PythonLanguage: def scenarios(self): yield _ping_pong_scenario( - 'python_generic_sync_streaming_ping_pong', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', + 'python_generic_sync_streaming_ping_pong', + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='ASYNC_GENERIC_SERVER', use_generic_payload=True, categories=[SMOKETEST]) yield _ping_pong_scenario( - 'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER') + 'python_protobuf_sync_streaming_ping_pong', + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER') yield _ping_pong_scenario( - 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') + 'python_protobuf_async_unary_ping_pong', + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER') yield _ping_pong_scenario( - 'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', + 'python_protobuf_sync_unary_ping_pong', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER', categories=[SMOKETEST]) yield _ping_pong_scenario( - 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', + 'python_protobuf_sync_unary_qps_unconstrained', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER', unconstrained_client='sync') yield _ping_pong_scenario( - 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', + 'python_protobuf_sync_streaming_qps_unconstrained', + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER', unconstrained_client='sync') yield _ping_pong_scenario( - 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', - server_language='c++', server_core_limit=1, async_server_threads=1, + 'python_to_cpp_protobuf_sync_unary_ping_pong', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER', + server_language='c++', + server_core_limit=1, + async_server_threads=1, categories=[SMOKETEST]) yield _ping_pong_scenario( - 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='ASYNC_SERVER', - server_language='c++', server_core_limit=1, async_server_threads=1) + 'python_to_cpp_protobuf_sync_streaming_ping_pong', + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='ASYNC_SERVER', + server_language='c++', + server_core_limit=1, + async_server_threads=1) def __str__(self): return 'python' + class RubyLanguage: def __init__(self): @@ -456,34 +534,50 @@ class RubyLanguage: def scenarios(self): yield _ping_pong_scenario( - 'ruby_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'ruby_protobuf_sync_streaming_ping_pong', + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', categories=[SMOKETEST]) yield _ping_pong_scenario( - 'ruby_protobuf_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'ruby_protobuf_unary_ping_pong', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', categories=[SMOKETEST]) yield _ping_pong_scenario( - 'ruby_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'ruby_protobuf_sync_unary_qps_unconstrained', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', unconstrained_client='sync') yield _ping_pong_scenario( - 'ruby_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'ruby_protobuf_sync_streaming_qps_unconstrained', + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', unconstrained_client='sync') yield _ping_pong_scenario( - 'ruby_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - server_language='c++', server_core_limit=1, async_server_threads=1) + 'ruby_to_cpp_protobuf_sync_unary_ping_pong', + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', + server_language='c++', + server_core_limit=1, + async_server_threads=1) yield _ping_pong_scenario( - 'ruby_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - server_language='c++', server_core_limit=1, async_server_threads=1) + 'ruby_to_cpp_protobuf_sync_streaming_ping_pong', + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', + server_language='c++', + server_core_limit=1, + async_server_threads=1) def __str__(self): return 'ruby' @@ -507,58 +601,85 @@ class JavaLanguage: smoketest_categories = [SMOKETEST] if secure else [] yield _ping_pong_scenario( - 'java_generic_async_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_generic_payload=True, async_server_threads=1, - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS, + 'java_generic_async_streaming_ping_pong_%s' % secstr, + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_GENERIC_SERVER', + use_generic_payload=True, + async_server_threads=1, + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS, categories=smoketest_categories) yield _ping_pong_scenario( - 'java_protobuf_async_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'java_protobuf_async_streaming_ping_pong_%s' % secstr, + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', async_server_threads=1, - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS) + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS) yield _ping_pong_scenario( - 'java_protobuf_async_unary_ping_pong_%s' % secstr, rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'java_protobuf_async_unary_ping_pong_%s' % secstr, + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', async_server_threads=1, - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS, + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS, categories=smoketest_categories) yield _ping_pong_scenario( - 'java_protobuf_unary_ping_pong_%s' % secstr, rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'java_protobuf_unary_ping_pong_%s' % secstr, + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', async_server_threads=1, - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS) + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS) yield _ping_pong_scenario( - 'java_protobuf_async_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'java_protobuf_async_unary_qps_unconstrained_%s' % secstr, + rpc_type='UNARY', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', unconstrained_client='async', - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS, - categories=smoketest_categories+[SCALABLE]) + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS, + categories=smoketest_categories + [SCALABLE]) yield _ping_pong_scenario( - 'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + 'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr, + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_SERVER', unconstrained_client='async', - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS, + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS, categories=[SCALABLE]) yield _ping_pong_scenario( - 'java_generic_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - unconstrained_client='async', use_generic_payload=True, - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS, + 'java_generic_async_streaming_qps_unconstrained_%s' % secstr, + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_GENERIC_SERVER', + unconstrained_client='async', + use_generic_payload=True, + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS, categories=[SCALABLE]) yield _ping_pong_scenario( - 'java_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - unconstrained_client='async', use_generic_payload=True, + 'java_generic_async_streaming_qps_one_server_core_%s' % secstr, + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='ASYNC_GENERIC_SERVER', + unconstrained_client='async', + use_generic_payload=True, async_server_threads=1, - secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS) + secure=secure, + warmup_seconds=JAVA_WARMUP_SECONDS) # TODO(jtattermusch): add scenarios java vs C++ @@ -586,37 +707,48 @@ class GoLanguage: # ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server, # but that's mostly because of lack of better name of the enum value. yield _ping_pong_scenario( - 'go_generic_sync_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_generic_payload=True, async_server_threads=1, + 'go_generic_sync_streaming_ping_pong_%s' % secstr, + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='ASYNC_GENERIC_SERVER', + use_generic_payload=True, + async_server_threads=1, secure=secure, categories=smoketest_categories) yield _ping_pong_scenario( - 'go_protobuf_sync_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'go_protobuf_sync_streaming_ping_pong_%s' % secstr, + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', async_server_threads=1, secure=secure) yield _ping_pong_scenario( - 'go_protobuf_sync_unary_ping_pong_%s' % secstr, rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'go_protobuf_sync_unary_ping_pong_%s' % secstr, + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', async_server_threads=1, secure=secure, categories=smoketest_categories) # unconstrained_client='async' is intended (client uses goroutines) yield _ping_pong_scenario( - 'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr, + rpc_type='UNARY', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', unconstrained_client='async', secure=secure, - categories=smoketest_categories+[SCALABLE]) + categories=smoketest_categories + [SCALABLE]) # unconstrained_client='async' is intended (client uses goroutines) yield _ping_pong_scenario( - 'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + 'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr, + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='SYNC_SERVER', unconstrained_client='async', secure=secure, categories=[SCALABLE]) @@ -625,9 +757,12 @@ class GoLanguage: # ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server, # but that's mostly because of lack of better name of the enum value. yield _ping_pong_scenario( - 'go_generic_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING', - client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - unconstrained_client='async', use_generic_payload=True, + 'go_generic_sync_streaming_qps_unconstrained_%s' % secstr, + rpc_type='STREAMING', + client_type='SYNC_CLIENT', + server_type='ASYNC_GENERIC_SERVER', + unconstrained_client='async', + use_generic_payload=True, secure=secure, categories=[SCALABLE]) @@ -638,11 +773,11 @@ class GoLanguage: LANGUAGES = { - 'c++' : CXXLanguage(), - 'csharp' : CSharpLanguage(), - 'node' : NodeLanguage(), - 'ruby' : RubyLanguage(), - 'java' : JavaLanguage(), - 'python' : PythonLanguage(), - 'go' : GoLanguage(), + 'c++': CXXLanguage(), + 'csharp': CSharpLanguage(), + 'node': NodeLanguage(), + 'ruby': RubyLanguage(), + 'java': JavaLanguage(), + 'python': PythonLanguage(), + 'go': GoLanguage(), } diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 03bdcd4264..936703223e 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -2498,6 +2498,26 @@ }, { "deps": [ + "gpr", + "grpc", + "grpc++", + "grpc++_test_config" + ], + "headers": [ + "test/cpp/rpcmanager/grpc_rpc_manager_test.h" + ], + "is_filegroup": false, + "language": "c++", + "name": "grpc_rpc_manager_test", + "src": [ + "test/cpp/rpcmanager/grpc_rpc_manager_test.cc", + "test/cpp/rpcmanager/grpc_rpc_manager_test.h" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ "grpc_plugin_support" ], "headers": [], @@ -7211,6 +7231,7 @@ "include/grpc++/support/time.h", "src/cpp/client/create_channel_internal.h", "src/cpp/common/channel_filter.h", + "src/cpp/rpcmanager/grpc_rpc_manager.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h" ], @@ -7279,6 +7300,8 @@ "src/cpp/common/completion_queue_cc.cc", "src/cpp/common/core_codegen.cc", "src/cpp/common/rpc_method.cc", + "src/cpp/rpcmanager/grpc_rpc_manager.cc", + "src/cpp/rpcmanager/grpc_rpc_manager.h", "src/cpp/server/async_generic_service.cc", "src/cpp/server/create_default_thread_pool.cc", "src/cpp/server/dynamic_thread_pool.cc", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 355fea5d5f..bc95f4274f 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -2406,6 +2406,27 @@ "cpu_cost": 1.0, "exclude_configs": [], "flaky": false, + "gtest": false, + "language": "c++", + "name": "grpc_rpc_manager_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "flaky": false, "gtest": true, "language": "c++", "name": "grpc_tool_test", @@ -31503,6 +31524,27 @@ { "args": [ "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" + ], + "boringssl": true, + "ci_platforms": [ + "linux" + ], + "cpu_cost": 8, + "defaults": "boringssl", + "exclude_configs": [], + "flaky": false, + "language": "c++", + "name": "json_run_localhost", + "platforms": [ + "linux" + ], + "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_secure", + "timeout_seconds": 180 + }, + { + "args": [ + "--scenarios_json", "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, @@ -31713,6 +31755,27 @@ { "args": [ "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" + ], + "boringssl": true, + "ci_platforms": [ + "linux" + ], + "cpu_cost": 8, + "defaults": "boringssl", + "exclude_configs": [], + "flaky": false, + "language": "c++", + "name": "json_run_localhost", + "platforms": [ + "linux" + ], + "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_insecure", + "timeout_seconds": 180 + }, + { + "args": [ + "--scenarios_json", "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj index 6bb9a6169d..509e66d00b 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj @@ -361,6 +361,7 @@ <ClInclude Include="$(SolutionDir)\..\src\cpp\server\secure_server_credentials.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" /> + <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" /> </ItemGroup> @@ -405,6 +406,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\create_default_thread_pool.cc"> diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters index 761424c3fa..1dd5fd90e5 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters @@ -61,6 +61,9 @@ <ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc"> <Filter>src\cpp\common</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc"> + <Filter>src\cpp\rpcmanager</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc"> <Filter>src\cpp\server</Filter> </ClCompile> @@ -410,6 +413,9 @@ <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h"> <Filter>src\cpp\common</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h"> + <Filter>src\cpp\rpcmanager</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h"> <Filter>src\cpp\server</Filter> </ClInclude> @@ -467,6 +473,9 @@ <Filter Include="src\cpp\common"> <UniqueIdentifier>{2336e396-7e0b-8bf9-3b09-adc6ad1f0e5b}</UniqueIdentifier> </Filter> + <Filter Include="src\cpp\rpcmanager"> + <UniqueIdentifier>{f142b1a2-5198-040b-9da4-2afc09e9248a}</UniqueIdentifier> + </Filter> <Filter Include="src\cpp\server"> <UniqueIdentifier>{321b0980-74ad-e8ca-f23b-deffa5d6bb8f}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj index 02e65926fe..5ec59397ec 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -357,6 +357,7 @@ <ItemGroup> <ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" /> + <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" /> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" /> </ItemGroup> @@ -391,6 +392,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\create_default_thread_pool.cc"> diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index ff1a0e9644..7e5b912f21 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -46,6 +46,9 @@ <ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc"> <Filter>src\cpp\common</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc"> + <Filter>src\cpp\rpcmanager</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc"> <Filter>src\cpp\server</Filter> </ClCompile> @@ -383,6 +386,9 @@ <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h"> <Filter>src\cpp\common</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h"> + <Filter>src\cpp\rpcmanager</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h"> <Filter>src\cpp\server</Filter> </ClInclude> @@ -440,6 +446,9 @@ <Filter Include="src\cpp\common"> <UniqueIdentifier>{ed8e4daa-825f-fbe5-2a45-846ad9165d3d}</UniqueIdentifier> </Filter> + <Filter Include="src\cpp\rpcmanager"> + <UniqueIdentifier>{cb26a5cb-4725-6fee-8abc-09d5fcd52f39}</UniqueIdentifier> + </Filter> <Filter Include="src\cpp\server"> <UniqueIdentifier>{8a54a279-d14b-4237-0df3-1ffe1ef5a7af}</UniqueIdentifier> </Filter> diff --git a/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj new file mode 100644 index 0000000000..4502de8167 --- /dev/null +++ b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj @@ -0,0 +1,204 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" /> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug|Win32"> + <Configuration>Debug</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Debug|x64"> + <Configuration>Debug</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|Win32"> + <Configuration>Release</Configuration> + <Platform>Win32</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|x64"> + <Configuration>Release</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <ProjectGuid>{A4F24E89-1766-2FAA-9058-1094EAA018A8}</ProjectGuid> + <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected> + <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration"> + <PlatformToolset>v100</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration"> + <PlatformToolset>v110</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration"> + <PlatformToolset>v120</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration"> + <PlatformToolset>v140</PlatformToolset> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>true</UseDebugLibraries> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>false</UseDebugLibraries> + <WholeProgramOptimization>true</WholeProgramOptimization> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Label="PropertySheets"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + <Import Project="$(SolutionDir)\..\vsprojects\cpptest.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\global.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\protobuf.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" /> + <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup Condition="'$(Configuration)'=='Debug'"> + <TargetName>grpc_rpc_manager_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)'=='Release'"> + <TargetName>grpc_rpc_manager_test</TargetName> + <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib> + <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib> + <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl> + <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl> + </PropertyGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>Disabled</Optimization> + <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <WarningLevel>Level3</WarningLevel> + <Optimization>MaxSpeed</Optimization> + <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + <TreatWarningAsError>true</TreatWarningAsError> + <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat> + <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation> + <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + </Link> + </ItemDefinitionGroup> + + <ItemGroup> + <ClInclude Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.h" /> + </ItemGroup> + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.cc"> + </ClCompile> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++\grpc++.vcxproj"> + <Project>{C187A093-A0FE-489D-A40A-6E33DE0F9FEB}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj"> + <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj"> + <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++_test_config\grpc++_test_config.vcxproj"> + <Project>{3F7D093D-11F9-C4BC-BEB7-18EB28E3F290}</Project> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" /> + </ImportGroup> + <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild"> + <PropertyGroup> + <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText> + </PropertyGroup> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" /> + <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" /> + </Target> +</Project> + diff --git a/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters new file mode 100644 index 0000000000..fedaea08d3 --- /dev/null +++ b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup> + <ClCompile Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.cc"> + <Filter>test\cpp\rpcmanager</Filter> + </ClCompile> + </ItemGroup> + <ItemGroup> + <ClInclude Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.h"> + <Filter>test\cpp\rpcmanager</Filter> + </ClInclude> + </ItemGroup> + + <ItemGroup> + <Filter Include="test"> + <UniqueIdentifier>{9da529f7-8064-34c0-54da-0fade27184ad}</UniqueIdentifier> + </Filter> + <Filter Include="test\cpp"> + <UniqueIdentifier>{b6e53cff-22ab-1194-866d-57caa3551fd2}</UniqueIdentifier> + </Filter> + <Filter Include="test\cpp\rpcmanager"> + <UniqueIdentifier>{c63d7236-e7c6-d7b7-e3d8-f25853e358e6}</UniqueIdentifier> + </Filter> + </ItemGroup> +</Project> + |